Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F1842480
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
70 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/crmd/callbacks.c b/crmd/callbacks.c
index 9e7dc5d305..f98d4a4145 100644
--- a/crmd/callbacks.c
+++ b/crmd/callbacks.c
@@ -1,651 +1,652 @@
/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <crm_internal.h>
#include <sys/param.h>
#include <crm/crm.h>
#include <string.h>
#include <crmd_fsa.h>
#include <heartbeat.h>
#include <crm/msg_xml.h>
#include <crm/common/xml.h>
#include <crm/common/msg.h>
#include <crm/common/cluster.h>
#include <crm/cib.h>
#include <crmd.h>
#include <crmd_messages.h>
#include <crmd_callbacks.h>
xmlNode *find_xml_in_hamessage(const xmlNode * msg);
void crmd_ha_connection_destroy(gpointer user_data);
void crmd_ha_msg_filter(xmlNode *msg);
/* From join_dc... */
extern gboolean check_join_state(
enum crmd_fsa_state cur_state, const char *source);
/* #define MAX_EMPTY_CALLBACKS 20 */
/* int empty_callbacks = 0; */
#define trigger_fsa(source) crm_debug_3("Triggering FSA: %s", __FUNCTION__); \
G_main_set_trigger(source);
#if SUPPORT_HEARTBEAT
gboolean
crmd_ha_msg_dispatch(ll_cluster_t *cluster_conn, gpointer user_data)
{
IPC_Channel *channel = NULL;
gboolean stay_connected = TRUE;
crm_debug_3("Invoked");
if(cluster_conn != NULL) {
channel = cluster_conn->llc_ops->ipcchan(cluster_conn);
}
CRM_CHECK(cluster_conn != NULL, ;);
CRM_CHECK(channel != NULL, ;);
if(channel != NULL && IPC_ISRCONN(channel)) {
if(cluster_conn->llc_ops->msgready(cluster_conn) == 0) {
crm_debug_2("no message ready yet");
}
/* invoke the callbacks but dont block */
cluster_conn->llc_ops->rcvmsg(cluster_conn, 0);
}
if (channel == NULL || channel->ch_status != IPC_CONNECT) {
if(is_set(fsa_input_register, R_HA_DISCONNECTED) == FALSE) {
crm_crit("Lost connection to heartbeat service.");
} else {
crm_info("Lost connection to heartbeat service.");
}
trigger_fsa(fsa_source);
stay_connected = FALSE;
}
return stay_connected;
}
#endif
void
crmd_ha_connection_destroy(gpointer user_data)
{
crm_debug_3("Invoked");
if(is_set(fsa_input_register, R_HA_DISCONNECTED)) {
/* we signed out, so this is expected */
crm_info("Heartbeat disconnection complete");
return;
}
crm_crit("Lost connection to heartbeat service!");
register_fsa_input(C_HA_DISCONNECT, I_ERROR, NULL);
trigger_fsa(fsa_source);
}
void
crmd_ha_msg_filter(xmlNode *msg)
{
ha_msg_input_t *new_input = NULL;
const char *from = crm_element_value(msg, F_ORIG);
const char *seq = crm_element_value(msg, F_SEQ);
const char *op = crm_element_value(msg, F_CRM_TASK);
const char *sys_to = crm_element_value(msg, F_CRM_SYS_TO);
const char *sys_from = crm_element_value(msg, F_CRM_SYS_FROM);
if(safe_str_eq(sys_to, CRM_SYSTEM_DC) && AM_I_DC == FALSE) {
crm_debug_2("Ignoring message for the DC [F_SEQ=%s]", seq);
+ free_xml(msg);
return;
} else if(safe_str_eq(sys_from, CRM_SYSTEM_DC)) {
if(AM_I_DC && safe_str_neq(from, fsa_our_uname)) {
crm_err("Another DC detected: %s (op=%s)", from, op);
/* make sure the election happens NOW */
if(fsa_state != S_ELECTION) {
new_input = new_ha_msg_input(msg);
register_fsa_error_adv(C_FSA_INTERNAL, I_ELECTION, NULL,
new_input, __FUNCTION__);
}
} else {
crm_debug_2("Processing DC message from %s [F_SEQ=%s]", from, seq);
}
}
if(new_input == NULL) {
crm_log_xml(LOG_MSG, "HA[inbound]", msg);
- new_input = new_ha_msg_input(msg);
- route_message(C_HA_MESSAGE, new_input);
+ route_message(C_HA_MESSAGE, msg);
+
+ } else {
+ free_xml(msg);
+ crm_free(new_input);
}
- crm_free(new_input);
trigger_fsa(fsa_source);
}
#if SUPPORT_HEARTBEAT
void
crmd_ha_msg_callback(HA_Message *hamsg, void* private_data)
{
int level = LOG_DEBUG;
oc_node_t *from_node = NULL;
xmlNode *msg = convert_ha_message(NULL, hamsg, __FUNCTION__);
const char *from = crm_element_value(msg, F_ORIG);
const char *op = crm_element_value(msg, F_CRM_TASK);
const char *sys_from = crm_element_value(msg, F_CRM_SYS_FROM);
- CRM_CHECK(from != NULL, crm_log_xml_err(msg, "anon"); return);
+ CRM_CHECK(from != NULL, crm_log_xml_err(msg, "anon"); goto bail);
crm_debug_2("HA[inbound]: %s from %s", op, from);
if(crm_peer_cache == NULL || crm_active_members() == 0) {
crm_debug("Ignoring HA messages until we are"
" connected to the CCM (%s op from %s)", op, from);
crm_log_xml(LOG_MSG, "HA[inbound]: Ignore (No CCM)", msg);
goto bail;
}
from_node = g_hash_table_lookup(crm_peer_cache, from);
if(from_node == NULL) {
if(safe_str_eq(op, CRM_OP_VOTE)) {
level = LOG_WARNING;
} else if(AM_I_DC && safe_str_eq(op, CRM_OP_JOIN_ANNOUNCE)) {
level = LOG_WARNING;
} else if(safe_str_eq(sys_from, CRM_SYSTEM_DC)) {
level = LOG_WARNING;
}
do_crm_log(level,
"Ignoring HA message (op=%s) from %s: not in our"
" membership list (size=%d)", op, from,
crm_active_members());
crm_log_xml(LOG_MSG, "HA[inbound]: CCM Discard", msg);
} else {
crmd_ha_msg_filter(msg);
+ return;
}
bail:
free_xml(msg);
return;
}
#endif
/*
* Apparently returning TRUE means "stay connected, keep doing stuff".
* Returning FALSE means "we're all done, close the connection"
*/
gboolean
crmd_ipc_msg_callback(IPC_Channel *client, gpointer user_data)
{
int lpc = 0;
xmlNode *msg = NULL;
- ha_msg_input_t *new_input = NULL;
crmd_client_t *curr_client = (crmd_client_t*)user_data;
gboolean stay_connected = TRUE;
crm_debug_2("Invoked: %s",
curr_client->table_key);
while(IPC_ISRCONN(client)) {
if(client->ops->is_message_pending(client) == 0) {
break;
}
msg = xmlfromIPC(client, 0);
if (msg == NULL) {
break;
}
lpc++;
- new_input = new_ha_msg_input(msg);
-
crm_debug_2("Processing msg from %s", curr_client->table_key);
- crm_log_xml(LOG_DEBUG_2, "CRMd[inbound]", new_input->msg);
- if(crmd_authorize_message(new_input, curr_client)) {
- route_message(C_IPC_MESSAGE, new_input);
+ crm_log_xml(LOG_DEBUG_2, "CRMd[inbound]", msg);
+
+ if(crmd_authorize_message(msg, curr_client)) {
+ route_message(C_IPC_MESSAGE, msg);
+ } else {
+ free_xml(msg);
}
- crm_free(new_input);
- free_xml(msg);
- new_input = NULL;
msg = NULL;
if(client->ch_status != IPC_CONNECT) {
break;
}
}
crm_debug_2("Processed %d messages", lpc);
if (client->ch_status != IPC_CONNECT) {
stay_connected = FALSE;
process_client_disconnect(curr_client);
}
trigger_fsa(fsa_source);
return stay_connected;
}
extern GCHSource *lrm_source;
gboolean
lrm_dispatch(IPC_Channel *src_not_used, gpointer user_data)
{
/* ?? src == lrm_channel ?? */
ll_lrm_t *lrm = (ll_lrm_t*)user_data;
IPC_Channel *lrm_channel = lrm->lrm_ops->ipcchan(lrm);
crm_debug_3("Invoked");
lrm->lrm_ops->rcvmsg(lrm, FALSE);
if(lrm_channel->ch_status != IPC_CONNECT) {
if(is_set(fsa_input_register, R_LRM_CONNECTED)) {
crm_crit("LRM Connection failed");
register_fsa_input(C_FSA_INTERNAL, I_ERROR, NULL);
clear_bit_inplace(fsa_input_register, R_LRM_CONNECTED);
} else {
crm_info("LRM Connection disconnected");
}
lrm_source = NULL;
return FALSE;
}
return TRUE;
}
extern gboolean process_lrm_event(lrm_op_t *op);
void
lrm_op_callback(lrm_op_t* op)
{
CRM_CHECK(op != NULL, return);
process_lrm_event(op);
}
void
crmd_ha_status_callback(const char *node, const char *status, void *private)
{
xmlNode *update = NULL;
crm_node_t *member = NULL;
crm_notice("Status update: Node %s now has status [%s] (DC=%s)",
node, status, AM_I_DC?"true":"false");
member = g_hash_table_lookup(crm_peer_cache, node);
if(member == NULL) {
/* Make sure it is created so crm_update_peer_proc() succeeds */
const char *uuid = get_uuid(node);
member = crm_update_peer(0, 0, -1, 0, uuid, node, NULL, NULL);
}
if(safe_str_eq(status, PINGSTATUS)) {
return;
}
if(safe_str_eq(status, DEADSTATUS)) {
/* this node is toast */
crm_update_peer_proc(node, crm_proc_ais, OFFLINESTATUS);
if(AM_I_DC) {
update = create_node_state(
node, DEADSTATUS, XML_BOOLEAN_NO, OFFLINESTATUS,
CRMD_JOINSTATE_DOWN, NULL, TRUE, __FUNCTION__);
}
} else {
crm_update_peer_proc(node, crm_proc_ais, ONLINESTATUS);
if(AM_I_DC) {
update = create_node_state(
node, ACTIVESTATUS, NULL, NULL, NULL, NULL,
FALSE, __FUNCTION__);
}
}
trigger_fsa(fsa_source);
if(update != NULL) {
fsa_cib_anon_update(
XML_CIB_TAG_STATUS, update, cib_scope_local|cib_quorum_override);
free_xml(update);
}
}
void
crmd_client_status_callback(const char * node, const char * client,
const char * status, void * private)
{
const char *join = NULL;
crm_node_t *member = NULL;
xmlNode *update = NULL;
gboolean clear_shutdown = FALSE;
crm_debug_3("Invoked");
if(safe_str_neq(client, CRM_SYSTEM_CRMD)) {
return;
}
if(safe_str_eq(status, JOINSTATUS)){
status = ONLINESTATUS;
clear_shutdown = TRUE;
} else if(safe_str_eq(status, LEAVESTATUS)){
status = OFFLINESTATUS;
join = CRMD_JOINSTATE_DOWN;
/* clear_shutdown = TRUE; */
}
set_bit_inplace(fsa_input_register, R_PEER_DATA);
crm_notice("Status update: Client %s/%s now has status [%s] (DC=%s)",
node, client, status, AM_I_DC?"true":"false");
if(safe_str_eq(status, ONLINESTATUS)) {
/* remove the cached value in case it changed */
crm_debug_2("Uncaching UUID for %s", node);
unget_uuid(node);
}
member = g_hash_table_lookup(crm_peer_cache, node);
if(member == NULL) {
/* Make sure it is created so crm_update_peer_proc() succeeds */
const char *uuid = get_uuid(node);
member = crm_update_peer(0, 0, -1, 0, uuid, node, NULL, NULL);
}
crm_update_peer_proc(node, crm_proc_crmd, status);
if(is_set(fsa_input_register, R_CIB_CONNECTED) == FALSE) {
return;
} else if(fsa_state == S_STOPPING) {
return;
}
if(safe_str_eq(node, fsa_our_dc) && safe_str_eq(status, OFFLINESTATUS)){
/* did our DC leave us */
crm_info("Got client status callback - our DC is dead");
register_fsa_input(C_CRMD_STATUS_CALLBACK, I_ELECTION, NULL);
} else if(AM_I_DC == FALSE) {
crm_info("Not the DC");
} else {
crm_debug_3("Got client status callback");
update = create_node_state(node, NULL, NULL, status, join,
NULL, clear_shutdown, __FUNCTION__);
if(safe_str_eq(status, ONLINESTATUS)){
crm_xml_add(update, XML_CIB_ATTR_REPLACE, XML_CIB_TAG_LRM","XML_TAG_TRANSIENT_NODEATTRS",");
}
fsa_cib_anon_update(
XML_CIB_TAG_STATUS, update, cib_scope_local|cib_quorum_override);
free_xml(update);
if(safe_str_eq(status, OFFLINESTATUS)) {
erase_node_from_join(node);
check_join_state(fsa_state, __FUNCTION__);
}
}
trigger_fsa(fsa_source);
}
void
crmd_ipc_connection_destroy(gpointer user_data)
{
GCHSource *source = NULL;
crmd_client_t *client = user_data;
/* Calling this function on an _active_ connection results in:
* crmd_ipc_connection_destroy (callbacks.c:431)
* -> G_main_del_IPC_Channel (GSource.c:478)
* -> g_source_unref
* -> G_CH_destroy_int (GSource.c:647)
* -> crmd_ipc_connection_destroy (callbacks.c:437)\
*
* A better alternative is to call G_main_del_IPC_Channel() directly
*/
if(client == NULL) {
crm_debug_4("No client to delete");
return;
}
crm_debug_2("Disconnecting client %s (%p)", client->table_key, client);
source = client->client_source;
client->client_source = NULL;
if(source != NULL) {
crm_debug_3("Deleting %s (%p) from mainloop",
client->table_key, source);
G_main_del_IPC_Channel(source);
}
crm_free(client->table_key);
crm_free(client->sub_sys);
crm_free(client->uuid);
crm_free(client);
return;
}
gboolean
crmd_client_connect(IPC_Channel *client_channel, gpointer user_data)
{
crm_debug_3("Invoked");
if (client_channel == NULL) {
crm_err("Channel was NULL");
} else if (client_channel->ch_status == IPC_DISCONNECT) {
crm_err("Channel was disconnected");
} else {
crmd_client_t *blank_client = NULL;
crm_debug_3("Channel connected");
crm_malloc0(blank_client, sizeof(crmd_client_t));
CRM_ASSERT(blank_client != NULL);
crm_debug_2("Created client: %p", blank_client);
client_channel->ops->set_recv_qlen(client_channel, 1024);
client_channel->ops->set_send_qlen(client_channel, 1024);
blank_client->client_channel = client_channel;
blank_client->sub_sys = NULL;
blank_client->uuid = NULL;
blank_client->table_key = NULL;
blank_client->client_source =
G_main_add_IPC_Channel(
G_PRIORITY_LOW, client_channel,
FALSE, crmd_ipc_msg_callback,
blank_client, crmd_ipc_connection_destroy);
}
return TRUE;
}
#if SUPPORT_HEARTBEAT
static gboolean fsa_have_quorum = FALSE;
gboolean ccm_dispatch(int fd, gpointer user_data)
{
int rc = 0;
oc_ev_t *ccm_token = (oc_ev_t*)user_data;
gboolean was_error = FALSE;
crm_debug_3("Invoked");
rc = oc_ev_handle_event(ccm_token);
if(rc != 0) {
if(is_set(fsa_input_register, R_CCM_DISCONNECTED) == FALSE) {
/* we signed out, so this is expected */
register_fsa_input(C_CCM_CALLBACK, I_ERROR, NULL);
crm_err("CCM connection appears to have failed: rc=%d.",
rc);
}
was_error = TRUE;
}
trigger_fsa(fsa_source);
return !was_error;
}
void
crmd_ccm_msg_callback(
oc_ed_t event, void *cookie, size_t size, const void *data)
{
gboolean update_cache = FALSE;
const oc_ev_membership_t *membership = data;
gboolean update_quorum = FALSE;
gboolean trigger_transition = FALSE;
crm_debug_3("Invoked");
CRM_ASSERT(data != NULL);
crm_info("Quorum %s after event=%s (id=%d)",
ccm_have_quorum(event)?"(re)attained":"lost",
ccm_event_name(event), membership->m_instance);
if(crm_peer_seq > membership->m_instance) {
crm_err("Membership instance ID went backwards! %llu->%d",
crm_peer_seq, membership->m_instance);
CRM_ASSERT(crm_peer_seq <= membership->m_instance);
return;
}
/*
* OC_EV_MS_NEW_MEMBERSHIP: membership with quorum
* OC_EV_MS_MS_INVALID: membership without quorum
* OC_EV_MS_NOT_PRIMARY: previous membership no longer valid
* OC_EV_MS_PRIMARY_RESTORED: previous membership restored
* OC_EV_MS_EVICTED: the client is evicted from ccm.
*/
switch(event) {
case OC_EV_MS_NEW_MEMBERSHIP:
case OC_EV_MS_INVALID:
update_cache = TRUE;
update_quorum = TRUE;
break;
case OC_EV_MS_NOT_PRIMARY:
break;
case OC_EV_MS_PRIMARY_RESTORED:
update_cache = TRUE;
crm_peer_seq = membership->m_instance;
if(AM_I_DC && need_transition(fsa_state)) {
trigger_transition = TRUE;
}
break;
case OC_EV_MS_EVICTED:
update_quorum = TRUE;
register_fsa_input(C_FSA_INTERNAL, I_STOP, NULL);
crm_err("Shutting down after CCM event: %s",
ccm_event_name(event));
break;
default:
crm_err("Unknown CCM event: %d", event);
}
if(update_quorum) {
crm_have_quorum = ccm_have_quorum(event);
crm_update_quorum(crm_have_quorum);
if(crm_have_quorum == FALSE) {
/* did we just loose quorum? */
if(fsa_have_quorum && need_transition(fsa_state)) {
crm_info("Quorum lost: triggering transition (%s)",
ccm_event_name(event));
trigger_transition = TRUE;
}
}
}
if(update_cache) {
crm_debug_2("Updating cache after event %s", ccm_event_name(event));
do_ccm_update_cache(C_CCM_CALLBACK, fsa_state, event, data, NULL);
} else if(event != OC_EV_MS_NOT_PRIMARY) {
crm_peer_seq = membership->m_instance;
register_fsa_action(A_TE_CANCEL);
}
oc_ev_callback_done(cookie);
return;
}
#endif
void
crmd_cib_connection_destroy(gpointer user_data)
{
crm_debug_3("Invoked");
trigger_fsa(fsa_source);
if(is_set(fsa_input_register, R_CIB_CONNECTED) == FALSE) {
crm_info("Connection to the CIB terminated...");
return;
}
/* eventually this will trigger a reconnect, not a shutdown */
crm_err("Connection to the CIB terminated...");
register_fsa_input(C_FSA_INTERNAL, I_ERROR, NULL);
clear_bit_inplace(fsa_input_register, R_CIB_CONNECTED);
return;
}
longclock_t fsa_start = 0;
longclock_t fsa_stop = 0;
longclock_t fsa_diff = 0;
gboolean
crm_fsa_trigger(gpointer user_data)
{
unsigned int fsa_diff_ms = 0;
if(fsa_diff_max_ms > 0) {
fsa_start = time_longclock();
}
crm_debug_2("Invoked (queue len: %d)", g_list_length(fsa_message_queue));
s_crmd_fsa(C_FSA_INTERNAL);
crm_debug_2("Exited (queue len: %d)", g_list_length(fsa_message_queue));
if(fsa_diff_max_ms > 0) {
fsa_stop = time_longclock();
fsa_diff = sub_longclock(fsa_stop, fsa_start);
fsa_diff_ms = longclockto_ms(fsa_diff);
if(fsa_diff_ms > fsa_diff_max_ms) {
crm_err("FSA took %dms to complete", fsa_diff_ms);
} else if(fsa_diff_ms > fsa_diff_warn_ms) {
crm_warn("FSA took %dms to complete", fsa_diff_ms);
}
}
return TRUE;
}
diff --git a/crmd/crmd_messages.h b/crmd/crmd_messages.h
index 2a512a71e7..8a93809120 100644
--- a/crmd/crmd_messages.h
+++ b/crmd/crmd_messages.h
@@ -1,114 +1,113 @@
/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef XML_CRM_MESSAGES__H
#define XML_CRM_MESSAGES__H
#include <crm/crm.h>
#include <crm/common/ipc.h>
#include <crm/common/xml.h>
#include <crmd_fsa.h>
extern void *fsa_typed_data_adv(
fsa_data_t *fsa_data, enum fsa_data_type a_type, const char *caller);
#define fsa_typed_data(x) fsa_typed_data_adv(msg_data, x, __FUNCTION__)
extern void register_fsa_error_adv(
enum crmd_fsa_cause cause, enum crmd_fsa_input input,
fsa_data_t *cur_data, void *new_data, const char *raised_from);
#define register_fsa_error(cause, input, new_data) register_fsa_error_adv(cause, input, msg_data, new_data, __FUNCTION__)
extern int register_fsa_input_adv(
enum crmd_fsa_cause cause, enum crmd_fsa_input input,
void *data, long long with_actions,
gboolean prepend, const char *raised_from);
extern void fsa_dump_queue(int log_level);
-extern void route_message(enum crmd_fsa_cause cause, ha_msg_input_t *input);
+extern void route_message(enum crmd_fsa_cause cause, xmlNode *input);
#define crmd_fsa_stall(cur_input) if(cur_input != NULL) { \
register_fsa_input_adv( \
((fsa_data_t*)cur_input)->fsa_cause, I_WAIT_FOR_EVENT, \
((fsa_data_t*)cur_input)->data, action, TRUE, __FUNCTION__); \
} else { \
register_fsa_input_adv( \
C_FSA_INTERNAL, I_WAIT_FOR_EVENT, \
NULL, action, TRUE, __FUNCTION__); \
} \
#define register_fsa_input(cause, input, data) register_fsa_input_adv(cause, input, data, A_NOTHING, FALSE, __FUNCTION__)
#define register_fsa_action(action) { \
fsa_actions |= action; \
if(fsa_source) { \
G_main_set_trigger(fsa_source); \
} \
crm_debug("%s added action %s to the FSA", \
__FUNCTION__, fsa_action2string(action)); \
}
#define register_fsa_input_before(cause, input, data) register_fsa_input_adv(cause, input, data, A_NOTHING, TRUE, __FUNCTION__)
#define register_fsa_input_later(cause, input, data) register_fsa_input_adv(cause, input, data, A_NOTHING, FALSE, __FUNCTION__)
void delete_fsa_input(fsa_data_t *fsa_data);
GListPtr put_message(fsa_data_t *new_message);
fsa_data_t *get_message(void);
gboolean is_message(void);
gboolean have_wait_message(void);
extern gboolean relay_message(
xmlNode *relay_message, gboolean originated_locally);
extern gboolean crmd_ipc_msg_callback(IPC_Channel *client, gpointer user_data);
extern void process_message(
xmlNode *msg, gboolean originated_locally,const char *src_node_name);
extern gboolean crm_dc_process_message(xmlNode *whole_message,
xmlNode *action,
const char *host_from,
const char *sys_from,
const char *sys_to,
const char *op,
gboolean dc_mode);
extern gboolean send_msg_via_ha(xmlNode *msg);
extern gboolean send_msg_via_ipc(xmlNode *msg, const char *sys);
extern gboolean add_pending_outgoing_reply(const char *originating_node_name,
const char *crm_msg_reference,
const char *sys_to,
const char *sys_from);
-extern gboolean crmd_authorize_message(
- ha_msg_input_t *client_msg, crmd_client_t *curr_client);
+extern gboolean crmd_authorize_message(xmlNode *client_msg, crmd_client_t *curr_client);
extern gboolean send_request(xmlNode *msg, char **msg_reference);
-extern enum crmd_fsa_input handle_message(ha_msg_input_t *stored_msg);
+extern enum crmd_fsa_input handle_message(xmlNode *stored_msg);
extern void lrm_op_callback(lrm_op_t* op);
extern void msg_queue_helper(void);
#endif
diff --git a/crmd/election.c b/crmd/election.c
index 6ff0249b19..c3c72da65c 100644
--- a/crmd/election.c
+++ b/crmd/election.c
@@ -1,453 +1,455 @@
/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <crm_internal.h>
#include <heartbeat.h>
#include <crm/cib.h>
#include <crm/msg_xml.h>
#include <crm/common/xml.h>
#include <crm/common/cluster.h>
#include <crm/crm.h>
#include <crmd_fsa.h>
#include <crmd_messages.h>
#include <crmd_callbacks.h>
#include <clplumbing/Gmain_timeout.h>
#include <clplumbing/cl_uuid.h>
#include <ha_version.h>
GHashTable *voted = NULL;
uint highest_born_on = -1;
static int current_election_id = 1;
const char *get_hg_version(void);
const char *get_hg_version(void)
{
/* limit this #define's use to a single file to avoid rebuilding more than necessary */
return HA_HG_VERSION;
}
/* A_ELECTION_VOTE */
void
do_election_vote(long long action,
enum crmd_fsa_cause cause,
enum crmd_fsa_state cur_state,
enum crmd_fsa_input current_input,
fsa_data_t *msg_data)
{
gboolean not_voting = FALSE;
xmlNode *vote = NULL;
/* don't vote if we're in one of these states or wanting to shut down */
switch(cur_state) {
case S_RECOVERY:
case S_STOPPING:
case S_TERMINATE:
crm_warn("Not voting in election, we're in state %s",
fsa_state2string(cur_state));
not_voting = TRUE;
break;
default:
break;
}
if(not_voting == FALSE) {
if(is_set(fsa_input_register, R_STARTING)) {
not_voting = TRUE;
}
}
if(not_voting) {
if(AM_I_DC) {
register_fsa_input(C_FSA_INTERNAL, I_RELEASE_DC, NULL);
} else {
register_fsa_input(C_FSA_INTERNAL, I_PENDING, NULL);
}
return;
}
vote = create_request(
CRM_OP_VOTE, NULL, NULL,
CRM_SYSTEM_CRMD, CRM_SYSTEM_CRMD, NULL);
current_election_id++;
crm_xml_add(vote, F_CRM_ELECTION_OWNER, fsa_our_uuid);
crm_xml_add_int(vote, F_CRM_ELECTION_ID, current_election_id);
send_request(vote, NULL);
crm_debug("Destroying voted hash");
g_hash_table_destroy(voted);
voted = NULL;
if(cur_state == S_ELECTION || cur_state == S_RELEASE_DC) {
crm_timer_start(election_timeout);
} else if(cur_state != S_INTEGRATION) {
crm_err("Broken? Voting in state %s",
fsa_state2string(cur_state));
}
return;
}
char *dc_hb_msg = NULL;
int beat_num = 0;
gboolean
do_dc_heartbeat(gpointer data)
{
#if 0
fsa_timer_t *timer = (fsa_timer_t *)data;
crm_debug_3("Sending DC Heartbeat %d", beat_num);
xmlNode *msg = ha_msg_new(5);
crm_xml_add(msg, F_TYPE, T_CRM);
crm_xml_add(msg, F_SUBTYPE, XML_ATTR_REQUEST);
crm_xml_add(msg, F_CRM_SYS_TO, CRM_SYSTEM_CRMD);
crm_xml_add(msg, F_CRM_SYS_FROM, CRM_SYSTEM_DC);
crm_xml_add(msg, F_CRM_TASK, CRM_OP_HBEAT);
crm_xml_add_int(msg, "dc_beat_seq", beat_num);
beat_num++;
if(send_msg_via_ha(msg) == FALSE) {
/* this is bad */
crm_timer_stop(timer); /* make it not go off again */
register_fsa_input(C_HEARTBEAT_FAILED, I_SHUTDOWN, NULL);
return FALSE;
}
#endif
return TRUE;
}
struct election_data_s
{
const char *winning_uname;
unsigned int winning_bornon;
};
static void
log_member_uname(gpointer key, gpointer value, gpointer user_data)
{
if(crm_is_member_active(value)) {
crm_err("%s: %s", (char*)user_data, (char*)key);
}
}
static void
log_node(gpointer key, gpointer value, gpointer user_data)
{
crm_err("%s: %s", (char*)user_data, (char*)key);
}
void
do_election_check(long long action,
enum crmd_fsa_cause cause,
enum crmd_fsa_state cur_state,
enum crmd_fsa_input current_input,
fsa_data_t *msg_data)
{
int voted_size = g_hash_table_size(voted);
int num_members = crm_active_members();
/* in the case of #voted > #members, it is better to
* wait for the timeout and give the cluster time to
* stabilize
*/
if(fsa_state != S_ELECTION) {
crm_debug("Ignore election check: we not in an election");
} else if(voted_size >= num_members) {
/* we won and everyone has voted */
crm_timer_stop(election_timeout);
register_fsa_input(C_FSA_INTERNAL, I_ELECTION_DC, NULL);
if(voted_size > num_members) {
char *data = NULL;
data = crm_strdup("member");
g_hash_table_foreach(crm_peer_cache, log_member_uname, data);
crm_free(data);
data = crm_strdup("voted");
g_hash_table_foreach(voted, log_node, data);
crm_free(data);
}
crm_debug("Destroying voted hash");
g_hash_table_destroy(voted);
voted = NULL;
} else {
crm_info("Still waiting on %d non-votes (%d total)",
num_members - voted_size, num_members);
}
return;
}
/* A_ELECTION_COUNT */
void
do_election_count_vote(long long action,
enum crmd_fsa_cause cause,
enum crmd_fsa_state cur_state,
enum crmd_fsa_input current_input,
fsa_data_t *msg_data)
{
int election_id = -1;
gboolean we_loose = FALSE;
static time_t last_election_loss = 0;
enum crmd_fsa_input election_result = I_NULL;
crm_node_t *our_node = NULL, *your_node = NULL;
ha_msg_input_t *vote = fsa_typed_data(fsa_dt_ha_msg);
const char *op = crm_element_value(vote->msg, F_CRM_TASK);
const char *vote_from = crm_element_value(vote->msg, F_CRM_HOST_FROM);
const char *your_version = crm_element_value(vote->msg, F_CRM_VERSION);
const char *election_owner= crm_element_value(vote->msg, F_CRM_ELECTION_OWNER);
/* if the membership copy is NULL we REALLY shouldnt be voting
* the question is how we managed to get here.
*/
+
+ CRM_CHECK(vote->msg != NULL, crm_err("Bogus data from %s", msg_data->origin); return);
CRM_CHECK(crm_peer_cache != NULL, return);
CRM_CHECK(vote_from != NULL, vote_from = fsa_our_uname);
our_node = g_hash_table_lookup(crm_peer_cache, fsa_our_uname);
your_node = g_hash_table_lookup(crm_peer_cache, vote_from);
if(your_node == NULL) {
crm_debug("Election ignore: The other side doesn't exist in CCM: %s", vote_from);
return;
}
if(voted == NULL) {
crm_debug("Created voted hash");
voted = g_hash_table_new_full(
g_str_hash, g_str_equal,
g_hash_destroy_str, g_hash_destroy_str);
}
crm_element_value_int(vote->msg, F_CRM_ELECTION_ID, &election_id);
crm_debug("Election %d, owner: %s", election_id, election_owner);
/* update the list of nodes that have voted */
if(crm_str_eq(fsa_our_uuid, election_owner, TRUE)
|| crm_str_eq(fsa_our_uname, election_owner, TRUE)) {
if(election_id == current_election_id) {
char *uname_copy = NULL;
char *op_copy = crm_strdup(op);
uname_copy = crm_strdup(your_node->uname);
g_hash_table_replace(voted, uname_copy, op_copy);
crm_info("Updated voted hash for %s to %s",
your_node->uname, op);
} else {
crm_debug("Ignore old '%s' from %s: %d vs. %d",
op, your_node->uname,
election_id, current_election_id);
return;
}
} else {
CRM_CHECK(safe_str_neq(op, CRM_OP_NOVOTE), return);
}
if(vote_from == NULL || crm_str_eq(vote_from, fsa_our_uname, TRUE)) {
/* don't count our own vote */
crm_info("Election ignore: our %s (%s)", op,crm_str(vote_from));
return;
} else if(crm_str_eq(op, CRM_OP_NOVOTE, TRUE)) {
crm_info("Election ignore: no-vote from %s", vote_from);
return;
}
crm_info("Election check: %s from %s", op, vote_from);
if(our_node == NULL || safe_str_neq(our_node->state, CRM_NODE_MEMBER)) {
crm_info("Election fail: we don't exist in CCM");
we_loose = TRUE;
} else if(compare_version(your_version, CRM_FEATURE_SET) < 0) {
crm_info("Election fail: version");
we_loose = TRUE;
} else if(compare_version(your_version, CRM_FEATURE_SET) > 0) {
crm_info("Election pass: version");
} else if(is_heartbeat_cluster() && your_node->born < our_node->born) {
crm_debug("Election fail: born_on");
we_loose = TRUE;
} else if(is_heartbeat_cluster() && your_node->born > our_node->born) {
crm_debug("Election pass: born_on");
} else if(fsa_our_uname == NULL
|| strcasecmp(fsa_our_uname, vote_from) > 0) {
crm_debug("Election fail: uname");
we_loose = TRUE;
} else {
CRM_CHECK(strcasecmp(fsa_our_uname, vote_from) != 0, ;);
crm_debug("Them: %s (born=%llu) Us: %s (born=%llu)",
vote_from, (unsigned long long)your_node->born,
fsa_our_uname, (unsigned long long)our_node->born);
/* cant happen...
* } else if(strcasecmp(fsa_our_uname, vote_from) == 0) {
*
* default...
* } else { // strcasecmp(fsa_our_uname, vote_from) < 0
* we win
*/
}
if(we_loose) {
gboolean vote_sent = FALSE;
xmlNode *novote = create_request(
CRM_OP_NOVOTE, NULL, vote_from,
CRM_SYSTEM_CRMD, CRM_SYSTEM_CRMD, NULL);
update_dc(NULL, FALSE);
crm_timer_stop(election_timeout);
crm_debug("Election lost to %s (%d)", vote_from, election_id);
if(fsa_input_register & R_THE_DC) {
crm_debug_3("Give up the DC to %s", vote_from);
election_result = I_RELEASE_DC;
} else {
crm_debug_3("We werent the DC anyway");
election_result = I_PENDING;
}
crm_xml_add(novote, F_CRM_ELECTION_OWNER, election_owner);
crm_xml_add_int(novote, F_CRM_ELECTION_ID, election_id);
vote_sent = send_request(novote, NULL);
CRM_DEV_ASSERT(vote_sent);
fsa_cib_conn->cmds->set_slave(fsa_cib_conn, cib_scope_local);
last_election_loss = time(NULL);
} else {
int dampen = 2;
time_t tm_now = time(NULL);
if(tm_now - last_election_loss < (time_t)dampen) {
crm_debug("Election ignore: We already lost an election less than %ds ago", dampen);
return;
}
last_election_loss = 0;
election_result = I_ELECTION;
crm_info("Election won over %s", vote_from);
g_hash_table_destroy(voted);
voted = NULL;
}
register_fsa_input(C_FSA_INTERNAL, election_result, NULL);
}
/* A_ELECT_TIMER_START, A_ELECTION_TIMEOUT */
/* we won */
void
do_election_timer_ctrl(long long action,
enum crmd_fsa_cause cause,
enum crmd_fsa_state cur_state,
enum crmd_fsa_input current_input,
fsa_data_t *msg_data)
{
}
static void
feature_update_callback(xmlNode *msg, int call_id, int rc,
xmlNode *output, void *user_data)
{
if(rc != cib_ok) {
fsa_data_t *msg_data = NULL;
register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL);
}
}
/* A_DC_TAKEOVER */
void
do_dc_takeover(long long action,
enum crmd_fsa_cause cause,
enum crmd_fsa_state cur_state,
enum crmd_fsa_input current_input,
fsa_data_t *msg_data)
{
int rc = cib_ok;
xmlNode *cib = NULL;
crm_info("Taking over DC status for this partition");
set_bit_inplace(fsa_input_register, R_THE_DC);
if(voted != NULL) {
crm_debug_2("Destroying voted hash");
g_hash_table_destroy(voted);
voted = NULL;
}
set_bit_inplace(fsa_input_register, R_JOIN_OK);
set_bit_inplace(fsa_input_register, R_INVOKE_PE);
fsa_cib_conn->cmds->set_slave_all(fsa_cib_conn, cib_none);
fsa_cib_conn->cmds->set_master(fsa_cib_conn, cib_none);
cib = createEmptyCib();
crm_xml_add(cib, XML_ATTR_CRM_VERSION, CRM_FEATURE_SET);
crm_xml_add(cib, XML_ATTR_CIB_REVISION, CIB_FEATURE_SET);
fsa_cib_update(XML_TAG_CIB, cib, cib_quorum_override, rc);
add_cib_op_callback(rc, FALSE, NULL, feature_update_callback);
update_attr(fsa_cib_conn, cib_none, XML_CIB_TAG_CRMCONFIG,
NULL, NULL, NULL, "dc-version", VERSION"-"HA_HG_VERSION, FALSE);
free_xml(cib);
}
/* A_DC_RELEASE */
void
do_dc_release(long long action,
enum crmd_fsa_cause cause,
enum crmd_fsa_state cur_state,
enum crmd_fsa_input current_input,
fsa_data_t *msg_data)
{
if(action & A_DC_RELEASE) {
crm_debug("Releasing the role of DC");
clear_bit_inplace(fsa_input_register, R_THE_DC);
} else if (action & A_DC_RELEASED) {
crm_info("DC role released");
#if 0
if( are there errors ) {
/* we cant stay up if not healthy */
/* or perhaps I_ERROR and go to S_RECOVER? */
result = I_SHUTDOWN;
}
#endif
register_fsa_input(C_FSA_INTERNAL, I_RELEASE_SUCCESS, NULL);
} else {
crm_err("Unknown action %s", fsa_action2string(action));
}
crm_debug_2("Am I still the DC? %s", AM_I_DC?XML_BOOLEAN_YES:XML_BOOLEAN_NO);
}
diff --git a/crmd/messages.c b/crmd/messages.c
index 258f8b335f..76e1005909 100644
--- a/crmd/messages.c
+++ b/crmd/messages.c
@@ -1,1203 +1,1216 @@
/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <crm_internal.h>
#include <sys/param.h>
#include <crm/crm.h>
#include <string.h>
#include <time.h>
#include <crmd_fsa.h>
#include <lrm/lrm_api.h>
#include <crm/msg_xml.h>
#include <crm/common/xml.h>
#include <crm/common/msg.h>
#include <crm/common/cluster.h>
#include <crm/cib.h>
#include <crmd.h>
#include <crmd_messages.h>
#include <crmd_lrm.h>
GListPtr fsa_message_queue = NULL;
extern void crm_shutdown(int nsig);
-enum crmd_fsa_input handle_request(ha_msg_input_t *stored_msg);
-enum crmd_fsa_input handle_response(ha_msg_input_t *stored_msg);
+enum crmd_fsa_input handle_request(xmlNode *stored_msg);
+enum crmd_fsa_input handle_response(xmlNode *stored_msg);
enum crmd_fsa_input handle_shutdown_request(xmlNode *stored_msg);
ha_msg_input_t *copy_ha_msg_input(ha_msg_input_t *orig);
gboolean ipc_queue_helper(gpointer key, gpointer value, gpointer user_data);
#ifdef MSG_LOG
# define ROUTER_RESULT(x) crm_debug_3("Router result: %s", x); \
crm_log_xml(LOG_MSG, "router.log", relay_message);
#else
# define ROUTER_RESULT(x) crm_debug_3("Router result: %s", x)
#endif
/* debug only, can wrap all it likes */
int last_data_id = 0;
void
register_fsa_error_adv(
enum crmd_fsa_cause cause, enum crmd_fsa_input input,
fsa_data_t *cur_data, void *new_data, const char *raised_from)
{
/* save the current actions if any */
if(fsa_actions != A_NOTHING) {
register_fsa_input_adv(
cur_data?cur_data->fsa_cause:C_FSA_INTERNAL,
I_NULL, cur_data?cur_data->data:NULL,
fsa_actions, TRUE, __FUNCTION__);
}
/* reset the action list */
fsa_actions = A_NOTHING;
/* register the error */
register_fsa_input_adv(
cause, input, new_data, A_NOTHING, TRUE, raised_from);
}
static gboolean last_was_vote = FALSE;
int
register_fsa_input_adv(
enum crmd_fsa_cause cause, enum crmd_fsa_input input,
void *data, long long with_actions,
gboolean prepend, const char *raised_from)
{
unsigned old_len = g_list_length(fsa_message_queue);
fsa_data_t *fsa_data = NULL;
last_data_id++;
CRM_CHECK(raised_from != NULL, raised_from = "<unknown>");
crm_debug("%s %s FSA input %d (%s) (cause=%s) %s data",
raised_from, prepend?"prepended":"appended",last_data_id, fsa_input2string(input),
fsa_cause2string(cause), data?"with":"without");
if(input == I_WAIT_FOR_EVENT) {
do_fsa_stall = TRUE;
crm_debug("Stalling the FSA pending further input: cause=%s",
fsa_cause2string(cause));
if(old_len > 0) {
crm_warn("%s stalled the FSA with pending inputs",
raised_from);
fsa_dump_queue(LOG_DEBUG);
}
if(data == NULL) {
set_bit_inplace(fsa_actions, with_actions);
with_actions = A_NOTHING;
return 0;
}
crm_err("%s stalled the FSA with data - this may be broken",
raised_from);
}
if(old_len == 0) {
last_was_vote = FALSE;
}
if(input == I_NULL && with_actions == A_NOTHING /* && data == NULL */){
/* no point doing anything */
crm_err("Cannot add entry to queue: no input and no action");
return 0;
} else if(data == NULL) {
last_was_vote = FALSE;
#if 0
} else if(last_was_vote && cause == C_HA_MESSAGE && input == I_ROUTER) {
const char *op = crm_element_value(
((ha_msg_input_t*)data)->msg, F_CRM_TASK);
if(safe_str_eq(op, CRM_OP_VOTE)) {
/* It is always safe to treat N successive votes as
* a single one
*
* If all the discarded votes are more "loosing" than
* the first then the result is accurate
* (win or loose).
*
* If any of the discarded votes are less "loosing"
* than the first then we will cast our vote and the
* eventual winner will vote us down again (which
* even in the case that N=2, is no worse than if we
* had not disarded the vote).
*/
crm_debug_2("Vote compression: %d", old_len);
return 0;
}
#endif
} else if (cause == C_HA_MESSAGE && input == I_ROUTER) {
const char *op = crm_element_value(
((ha_msg_input_t*)data)->msg, F_CRM_TASK);
if(safe_str_eq(op, CRM_OP_VOTE)) {
last_was_vote = TRUE;
crm_debug_3("Added vote: %d", old_len);
}
} else {
last_was_vote = FALSE;
}
crm_malloc0(fsa_data, sizeof(fsa_data_t));
fsa_data->id = last_data_id;
fsa_data->fsa_input = input;
fsa_data->fsa_cause = cause;
fsa_data->origin = raised_from;
fsa_data->data = NULL;
fsa_data->data_type = fsa_dt_none;
fsa_data->actions = with_actions;
if(with_actions != A_NOTHING) {
crm_debug_3("Adding actions %.16llx to input", with_actions);
}
if(data != NULL) {
switch(cause) {
case C_FSA_INTERNAL:
case C_CRMD_STATUS_CALLBACK:
case C_IPC_MESSAGE:
case C_HA_MESSAGE:
crm_debug_3("Copying %s data from %s as a HA msg",
fsa_cause2string(cause),
raised_from);
+ CRM_CHECK(((ha_msg_input_t*)data)->msg != NULL,
+ crm_err("Bogus data from %s", raised_from));
fsa_data->data = copy_ha_msg_input(data);
fsa_data->data_type = fsa_dt_ha_msg;
break;
case C_LRM_OP_CALLBACK:
crm_debug_3("Copying %s data from %s as lrm_op_t",
fsa_cause2string(cause),
raised_from);
fsa_data->data = copy_lrm_op((lrm_op_t*)data);
fsa_data->data_type = fsa_dt_lrm;
break;
case C_CCM_CALLBACK:
case C_SUBSYSTEM_CONNECT:
case C_LRM_MONITOR_CALLBACK:
case C_TIMER_POPPED:
case C_SHUTDOWN:
case C_HEARTBEAT_FAILED:
case C_HA_DISCONNECT:
case C_ILLEGAL:
case C_UNKNOWN:
case C_STARTUP:
crm_err("Copying %s data (from %s)"
" not yet implemented",
fsa_cause2string(cause), raised_from);
exit(1);
break;
}
crm_debug_4("%s data copied",
fsa_cause2string(fsa_data->fsa_cause));
}
/* make sure to free it properly later */
if(prepend) {
crm_debug_2("Prepending input");
fsa_message_queue = g_list_prepend(fsa_message_queue, fsa_data);
} else {
fsa_message_queue = g_list_append(fsa_message_queue, fsa_data);
}
crm_debug_2("Queue len: %d", g_list_length(fsa_message_queue));
fsa_dump_queue(LOG_DEBUG_2);
if(old_len == g_list_length(fsa_message_queue)){
crm_err("Couldnt add message to the queue");
}
if(fsa_source) {
crm_debug_3("Triggering FSA: %s", __FUNCTION__);
G_main_set_trigger(fsa_source);
}
return last_data_id;
}
void
fsa_dump_queue(int log_level)
{
if(log_level < (int)crm_log_level) {
return;
}
slist_iter(
data, fsa_data_t, fsa_message_queue, lpc,
do_crm_log(log_level,
"queue[%d(%d)]: input %s raised by %s()\t(cause=%s)",
lpc, data->id, fsa_input2string(data->fsa_input),
data->origin, fsa_cause2string(data->fsa_cause));
);
}
ha_msg_input_t *
copy_ha_msg_input(ha_msg_input_t *orig)
{
+ ha_msg_input_t *copy = NULL;
xmlNodePtr data = NULL;
if(orig != NULL) {
crm_debug_4("Copy msg");
data = copy_xml(orig->msg);
} else {
crm_debug_3("No message to copy");
}
- return new_ha_msg_input(data);
+ copy = new_ha_msg_input(data);
+ if(orig->msg != NULL) {
+ CRM_CHECK(copy->msg != NULL, crm_err("copy failed"));
+ }
+ return copy;
}
void
delete_fsa_input(fsa_data_t *fsa_data)
{
lrm_op_t *op = NULL;
xmlNode *foo = NULL;
if(fsa_data == NULL) {
return;
}
crm_debug_4("About to free %s data",
fsa_cause2string(fsa_data->fsa_cause));
if(fsa_data->data != NULL) {
switch(fsa_data->data_type) {
case fsa_dt_ha_msg:
delete_ha_msg_input(fsa_data->data);
break;
case fsa_dt_xml:
foo = fsa_data->data;
free_xml(foo);
break;
case fsa_dt_lrm:
op = (lrm_op_t*)fsa_data->data;
-
free_lrm_op(op);
-
break;
case fsa_dt_none:
if(fsa_data->data != NULL) {
crm_err("Dont know how to free %s data from %s",
fsa_cause2string(fsa_data->fsa_cause),
fsa_data->origin);
exit(1);
}
break;
}
crm_debug_4("%s data freed",
fsa_cause2string(fsa_data->fsa_cause));
}
crm_free(fsa_data);
}
/* returns the next message */
fsa_data_t *
get_message(void)
{
fsa_data_t* message = g_list_nth_data(fsa_message_queue, 0);
fsa_message_queue = g_list_remove(fsa_message_queue, message);
crm_debug_2("Processing input %d", message->id);
return message;
}
/* returns the current head of the FIFO queue */
gboolean
is_message(void)
{
return (g_list_length(fsa_message_queue) > 0);
}
void *
fsa_typed_data_adv(
fsa_data_t *fsa_data, enum fsa_data_type a_type, const char *caller)
{
void *ret_val = NULL;
if(fsa_data == NULL) {
do_crm_log(LOG_ERR, "%s: No FSA data available", caller);
} else if(fsa_data->data == NULL) {
do_crm_log(LOG_ERR, "%s: No message data available", caller);
} else if(fsa_data->data_type != a_type) {
do_crm_log(LOG_CRIT,
"%s: Message data was the wrong type! %d vs. requested=%d."
" Origin: %s", caller,
fsa_data->data_type, a_type, fsa_data->origin);
CRM_ASSERT(fsa_data->data_type == a_type);
} else {
ret_val = fsa_data->data;
}
return ret_val;
}
/* A_MSG_ROUTE */
void
do_msg_route(long long action,
enum crmd_fsa_cause cause,
enum crmd_fsa_state cur_state,
enum crmd_fsa_input current_input,
fsa_data_t *msg_data)
{
ha_msg_input_t *input = fsa_typed_data(fsa_dt_ha_msg);
- route_message(msg_data->fsa_cause, input);
+ route_message(msg_data->fsa_cause, input->msg);
}
void
-route_message(enum crmd_fsa_cause cause, ha_msg_input_t *input)
+route_message(enum crmd_fsa_cause cause, xmlNode *input)
{
+ ha_msg_input_t fsa_input;
enum crmd_fsa_input result = I_NULL;
+ fsa_input.msg = input;
CRM_CHECK(cause == C_IPC_MESSAGE || cause == C_HA_MESSAGE, return);
/* try passing the buck first */
crm_debug_4("Attempting to route message");
- if(relay_message(input->msg, cause==C_IPC_MESSAGE)) {
+ if(relay_message(input, cause==C_IPC_MESSAGE)) {
crm_debug_4("Message routed...");
- input->msg = NULL;
return;
}
crm_debug_4("Message wasn't routed... try handling locally");
/* calculate defer */
result = handle_message(input);
switch(result) {
case I_NULL:
crm_debug_4("Message processed");
break;
case I_CIB_OP:
break;
case I_ROUTER:
break;
case I_NODE_JOIN:
case I_JOIN_REQUEST:
case I_JOIN_RESULT:
break;
default:
crm_debug_4("Defering local processing of message");
- register_fsa_input_later(cause, result, input);
+ register_fsa_input_later(cause, result, &fsa_input);
result = I_NULL;
break;
}
if(result != I_NULL) {
/* add to the front of the queue */
- register_fsa_input(cause, result, input);
+ register_fsa_input(cause, result, &fsa_input);
}
+
+ free_xml(input);
}
/*
* This method frees msg
*/
gboolean
send_request(xmlNode *msg, char **msg_reference)
{
gboolean was_sent = FALSE;
/* crm_log_xml_debug_3(request, "Final request..."); */
if(msg_reference != NULL) {
*msg_reference = crm_strdup(
crm_element_value(msg, XML_ATTR_REFERENCE));
}
was_sent = relay_message(msg, TRUE);
if(was_sent == FALSE) {
ha_msg_input_t *fsa_input = new_ha_msg_input(msg);
register_fsa_input(C_IPC_MESSAGE, I_ROUTER, fsa_input);
crm_free(fsa_input);
free_xml(msg);
}
return was_sent;
}
/* unless more processing is required, relay_message is freed */
gboolean
relay_message(xmlNode *relay_message, gboolean originated_locally)
{
int is_for_dc = 0;
int is_for_dcib = 0;
int is_for_te = 0;
int is_for_crm = 0;
int is_for_cib = 0;
int is_local = 0;
gboolean processing_complete = FALSE;
const char *host_to = crm_element_value(relay_message, F_CRM_HOST_TO);
const char *sys_to = crm_element_value(relay_message, F_CRM_SYS_TO);
const char *sys_from= crm_element_value(relay_message, F_CRM_SYS_FROM);
const char *type = crm_element_value(relay_message, F_TYPE);
const char *msg_error = NULL;
crm_debug_3("Routing message %s",
crm_element_value(relay_message, XML_ATTR_REFERENCE));
if(relay_message == NULL) {
msg_error = "Cannot route empty message";
} else if(safe_str_eq(CRM_OP_HELLO,
crm_element_value(relay_message, F_CRM_TASK))){
/* quietly ignore */
processing_complete = TRUE;
} else if(safe_str_neq(type, T_CRM)) {
msg_error = "Bad message type";
} else if(sys_to == NULL) {
msg_error = "Bad message destination: no subsystem";
}
if(msg_error != NULL) {
processing_complete = TRUE;
crm_err("%s", msg_error);
crm_log_xml(LOG_WARNING, "bad msg", relay_message);
}
if(processing_complete) {
free_xml(relay_message);
return TRUE;
}
processing_complete = TRUE;
is_for_dc = (strcasecmp(CRM_SYSTEM_DC, sys_to) == 0);
is_for_dcib = (strcasecmp(CRM_SYSTEM_DCIB, sys_to) == 0);
is_for_te = (strcasecmp(CRM_SYSTEM_TENGINE, sys_to) == 0);
is_for_cib = (strcasecmp(CRM_SYSTEM_CIB, sys_to) == 0);
is_for_crm = (strcasecmp(CRM_SYSTEM_CRMD, sys_to) == 0);
is_local = 0;
if(host_to == NULL || strlen(host_to) == 0) {
if(is_for_dc || is_for_te) {
is_local = 0;
} else if(is_for_crm && originated_locally) {
is_local = 0;
} else {
is_local = 1;
}
} else if(safe_str_eq(fsa_our_uname, host_to)) {
is_local=1;
}
if(is_for_dc || is_for_dcib || is_for_te) {
if(AM_I_DC && is_for_te) {
ROUTER_RESULT("Message result: Local relay");
send_msg_via_ipc(relay_message, sys_to);
} else if(AM_I_DC) {
ROUTER_RESULT("Message result: DC/CRMd process");
processing_complete = FALSE; /* more to be done by caller */
} else if(originated_locally
&& safe_str_neq(sys_from, CRM_SYSTEM_PENGINE)
&& safe_str_neq(sys_from, CRM_SYSTEM_TENGINE)) {
/* Neither the TE or PE should be sending messages
* to DC's on other nodes
*
* By definition, if we are no longer the DC, then
* the PE or TE's data should be discarded
*/
ROUTER_RESULT("Message result: External relay to DC");
send_msg_via_ha(relay_message);
} else {
/* discard */
ROUTER_RESULT("Message result: Discard, not DC");
free_xml(relay_message);
}
} else if(is_local && (is_for_crm || is_for_cib)) {
ROUTER_RESULT("Message result: CRMd process");
processing_complete = FALSE; /* more to be done by caller */
} else if(is_local) {
ROUTER_RESULT("Message result: Local relay");
send_msg_via_ipc(relay_message, sys_to);
} else {
ROUTER_RESULT("Message result: External relay");
send_msg_via_ha(relay_message);
}
return processing_complete;
}
gboolean
-crmd_authorize_message(ha_msg_input_t *client_msg, crmd_client_t *curr_client)
+crmd_authorize_message(xmlNode *client_msg, crmd_client_t *curr_client)
{
/* check the best case first */
- const char *sys_from = crm_element_value(client_msg->msg, F_CRM_SYS_FROM);
+ const char *sys_from = crm_element_value(client_msg, F_CRM_SYS_FROM);
char *uuid = NULL;
char *client_name = NULL;
char *major_version = NULL;
char *minor_version = NULL;
const char *filtered_from;
gpointer table_key = NULL;
gboolean auth_result = FALSE;
struct crm_subsystem_s *the_subsystem = NULL;
gboolean can_reply = FALSE; /* no-one has registered with this id */
- const char *op = crm_element_value(client_msg->msg, F_CRM_TASK);
+ xmlNode *xml = NULL;
+ const char *op = crm_element_value(client_msg, F_CRM_TASK);
if (safe_str_neq(CRM_OP_HELLO, op)) {
if(sys_from == NULL) {
crm_warn("Message [%s] was had no value for %s... discarding",
- crm_element_value(client_msg->msg, XML_ATTR_REFERENCE),
+ crm_element_value(client_msg, XML_ATTR_REFERENCE),
F_CRM_SYS_FROM);
return FALSE;
}
filtered_from = sys_from;
/* The CIB can have two names on the DC */
if(strcasecmp(sys_from, CRM_SYSTEM_DCIB) == 0)
filtered_from = CRM_SYSTEM_CIB;
if (g_hash_table_lookup (ipc_clients, filtered_from) != NULL) {
can_reply = TRUE; /* reply can be routed */
}
crm_debug_2("Message reply can%s be routed from %s.",
can_reply?"":" not", sys_from);
if(can_reply == FALSE) {
crm_warn("Message [%s] not authorized",
- crm_element_value(client_msg->msg, XML_ATTR_REFERENCE));
+ crm_element_value(client_msg, XML_ATTR_REFERENCE));
}
return can_reply;
}
crm_debug_3("received client join msg");
- crm_log_xml(LOG_MSG, "join", client_msg->msg);
+ crm_log_xml(LOG_MSG, "join", client_msg);
+ xml = get_message_xml(client_msg, F_CRM_DATA);
auth_result = process_hello_message(
- client_msg->xml, &uuid, &client_name,
+ xml, &uuid, &client_name,
&major_version, &minor_version);
if (auth_result == TRUE) {
if(client_name == NULL || uuid == NULL) {
crm_err("Bad client details (client_name=%s, uuid=%s)",
crm_str(client_name), crm_str(uuid));
auth_result = FALSE;
}
}
if (auth_result == TRUE) {
/* check version */
int mav = atoi(major_version);
int miv = atoi(minor_version);
crm_debug_3("Checking client version number");
if (mav < 0 || miv < 0) {
crm_err("Client version (%d:%d) is not acceptable",
mav, miv);
auth_result = FALSE;
}
crm_free(major_version);
crm_free(minor_version);
}
-
if (safe_str_eq(CRM_SYSTEM_PENGINE, client_name)) {
the_subsystem = pe_subsystem;
} else if (safe_str_eq(CRM_SYSTEM_TENGINE, client_name)) {
the_subsystem = te_subsystem;
}
if (auth_result == TRUE && the_subsystem != NULL) {
/* if we already have one of those clients
* only applies to te, pe etc. not admin clients
*/
crm_debug_3("Checking if %s is required/already connected",
client_name);
table_key = (gpointer)crm_strdup(client_name);
if(is_set(fsa_input_register, the_subsystem->flag_connected)) {
auth_result = FALSE;
crm_free(table_key);
table_key = NULL;
crm_warn("Bit\t%.16llx set in %.16llx",
the_subsystem->flag_connected,
fsa_input_register);
crm_err("Client %s is already connected",
client_name);
} else if(FALSE == is_set(fsa_input_register,
the_subsystem->flag_required)) {
crm_warn("Bit\t%.16llx not set in %.16llx",
the_subsystem->flag_connected,
fsa_input_register);
crm_warn("Client %s joined but we dont need it",
client_name);
stop_subsystem(the_subsystem, TRUE);
} else {
the_subsystem->ipc = curr_client->client_channel;
set_bit_inplace(fsa_input_register,
the_subsystem->flag_connected);
}
} else {
table_key = (gpointer)generate_hash_key(client_name, uuid);
}
if (auth_result == TRUE) {
crm_debug_2("Accepted client %s", crm_str(table_key));
curr_client->table_key = table_key;
curr_client->sub_sys = crm_strdup(client_name);
curr_client->uuid = crm_strdup(uuid);
g_hash_table_insert (ipc_clients,
table_key, curr_client->client_channel);
send_hello_message(curr_client->client_channel,
"n/a", CRM_SYSTEM_CRMD,
"0", "1");
crm_debug_3("Updated client list with %s", crm_str(table_key));
crm_debug_3("Triggering FSA: %s", __FUNCTION__);
G_main_set_trigger(fsa_source);
if(the_subsystem != NULL) {
CRM_CHECK(the_subsystem->client == NULL,
process_client_disconnect(the_subsystem->client));
the_subsystem->client = curr_client;
}
} else {
crm_free(table_key);
crm_warn("Rejected client logon request");
curr_client->client_channel->ch_status = IPC_DISC_PENDING;
}
if(uuid != NULL) crm_free(uuid);
if(minor_version != NULL) crm_free(minor_version);
if(major_version != NULL) crm_free(major_version);
if(client_name != NULL) crm_free(client_name);
/* hello messages should never be processed further */
return FALSE;
}
enum crmd_fsa_input
-handle_message(ha_msg_input_t *stored_msg)
+handle_message(xmlNode *stored_msg)
{
enum crmd_fsa_input next_input = I_NULL;
const char *type = NULL;
- if(stored_msg == NULL || stored_msg->msg == NULL) {
+ if(stored_msg == NULL) {
crm_err("No message to handle");
return I_NULL;
}
- type = crm_element_value(stored_msg->msg, F_CRM_MSG_TYPE);
+ type = crm_element_value(stored_msg, F_CRM_MSG_TYPE);
if(safe_str_eq(type, XML_ATTR_REQUEST)) {
next_input = handle_request(stored_msg);
} else if(safe_str_eq(type, XML_ATTR_RESPONSE)) {
next_input = handle_response(stored_msg);
} else {
crm_err("Unknown message type: %s", type);
}
/* crm_debug_2("%s: Next input is %s", __FUNCTION__, */
/* fsa_input2string(next_input)); */
return next_input;
}
#define schedule_pe() do { \
next_input = I_PE_CALC; \
if(fsa_pe_ref) { \
crm_debug("Cancelling %s...", fsa_pe_ref); \
crm_free(fsa_pe_ref); \
fsa_pe_ref = NULL; \
} \
} while(0)
enum crmd_fsa_input
-handle_request(ha_msg_input_t *stored_msg)
+handle_request(xmlNode *stored_msg)
{
xmlNode *msg = NULL;
enum crmd_fsa_input next_input = I_NULL;
- const char *op = crm_element_value(stored_msg->msg, F_CRM_TASK);
- const char *sys_to = crm_element_value(stored_msg->msg, F_CRM_SYS_TO);
- const char *host_from = crm_element_value(stored_msg->msg, F_CRM_HOST_FROM);
+ const char *op = crm_element_value(stored_msg, F_CRM_TASK);
+ const char *sys_to = crm_element_value(stored_msg, F_CRM_SYS_TO);
+ const char *host_from = crm_element_value(stored_msg, F_CRM_HOST_FROM);
crm_debug_2("Received %s "XML_ATTR_REQUEST" from %s in state %s",
op, host_from, fsa_state2string(fsa_state));
if(op == NULL) {
- crm_log_xml(LOG_ERR, "Bad message", stored_msg->msg);
+ crm_log_xml(LOG_ERR, "Bad message", stored_msg);
/*========== common actions ==========*/
} else if(strcasecmp(op, CRM_OP_NOOP) == 0) {
crm_debug_2("no-op from %s", crm_str(host_from));
} else if(strcasecmp(op, CRM_OP_NOVOTE) == 0) {
- register_fsa_input_adv(C_HA_MESSAGE, I_NULL, stored_msg,
+ ha_msg_input_t fsa_input;
+ fsa_input.msg = stored_msg;
+ register_fsa_input_adv(C_HA_MESSAGE, I_NULL, &fsa_input,
A_ELECTION_COUNT|A_ELECTION_CHECK, FALSE, __FUNCTION__);
} else if(strcasecmp(op, CRM_OP_VOTE) == 0) {
/* count the vote and decide what to do after that */
- register_fsa_input_adv(C_HA_MESSAGE, I_NULL, stored_msg,
- A_ELECTION_COUNT|A_ELECTION_CHECK, FALSE, __FUNCTION__);
+ ha_msg_input_t fsa_input;
+ fsa_input.msg = stored_msg;
+ register_fsa_input_adv(C_HA_MESSAGE, I_NULL, &fsa_input,
+ A_ELECTION_COUNT|A_ELECTION_CHECK, FALSE, __FUNCTION__);
/* Sometimes we _must_ go into S_ELECTION */
if(fsa_state == S_HALT) {
crm_debug("Forcing an election from S_HALT");
next_input = I_ELECTION;
#if 0
} else if(AM_I_DC) {
/* This is the old way of doing things but what is gained? */
next_input = I_ELECTION;
#endif
}
} else if(strcasecmp(op, CRM_OP_LOCAL_SHUTDOWN) == 0) {
crm_shutdown(SIGTERM);
/*next_input = I_SHUTDOWN; */
next_input = I_NULL;
} else if(strcasecmp(op, CRM_OP_PING) == 0) {
/* eventually do some stuff to figure out
* if we /are/ ok
*/
xmlNode *ping = createPingAnswerFragment(sys_to, "ok");
crm_xml_add(ping, "crmd_state", fsa_state2string(fsa_state));
crm_info("Current ping state: %s", fsa_state2string(fsa_state));
- msg = create_reply(stored_msg->msg, ping);
+ msg = create_reply(stored_msg, ping);
free_xml(ping);
if(relay_message(msg, TRUE) == FALSE) {
free_xml(msg);
}
/* probably better to do this via signals on the
* local node
*/
} else if(strcasecmp(op, CRM_OP_DEBUG_UP) == 0) {
alter_debug(DEBUG_INC);
crm_info("Debug set to %d", get_crm_log_level());
} else if(strcasecmp(op, CRM_OP_DEBUG_DOWN) == 0) {
alter_debug(DEBUG_DEC);
crm_info("Debug set to %d", get_crm_log_level());
} else if(strcasecmp(op, CRM_OP_JOIN_OFFER) == 0) {
next_input = I_JOIN_OFFER;
crm_debug("Raising I_JOIN_OFFER: join-%s",
- crm_element_value(stored_msg->msg, F_CRM_JOIN_ID));
+ crm_element_value(stored_msg, F_CRM_JOIN_ID));
} else if(strcasecmp(op, CRM_OP_JOIN_ACKNAK) == 0) {
next_input = I_JOIN_RESULT;
crm_debug("Raising I_JOIN_RESULT: join-%s",
- crm_element_value(stored_msg->msg, F_CRM_JOIN_ID));
+ crm_element_value(stored_msg, F_CRM_JOIN_ID));
} else if(strcasecmp(op, CRM_OP_LRM_DELETE) == 0
|| strcasecmp(op, CRM_OP_LRM_FAIL) == 0
|| strcasecmp(op, CRM_OP_LRM_REFRESH) == 0
|| strcasecmp(op, CRM_OP_REPROBE) == 0) {
- crm_xml_add(stored_msg->msg, F_CRM_SYS_TO, CRM_SYSTEM_LRMD);
+ crm_xml_add(stored_msg, F_CRM_SYS_TO, CRM_SYSTEM_LRMD);
next_input = I_ROUTER;
/* this functionality should only be enabled
* if this is a development build
*/
} else if(CRM_DEV_BUILD && strcasecmp(op, CRM_OP_DIE) == 0/*constant condition*/) {
crm_warn("Test-only code: Killing the CRM without mercy");
crm_warn("Inhibiting respawns");
exit(100);
/*========== (NOT_DC)-Only Actions ==========*/
} else if(AM_I_DC == FALSE){
gboolean dc_match = safe_str_eq(host_from, fsa_our_dc);
if(dc_match || fsa_our_dc == NULL) {
if(strcasecmp(op, CRM_OP_HBEAT) == 0) {
crm_debug_3("Received DC heartbeat from %s",
host_from);
next_input = I_DC_HEARTBEAT;
} else if(fsa_our_dc == NULL) {
crm_warn("CRMd discarding request: %s"
" (DC: %s, from: %s)",
op, crm_str(fsa_our_dc), host_from);
- crm_log_xml(LOG_WARNING, "Ignored Request", stored_msg->msg);
+ crm_log_xml(LOG_WARNING, "Ignored Request", stored_msg);
} else if(strcasecmp(op, CRM_OP_SHUTDOWN) == 0) {
next_input = I_STOP;
} else {
crm_err("CRMd didnt expect request: %s", op);
- crm_log_xml(LOG_ERR, "bad request", stored_msg->msg);
+ crm_log_xml(LOG_ERR, "bad request", stored_msg);
}
} else {
crm_warn("Discarding %s op from %s", op, host_from);
}
/*========== DC-Only Actions ==========*/
} else if(AM_I_DC) {
const char *message = crm_element_value(
- stored_msg->msg, "message");
+ stored_msg, "message");
/* setting "fsa_pe_ref = NULL" makes sure we ignore any
* PE reply that might be pending or in the queue while
* we ask the CIB for a more up-to-date copy
*/
if(safe_str_eq(op, CRM_OP_TEABORT)) {
crm_debug("Transition cancelled: %s/%s", op, message);
clear_bit_inplace(fsa_input_register, R_IN_TRANSITION);
if(need_transition(fsa_state)) {
schedule_pe();
} else {
crm_debug("Filtering %s op in state %s",
op, fsa_state2string(fsa_state));
}
} else if(strcasecmp(op, CRM_OP_TECOMPLETE) == 0) {
crm_debug("Transition complete: %s/%s", op, message);
clear_bit_inplace(fsa_input_register, R_IN_TRANSITION);
if(fsa_state == S_TRANSITION_ENGINE) {
next_input = I_TE_SUCCESS;
} else {
crm_debug("Filtering %s op in state %s",
op, fsa_state2string(fsa_state));
}
} else if(strcasecmp(op, CRM_OP_JOIN_ANNOUNCE) == 0) {
next_input = I_NODE_JOIN;
} else if(strcasecmp(op, CRM_OP_JOIN_REQUEST) == 0) {
next_input = I_JOIN_REQUEST;
} else if(strcasecmp(op, CRM_OP_JOIN_CONFIRM) == 0) {
next_input = I_JOIN_RESULT;
} else if(strcasecmp(op, CRM_OP_SHUTDOWN) == 0) {
gboolean dc_match = safe_str_eq(host_from, fsa_our_dc);
if(dc_match) {
crm_err("We didnt ask to be shut down yet our"
" TE is telling us too."
" Better get out now!");
next_input = I_TERMINATE;
} else if(is_set(fsa_input_register, R_SHUTDOWN)) {
crm_info("Shutting ourselves down (DC)");
next_input = I_STOP;
} else if(fsa_state != S_STOPPING) {
crm_err("Another node is asking us to shutdown"
" but we think we're ok.");
next_input = I_ELECTION;
}
} else if(strcasecmp(op, CRM_OP_SHUTDOWN_REQ) == 0) {
/* a slave wants to shut down */
/* create cib fragment and add to message */
- next_input = handle_shutdown_request(stored_msg->msg);
+ next_input = handle_shutdown_request(stored_msg);
} else {
crm_err("Unexpected request (%s) sent to the DC", op);
- crm_log_xml(LOG_ERR, "Unexpected", stored_msg->msg);
- }
+ crm_log_xml(LOG_ERR, "Unexpected", stored_msg);
+ }
}
return next_input;
}
enum crmd_fsa_input
-handle_response(ha_msg_input_t *stored_msg)
+handle_response(xmlNode *stored_msg)
{
enum crmd_fsa_input next_input = I_NULL;
- const char *op = crm_element_value(stored_msg->msg, F_CRM_TASK);
- const char *sys_from = crm_element_value(stored_msg->msg, F_CRM_SYS_FROM);
- const char *host_from = crm_element_value(stored_msg->msg, F_CRM_HOST_FROM);
- const char *msg_ref = crm_element_value(stored_msg->msg, XML_ATTR_REFERENCE);
+ const char *op = crm_element_value(stored_msg, F_CRM_TASK);
+ const char *sys_from = crm_element_value(stored_msg, F_CRM_SYS_FROM);
+ const char *host_from = crm_element_value(stored_msg, F_CRM_HOST_FROM);
+ const char *msg_ref = crm_element_value(stored_msg, XML_ATTR_REFERENCE);
crm_debug_2("Received %s "XML_ATTR_RESPONSE" from %s in state %s",
op, host_from, fsa_state2string(fsa_state));
if(op == NULL) {
- crm_log_xml(LOG_ERR, "Bad message", stored_msg->msg);
+ crm_log_xml(LOG_ERR, "Bad message", stored_msg);
} else if(AM_I_DC && strcasecmp(op, CRM_OP_PECALC) == 0) {
crm_debug_2("Processing %s reply %s (fsa=%s)",
sys_from, msg_ref, crm_str(fsa_pe_ref));
if(msg_ref != NULL && safe_str_eq(msg_ref, fsa_pe_ref)) {
next_input = I_PE_SUCCESS;
crm_debug_2("Completed: %s...", fsa_pe_ref);
crm_free(fsa_pe_ref);
fsa_pe_ref = NULL;
} else {
crm_debug_2("Skipping superceeded reply from %s",
sys_from);
}
} else if(strcasecmp(op, CRM_OP_VOTE) == 0
|| strcasecmp(op, CRM_OP_HBEAT) == 0
|| strcasecmp(op, CRM_OP_SHUTDOWN_REQ) == 0
|| strcasecmp(op, CRM_OP_SHUTDOWN) == 0) {
crm_debug_2("Ignoring %s from %s in %s",
op, host_from, fsa_state2string(fsa_state));
next_input = I_NULL;
} else {
crm_err("Unexpected response (op=%s) sent to the %s",
op, AM_I_DC?"DC":"CRMd");
next_input = I_NULL;
}
return next_input;
}
enum crmd_fsa_input
handle_shutdown_request(xmlNode *stored_msg)
{
/* handle here to avoid potential version issues
* where the shutdown message/proceedure may have
* been changed in later versions.
*
* This way the DC is always in control of the shutdown
*/
time_t now = time(NULL);
xmlNode *node_state = NULL;
const char *host_from = crm_element_value(stored_msg, F_CRM_HOST_FROM);
if(host_from == NULL) {
/* we're shutting down and the DC */
host_from = fsa_our_uname;
}
crm_info("Creating shutdown request for %s",host_from);
crm_log_xml(LOG_MSG, "message", stored_msg);
node_state = create_node_state(
host_from, NULL, NULL, NULL, NULL,
CRMD_STATE_INACTIVE, FALSE, __FUNCTION__);
crm_xml_add_int(node_state, XML_CIB_ATTR_SHUTDOWN, (int)now);
fsa_cib_anon_update(XML_CIB_TAG_STATUS,node_state, cib_quorum_override);
crm_log_xml_debug_2(node_state, "Shutdown update");
free_xml(node_state);
/* will be picked up by the TE as long as its running */
if(need_transition(fsa_state)
&& is_set(fsa_input_register, R_TE_CONNECTED) == FALSE) {
register_fsa_action(A_TE_CANCEL);
}
return I_NULL;
}
/* frees msg upon completion */
gboolean
send_msg_via_ha(xmlNode *msg)
{
int log_level = LOG_DEBUG_3;
gboolean broadcast = FALSE;
gboolean all_is_good = TRUE;
const char *op = crm_element_value(msg, F_CRM_TASK);
const char *sys_to = crm_element_value(msg, F_CRM_SYS_TO);
const char *host_to = crm_element_value(msg, F_CRM_HOST_TO);
enum crm_ais_msg_types dest = 0;
if(is_openais_cluster()) {
dest = 1;
#if SUPPORT_AIS
dest = text2msg_type(sys_to);
#endif
}
if (msg == NULL) {
crm_err("Attempt to send NULL Message via HA failed.");
all_is_good = FALSE;
} else {
crm_debug_4("Relaying message to (%s) via HA", host_to);
}
if (all_is_good) {
if (sys_to == NULL || strlen(sys_to) == 0) {
crm_err("You did not specify a destination sub-system"
" for this message.");
all_is_good = FALSE;
}
}
/* There are a number of messages may not need to be ordered.
* At a later point perhaps we should detect them and send them
* as unordered messages.
*/
if (all_is_good) {
if (host_to == NULL
|| strlen(host_to) == 0
|| safe_str_eq(sys_to, CRM_SYSTEM_DC)) {
broadcast = TRUE;
all_is_good = send_cluster_message(NULL, dest, msg, FALSE);
} else {
all_is_good = send_cluster_message(host_to, dest, msg, FALSE);
}
}
if(all_is_good == FALSE) {
log_level = LOG_WARNING;
}
if(log_level == LOG_WARNING
|| (safe_str_neq(op, CRM_OP_HBEAT))) {
do_crm_log(log_level,
"Sending %sHA message (ref=%s) to %s@%s %s.",
broadcast?"broadcast ":"directed ",
crm_element_value(msg, XML_ATTR_REFERENCE),
crm_str(sys_to), host_to==NULL?"<all>":host_to,
all_is_good?"succeeded":"failed");
}
free_xml(msg);
return all_is_good;
}
/* msg is deleted by the time this returns */
gboolean
send_msg_via_ipc(xmlNode *msg, const char *sys)
{
gboolean send_ok = TRUE;
IPC_Channel *client_channel;
crm_debug_4("relaying msg to sub_sys=%s via IPC", sys);
client_channel = (IPC_Channel*)g_hash_table_lookup(ipc_clients, sys);
if(crm_element_value(msg, F_CRM_HOST_FROM) == NULL) {
crm_xml_add(msg, F_CRM_HOST_FROM, fsa_our_uname);
}
if (client_channel != NULL) {
crm_debug_3("Sending message via channel %s.", sys);
send_ok = send_ipc_message(client_channel, msg);
} else if(sys != NULL && strcasecmp(sys, CRM_SYSTEM_CIB) == 0) {
crm_err("Sub-system (%s) has been incorporated into the CRMd.",
sys);
crm_err("Change the way we handle this CIB message");
crm_log_xml(LOG_ERR, "cib op", msg);
send_ok = FALSE;
} else if(sys != NULL && strcasecmp(sys, CRM_SYSTEM_LRMD) == 0) {
fsa_data_t *fsa_data = NULL;
ha_msg_input_t *msg_copy = new_ha_msg_input(msg);
crm_malloc0(fsa_data, sizeof(fsa_data_t));
fsa_data->fsa_input = I_MESSAGE;
fsa_data->fsa_cause = C_IPC_MESSAGE;
fsa_data->data = msg_copy;
fsa_data->origin = __FUNCTION__;
fsa_data->data_type = fsa_dt_ha_msg;
#ifdef FSA_TRACE
crm_debug_2("Invoking action %s (%.16llx)",
fsa_action2string(A_LRM_INVOKE),
A_LRM_INVOKE);
#endif
do_lrm_invoke(A_LRM_INVOKE, C_IPC_MESSAGE, fsa_state, I_MESSAGE, fsa_data);
crm_free(msg_copy);
crm_free(fsa_data);
} else {
crm_err("Unknown Sub-system (%s)... discarding message.",
crm_str(sys));
send_ok = FALSE;
}
free_xml(msg);
return send_ok;
}
void
msg_queue_helper(void)
{
#if SUPPORT_HEARTBEAT
IPC_Channel *ipc = NULL;
if(fsa_cluster_conn != NULL) {
ipc = fsa_cluster_conn->llc_ops->ipcchan(
fsa_cluster_conn);
}
if(ipc != NULL) {
ipc->ops->resume_io(ipc);
}
/* g_hash_table_foreach_remove(ipc_clients, ipc_queue_helper, NULL); */
#endif
}
gboolean
ipc_queue_helper(gpointer key, gpointer value, gpointer user_data)
{
crmd_client_t *ipc_client = value;
if(ipc_client->client_channel != NULL) {
ipc_client->client_channel->ops->is_message_pending(ipc_client->client_channel);
}
return FALSE;
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Nov 23, 4:49 PM (14 h, 2 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1018983
Default Alt Text
(70 KB)
Attached To
Mode
rP Pacemaker
Attached
Detach File
Event Timeline
Log In to Comment