diff --git a/crmd/ccm.c b/crmd/ccm.c index e8e9cad561..7459ebe437 100644 --- a/crmd/ccm.c +++ b/crmd/ccm.c @@ -1,476 +1,476 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ /* put these first so that uuid_t is defined without conflicts */ #include #if SUPPORT_HEARTBEAT #include #include #endif #include #include #include #include #include #include #include #include #include #include #include #include gboolean membership_flux_hack = FALSE; void post_cache_update(int instance); #if SUPPORT_HEARTBEAT oc_ev_t *fsa_ev_token; void oc_ev_special(const oc_ev_t *, oc_ev_class_t , int ); void crmd_ccm_msg_callback( oc_ed_t event, void *cookie, size_t size, const void *data); #endif void ghash_update_cib_node(gpointer key, gpointer value, gpointer user_data); void check_dead_member(const char *uname, GHashTable *members); void reap_dead_ccm_nodes(gpointer key, gpointer value, gpointer user_data); #define CCM_EVENT_DETAIL 0 #define CCM_EVENT_DETAIL_PARTIAL 0 int num_ccm_register_fails = 0; int max_ccm_register_fails = 30; int last_peer_update = 0; extern GHashTable *voted; struct update_data_s { const char *state; const char *caller; xmlNode *updates; gboolean overwrite_join; }; void reap_dead_ccm_nodes(gpointer key, gpointer value, gpointer user_data) { crm_node_t *node = value; if(crm_is_member_active(node) == FALSE) { check_dead_member(node->uname, NULL); } } extern gboolean check_join_state(enum crmd_fsa_state cur_state, const char *source); void check_dead_member(const char *uname, GHashTable *members) { CRM_CHECK(uname != NULL, return); if(members != NULL && g_hash_table_lookup(members, uname) != NULL) { crm_err("%s didnt really leave the membership!", uname); return; } erase_node_from_join(uname); if(voted != NULL) { g_hash_table_remove(voted, uname); } if(safe_str_eq(fsa_our_uname, uname)) { crm_err("We're not part of the cluster anymore"); } else if(AM_I_DC == FALSE && safe_str_eq(uname, fsa_our_dc)) { crm_warn("Our DC node (%s) left the cluster", uname); register_fsa_input(C_FSA_INTERNAL, I_ELECTION, NULL); } else if(fsa_state == S_INTEGRATION || fsa_state == S_FINALIZE_JOIN) { check_join_state(fsa_state, __FUNCTION__); } } /* A_CCM_CONNECT */ void do_ccm_control(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { #if SUPPORT_HEARTBEAT if(is_heartbeat_cluster()) { if(action & A_CCM_DISCONNECT){ set_bit_inplace(fsa_input_register, R_CCM_DISCONNECTED); oc_ev_unregister(fsa_ev_token); } if(action & A_CCM_CONNECT) { int ret; int fsa_ev_fd; gboolean did_fail = FALSE; crm_peer_init(); crm_debug_3("Registering with CCM"); clear_bit_inplace(fsa_input_register, R_CCM_DISCONNECTED); ret = oc_ev_register(&fsa_ev_token); if (ret != 0) { crm_warn("CCM registration failed"); did_fail = TRUE; } if(did_fail == FALSE) { crm_debug_3("Setting up CCM callbacks"); ret = oc_ev_set_callback(fsa_ev_token, OC_EV_MEMB_CLASS, crmd_ccm_msg_callback, NULL); if (ret != 0) { crm_warn("CCM callback not set"); did_fail = TRUE; } } if(did_fail == FALSE) { oc_ev_special(fsa_ev_token, OC_EV_MEMB_CLASS, 0/*don't care*/); crm_debug_3("Activating CCM token"); ret = oc_ev_activate(fsa_ev_token, &fsa_ev_fd); if (ret != 0){ crm_warn("CCM Activation failed"); did_fail = TRUE; } } if(did_fail) { num_ccm_register_fails++; oc_ev_unregister(fsa_ev_token); if(num_ccm_register_fails < max_ccm_register_fails) { crm_warn("CCM Connection failed" " %d times (%d max)", num_ccm_register_fails, max_ccm_register_fails); crm_timer_start(wait_timer); crmd_fsa_stall(NULL); return; } else { crm_err("CCM Activation failed %d (max) times", num_ccm_register_fails); register_fsa_error(C_FSA_INTERNAL, I_FAIL, NULL); return; } } crm_info("CCM connection established..." " waiting for first callback"); G_main_add_fd(G_PRIORITY_HIGH, fsa_ev_fd, FALSE, ccm_dispatch, fsa_ev_token, default_ipc_connection_destroy); } } #endif if(action & ~(A_CCM_CONNECT|A_CCM_DISCONNECT)) { crm_err("Unexpected action %s in %s", fsa_action2string(action), __FUNCTION__); } } #if SUPPORT_HEARTBEAT void ccm_event_detail(const oc_ev_membership_t *oc, oc_ed_t event) { int lpc; gboolean member = FALSE; member = FALSE; crm_debug_2("-----------------------"); crm_info("%s: trans=%d, nodes=%d, new=%d, lost=%d n_idx=%d, " "new_idx=%d, old_idx=%d", ccm_event_name(event), oc->m_instance, oc->m_n_member, oc->m_n_in, oc->m_n_out, oc->m_memb_idx, oc->m_in_idx, oc->m_out_idx); #if !CCM_EVENT_DETAIL_PARTIAL for(lpc=0; lpc < oc->m_n_member; lpc++) { crm_info("\tCURRENT: %s [nodeid=%d, born=%d]", oc->m_array[oc->m_memb_idx+lpc].node_uname, oc->m_array[oc->m_memb_idx+lpc].node_id, oc->m_array[oc->m_memb_idx+lpc].node_born_on); if(safe_str_eq(fsa_our_uname, oc->m_array[oc->m_memb_idx+lpc].node_uname)) { member = TRUE; } } if (member == FALSE) { crm_warn("MY NODE IS NOT IN CCM THE MEMBERSHIP LIST"); } #endif for(lpc=0; lpc<(int)oc->m_n_in; lpc++) { crm_info("\tNEW: %s [nodeid=%d, born=%d]", oc->m_array[oc->m_in_idx+lpc].node_uname, oc->m_array[oc->m_in_idx+lpc].node_id, oc->m_array[oc->m_in_idx+lpc].node_born_on); } for(lpc=0; lpc<(int)oc->m_n_out; lpc++) { crm_info("\tLOST: %s [nodeid=%d, born=%d]", oc->m_array[oc->m_out_idx+lpc].node_uname, oc->m_array[oc->m_out_idx+lpc].node_id, oc->m_array[oc->m_out_idx+lpc].node_born_on); } crm_debug_2("-----------------------"); } #endif gboolean ever_had_quorum = FALSE; void post_cache_update(int instance) { xmlNode *no_op = NULL; crm_peer_seq = instance; ever_had_quorum |= crm_have_quorum; crm_debug("Updated cache after membership event %d.", instance); g_hash_table_foreach(crm_peer_cache, reap_dead_ccm_nodes, NULL); set_bit_inplace(fsa_input_register, R_CCM_DATA); if(AM_I_DC) { populate_cib_nodes(FALSE); do_update_cib_nodes(FALSE, __FUNCTION__); } /* Membership changed, remind everyone we're here. * This will aid detection of duplicate DCs */ no_op = create_request( CRM_OP_NOOP, NULL, NULL, CRM_SYSTEM_CRMD, AM_I_DC?CRM_SYSTEM_DC:CRM_SYSTEM_CRMD, NULL); - send_msg_via_ha(no_op); + send_cluster_message(NULL, crm_msg_crmd, no_op, FALSE); free_xml(no_op); } /* A_CCM_UPDATE_CACHE */ /* * Take the opportunity to update the node status in the CIB as well */ #if SUPPORT_HEARTBEAT void do_ccm_update_cache( enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, oc_ed_t event, const oc_ev_membership_t *oc, xmlNode *xml) { unsigned long long instance = 0; unsigned int lpc = 0; if(is_heartbeat_cluster()) { CRM_ASSERT(oc != NULL); instance = oc->m_instance; } CRM_ASSERT(crm_peer_seq <= instance); switch(cur_state) { case S_STOPPING: case S_TERMINATE: case S_HALT: crm_debug("Ignoring %s CCM event %llu, we're in state %s", ccm_event_name(event), instance, fsa_state2string(cur_state)); return; case S_ELECTION: register_fsa_action(A_ELECTION_CHECK); break; default: break; } if(is_heartbeat_cluster()) { ccm_event_detail(oc, event); /*--*-- Recently Dead Member Nodes --*--*/ for(lpc=0; lpc < oc->m_n_out; lpc++) { crm_update_ccm_node(oc, lpc+oc->m_out_idx, CRM_NODE_LOST, instance); } /*--*-- All Member Nodes --*--*/ for(lpc=0; lpc < oc->m_n_member; lpc++) { crm_update_ccm_node(oc, lpc+oc->m_memb_idx, CRM_NODE_ACTIVE, instance); } } if(event == OC_EV_MS_EVICTED) { crm_update_peer( 0, 0, 0, -1, 0, fsa_our_uuid, fsa_our_uname, NULL, CRM_NODE_EVICTED); /* todo: drop back to S_PENDING instead */ /* get out... NOW! * * go via the error recovery process so that HA will * restart us if required */ register_fsa_error_adv(cause, I_ERROR, NULL, NULL, __FUNCTION__); } post_cache_update(instance); return; } #endif static void ccm_node_update_complete(xmlNode *msg, int call_id, int rc, xmlNode *output, void *user_data) { fsa_data_t *msg_data = NULL; last_peer_update = 0; if(rc == cib_ok) { crm_debug_2("Node update %d complete", call_id); } else { crm_err("Node update %d failed", call_id); crm_log_xml(LOG_DEBUG, "failed", msg); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); } } void ghash_update_cib_node(gpointer key, gpointer value, gpointer user_data) { xmlNode *tmp1 = NULL; const char *join = NULL; crm_node_t *node = value; struct update_data_s* data = (struct update_data_s*)user_data; data->state = XML_BOOLEAN_NO; if(safe_str_eq(node->state, CRM_NODE_ACTIVE)) { data->state = XML_BOOLEAN_YES; } crm_debug("Updating %s: %s (overwrite=%s) hash_size=%d", node->uname, data->state, data->overwrite_join?"true":"false", g_hash_table_size(confirmed_nodes)); if(data->overwrite_join) { if((node->processes & crm_proc_crmd) == FALSE) { join = CRMD_JOINSTATE_DOWN; } else { const char *peer_member = g_hash_table_lookup( confirmed_nodes, node->uname); if(peer_member != NULL) { join = CRMD_JOINSTATE_MEMBER; } else { join = CRMD_JOINSTATE_PENDING; } } } tmp1 = create_node_state( node->uname, (node->processes&crm_proc_ais)?ACTIVESTATUS:DEADSTATUS, data->state, (node->processes&crm_proc_crmd)?ONLINESTATUS:OFFLINESTATUS, join, NULL, FALSE, data->caller); add_node_copy(data->updates, tmp1); free_xml(tmp1); } void do_update_cib_nodes(gboolean overwrite, const char *caller) { int call_id = 0; int call_options = cib_scope_local|cib_quorum_override; struct update_data_s update_data; xmlNode *fragment = NULL; if(crm_peer_cache == NULL) { /* We got a replace notification before being connected to * the CCM. * So there is no need to update the local CIB with our values * - since we have none. */ return; } else if(AM_I_DC == FALSE) { return; } fragment = create_xml_node(NULL, XML_CIB_TAG_STATUS); update_data.caller = caller; update_data.updates = fragment; update_data.overwrite_join = overwrite; g_hash_table_foreach(crm_peer_cache, ghash_update_cib_node, &update_data); fsa_cib_update(XML_CIB_TAG_STATUS, fragment, call_options, call_id); add_cib_op_callback(fsa_cib_conn, call_id, FALSE, NULL, ccm_node_update_complete); last_peer_update = call_id; free_xml(fragment); } static void cib_quorum_update_complete( xmlNode *msg, int call_id, int rc, xmlNode *output, void *user_data) { fsa_data_t *msg_data = NULL; if(rc == cib_ok) { crm_debug_2("Quorum update %d complete", call_id); } else { crm_err("Quorum update %d failed", call_id); crm_log_xml(LOG_DEBUG, "failed", msg); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); } } void crm_update_quorum(gboolean quorum, gboolean force_update) { if(AM_I_DC && (force_update || fsa_has_quorum != quorum)) { int call_id = 0; xmlNode *update = NULL; int call_options = cib_scope_local|cib_quorum_override; update = create_xml_node(NULL, XML_TAG_CIB); crm_xml_add_int(update, XML_ATTR_HAVE_QUORUM, quorum); set_uuid(update, XML_ATTR_DC_UUID, fsa_our_uname); fsa_cib_update(XML_TAG_CIB, update, call_options, call_id); crm_info("Updating quorum status to %s (call=%d)", quorum?"true":"false", call_id); add_cib_op_callback(fsa_cib_conn, call_id, FALSE, NULL, cib_quorum_update_complete); free_xml(update); } fsa_has_quorum = quorum; } diff --git a/crmd/crmd_messages.h b/crmd/crmd_messages.h index 8a93809120..5856edb6ee 100644 --- a/crmd/crmd_messages.h +++ b/crmd/crmd_messages.h @@ -1,113 +1,113 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef XML_CRM_MESSAGES__H #define XML_CRM_MESSAGES__H #include #include #include +#include #include extern void *fsa_typed_data_adv( fsa_data_t *fsa_data, enum fsa_data_type a_type, const char *caller); #define fsa_typed_data(x) fsa_typed_data_adv(msg_data, x, __FUNCTION__) extern void register_fsa_error_adv( enum crmd_fsa_cause cause, enum crmd_fsa_input input, fsa_data_t *cur_data, void *new_data, const char *raised_from); #define register_fsa_error(cause, input, new_data) register_fsa_error_adv(cause, input, msg_data, new_data, __FUNCTION__) extern int register_fsa_input_adv( enum crmd_fsa_cause cause, enum crmd_fsa_input input, void *data, long long with_actions, gboolean prepend, const char *raised_from); extern void fsa_dump_queue(int log_level); extern void route_message(enum crmd_fsa_cause cause, xmlNode *input); #define crmd_fsa_stall(cur_input) if(cur_input != NULL) { \ register_fsa_input_adv( \ ((fsa_data_t*)cur_input)->fsa_cause, I_WAIT_FOR_EVENT, \ ((fsa_data_t*)cur_input)->data, action, TRUE, __FUNCTION__); \ } else { \ register_fsa_input_adv( \ C_FSA_INTERNAL, I_WAIT_FOR_EVENT, \ NULL, action, TRUE, __FUNCTION__); \ } \ #define register_fsa_input(cause, input, data) register_fsa_input_adv(cause, input, data, A_NOTHING, FALSE, __FUNCTION__) #define register_fsa_action(action) { \ fsa_actions |= action; \ if(fsa_source) { \ G_main_set_trigger(fsa_source); \ } \ crm_debug("%s added action %s to the FSA", \ __FUNCTION__, fsa_action2string(action)); \ } #define register_fsa_input_before(cause, input, data) register_fsa_input_adv(cause, input, data, A_NOTHING, TRUE, __FUNCTION__) #define register_fsa_input_later(cause, input, data) register_fsa_input_adv(cause, input, data, A_NOTHING, FALSE, __FUNCTION__) void delete_fsa_input(fsa_data_t *fsa_data); GListPtr put_message(fsa_data_t *new_message); fsa_data_t *get_message(void); gboolean is_message(void); gboolean have_wait_message(void); extern gboolean relay_message( xmlNode *relay_message, gboolean originated_locally); extern gboolean crmd_ipc_msg_callback(IPC_Channel *client, gpointer user_data); extern void process_message( xmlNode *msg, gboolean originated_locally,const char *src_node_name); extern gboolean crm_dc_process_message(xmlNode *whole_message, xmlNode *action, const char *host_from, const char *sys_from, const char *sys_to, const char *op, gboolean dc_mode); -extern gboolean send_msg_via_ha(xmlNode *msg); extern gboolean send_msg_via_ipc(xmlNode *msg, const char *sys); extern gboolean add_pending_outgoing_reply(const char *originating_node_name, const char *crm_msg_reference, const char *sys_to, const char *sys_from); extern gboolean crmd_authorize_message(xmlNode *client_msg, crmd_client_t *curr_client); extern gboolean send_request(xmlNode *msg, char **msg_reference); extern enum crmd_fsa_input handle_message(xmlNode *stored_msg); extern void lrm_op_callback(lrm_op_t* op); extern void msg_queue_helper(void); #endif diff --git a/crmd/join_client.c b/crmd/join_client.c index 22ab1dd4d6..4f7e775310 100644 --- a/crmd/join_client.c +++ b/crmd/join_client.c @@ -1,274 +1,274 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include int reannounce_count = 0; void join_query_callback(xmlNode *msg, int call_id, int rc, xmlNode *output, void *user_data); extern ha_msg_input_t *copy_ha_msg_input(ha_msg_input_t *orig); /* A_CL_JOIN_QUERY */ /* is there a DC out there? */ void do_cl_join_query(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { xmlNode *req = create_request(CRM_OP_JOIN_ANNOUNCE, NULL, NULL, CRM_SYSTEM_DC, CRM_SYSTEM_CRMD, NULL); sleep(1); /* give the CCM time to propogate to the DC */ crm_debug("Querying for a DC"); - send_msg_via_ha(req); + send_cluster_message(NULL, crm_msg_crmd, req, FALSE); free_xml(req); } /* A_CL_JOIN_ANNOUNCE */ /* this is kind of a workaround for the fact that we may not be around * or are otherwise unable to reply when the DC sends out A_WELCOME_ALL */ void do_cl_join_announce(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { /* Once we hear from the DC, we can stop the timer * * This timer was started either on startup or when a node * left the CCM list */ /* dont announce if we're in one of these states */ if(cur_state != S_PENDING) { crm_warn("Do not announce ourselves in state %s", fsa_state2string(cur_state)); return; } if(AM_I_OPERATIONAL) { /* send as a broadcast */ xmlNode *req = create_request( CRM_OP_JOIN_ANNOUNCE, NULL, NULL, CRM_SYSTEM_DC, CRM_SYSTEM_CRMD, NULL); crm_debug("Announcing availability"); update_dc(NULL, FALSE); - send_msg_via_ha(req); + send_cluster_message(NULL, crm_msg_crmd, req, FALSE); free_xml(req); } else { /* Delay announce until we have finished local startup */ crm_warn("Delaying announce until local startup is complete"); return; } } static int query_call_id = 0; /* A_CL_JOIN_REQUEST */ /* aka. accept the welcome offer */ void do_cl_join_offer_respond(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { ha_msg_input_t *input = fsa_typed_data(fsa_dt_ha_msg); const char *welcome_from = crm_element_value(input->msg, F_CRM_HOST_FROM); const char *join_id = crm_element_value(input->msg, F_CRM_JOIN_ID); #if 0 if(we are sick) { log error ; /* save the request for later? */ return; } #endif crm_debug_2("Accepting join offer: join-%s", crm_element_value(input->msg, F_CRM_JOIN_ID)); /* we only ever want the last one */ if(query_call_id > 0) { crm_debug_3("Cancelling previous join query: %d", query_call_id); remove_cib_op_callback(query_call_id, FALSE); query_call_id = 0; } update_dc(input->msg, FALSE); if(safe_str_neq(welcome_from, fsa_our_dc)) { /* dont do anything until DC's sort themselves out */ crm_err("Expected a welcome from %s, but %s replied", fsa_our_dc, welcome_from); return; } CRM_DEV_ASSERT(input != NULL); query_call_id = fsa_cib_conn->cmds->query( fsa_cib_conn, NULL, NULL, cib_scope_local); add_cib_op_callback( fsa_cib_conn, query_call_id, FALSE, crm_strdup(join_id), join_query_callback); crm_debug_2("Registered join query callback: %d", query_call_id); register_fsa_action(A_DC_TIMER_STOP); } void join_query_callback(xmlNode *msg, int call_id, int rc, xmlNode *output, void *user_data) { xmlNode *local_cib = NULL; char *join_id = user_data; xmlNode *generation = create_xml_node( NULL, XML_CIB_TAG_GENERATION_TUPPLE); CRM_DEV_ASSERT(join_id != NULL); query_call_id = 0; if(rc == cib_ok) { #if CRM_DEPRECATED_SINCE_2_0_4 if(safe_str_eq(crm_element_name(output), XML_TAG_CIB)) { local_cib = output; } else { local_cib = find_xml_node(output, XML_TAG_CIB, TRUE); } #else local_cib = output; CRM_DEV_ASSERT(safe_str_eq(crm_element_name(local_cib), XML_TAG_CIB)); #endif } if(local_cib != NULL) { xmlNode *reply = NULL; crm_debug("Respond to join offer join-%s", join_id); crm_debug("Acknowledging %s as our DC", fsa_our_dc); copy_in_properties(generation, local_cib); reply = create_request( CRM_OP_JOIN_REQUEST, generation, fsa_our_dc, CRM_SYSTEM_DC, CRM_SYSTEM_CRMD, NULL); crm_xml_add(reply, F_CRM_JOIN_ID, join_id); - send_msg_via_ha(reply); + send_cluster_message(fsa_our_dc, crm_msg_crmd, reply, TRUE); free_xml(reply); } else { crm_err("Could not retrieve Generation to attach to our" " join acknowledgement: %s", cib_error2string(rc)); register_fsa_error_adv( C_FSA_INTERNAL, I_ERROR, NULL, NULL, __FUNCTION__); } crm_free(join_id); free_xml(generation); } /* A_CL_JOIN_RESULT */ /* aka. this is notification that we have (or have not) been accepted */ void do_cl_join_finalize_respond(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { xmlNode *tmp1 = NULL; gboolean was_nack = TRUE; ha_msg_input_t *input = fsa_typed_data(fsa_dt_ha_msg); int join_id = -1; const char *op = crm_element_value(input->msg,F_CRM_TASK); const char *ack_nack = crm_element_value(input->msg,CRM_OP_JOIN_ACKNAK); const char *welcome_from = crm_element_value(input->msg,F_CRM_HOST_FROM); if(safe_str_neq(op, CRM_OP_JOIN_ACKNAK)) { crm_debug_2("Ignoring op=%s message", op); return; } /* calculate if it was an ack or a nack */ if(crm_is_true(ack_nack)) { was_nack = FALSE; } crm_element_value_int(input->msg, F_CRM_JOIN_ID, &join_id); if(was_nack) { crm_err("Join (join-%d) with leader %s failed (NACK'd): Shutting down", join_id, welcome_from); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); return; } if(AM_I_DC == FALSE && safe_str_eq(welcome_from, fsa_our_uname)) { crm_warn("Discarding our own welcome - we're no longer the DC"); return; } update_dc(input->msg, TRUE); /* send our status section to the DC */ crm_debug("Confirming join join-%d: %s", join_id, crm_element_value(input->msg, F_CRM_TASK)); tmp1 = do_lrm_query(TRUE); if(tmp1 != NULL) { xmlNode *reply = create_request( CRM_OP_JOIN_CONFIRM, tmp1, fsa_our_dc, CRM_SYSTEM_DC, CRM_SYSTEM_CRMD, NULL); crm_xml_add_int(reply, F_CRM_JOIN_ID, join_id); crm_debug("join-%d: Join complete." " Sending local LRM status to %s", join_id, fsa_our_dc); - send_msg_via_ha(reply); + send_cluster_message(fsa_our_dc, crm_msg_crmd, reply, TRUE); free_xml(reply); if(AM_I_DC == FALSE) { register_fsa_input_adv( cause, I_NOT_DC, NULL, A_NOTHING, TRUE, __FUNCTION__); update_attrd(NULL, NULL, NULL); } free_xml(tmp1); } else { crm_err("Could not send our LRM state to the DC"); register_fsa_error(C_FSA_INTERNAL, I_FAIL, NULL); } } diff --git a/crmd/join_dc.c b/crmd/join_dc.c index e5b4d5a972..21b69f0900 100644 --- a/crmd/join_dc.c +++ b/crmd/join_dc.c @@ -1,708 +1,708 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include GHashTable *welcomed_nodes = NULL; GHashTable *integrated_nodes = NULL; GHashTable *finalized_nodes = NULL; GHashTable *confirmed_nodes = NULL; char *max_epoch = NULL; char *max_generation_from = NULL; xmlNode *max_generation_xml = NULL; void initialize_join(gboolean before); gboolean finalize_join_for(gpointer key, gpointer value, gpointer user_data); void finalize_sync_callback(xmlNode *msg, int call_id, int rc, xmlNode *output, void *user_data); gboolean check_join_state(enum crmd_fsa_state cur_state, const char *source); static int current_join_id = 0; unsigned long long saved_ccm_membership_id = 0; void initialize_join(gboolean before) { /* clear out/reset a bunch of stuff */ crm_debug("join-%d: Initializing join data (flag=%s)", current_join_id, before?"true":"false"); g_hash_table_destroy(welcomed_nodes); g_hash_table_destroy(integrated_nodes); g_hash_table_destroy(finalized_nodes); g_hash_table_destroy(confirmed_nodes); if(before) { if(max_generation_from != NULL) { crm_free(max_generation_from); max_generation_from = NULL; } if(max_generation_xml != NULL) { free_xml(max_generation_xml); max_generation_xml = NULL; } clear_bit_inplace(fsa_input_register, R_HAVE_CIB); clear_bit_inplace(fsa_input_register, R_CIB_ASKED); } welcomed_nodes = g_hash_table_new_full( g_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); integrated_nodes = g_hash_table_new_full( g_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); finalized_nodes = g_hash_table_new_full( g_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); confirmed_nodes = g_hash_table_new_full( g_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); } void erase_node_from_join(const char *uname) { gboolean w = FALSE, i = FALSE, f = FALSE, c = FALSE; if(uname == NULL) { return; } if(welcomed_nodes != NULL) { w = g_hash_table_remove(welcomed_nodes, uname); } if(integrated_nodes != NULL) { i = g_hash_table_remove(integrated_nodes, uname); } if(finalized_nodes != NULL) { f = g_hash_table_remove(finalized_nodes, uname); } if(confirmed_nodes != NULL) { c = g_hash_table_remove(confirmed_nodes, uname); } if(w || i || f || c) { crm_info("Removed node %s from join calculations:" " welcomed=%d itegrated=%d finalized=%d confirmed=%d", uname, w, i, f, c); } } static void join_make_offer(gpointer key, gpointer value, gpointer user_data) { const char *join_to = NULL; const crm_node_t *member = value; CRM_ASSERT(member != NULL); if(crm_is_member_active(member) == FALSE) { return; } join_to = member->uname; if(join_to == NULL) { crm_err("No recipient for welcome message"); return; } erase_node_from_join(join_to); if(saved_ccm_membership_id != crm_peer_seq) { saved_ccm_membership_id = crm_peer_seq; crm_info("Making join offers based on membership %llu", crm_peer_seq); } if(member->processes & crm_proc_crmd) { xmlNode *offer = create_request( CRM_OP_JOIN_OFFER, NULL, join_to, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL); char *join_offered = crm_itoa(current_join_id); crm_xml_add_int(offer, F_CRM_JOIN_ID, current_join_id); /* send the welcome */ crm_debug("join-%d: Sending offer to %s", current_join_id, join_to); - send_msg_via_ha(offer); + send_cluster_message(join_to, crm_msg_crmd, offer, TRUE); free_xml(offer); g_hash_table_insert( welcomed_nodes, crm_strdup(join_to), join_offered); } else { crm_info("Peer process on %s is not active (yet?): %.8lx %d", join_to, (long)member->processes, g_hash_table_size(crm_peer_cache)); } } /* A_DC_JOIN_OFFER_ALL */ void do_dc_join_offer_all(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { /* reset everyones status back to down or in_ccm in the CIB * * any nodes that are active in the CIB but not in the CCM list * will be seen as offline by the PE anyway */ current_join_id++; initialize_join(TRUE); /* do_update_cib_nodes(TRUE, __FUNCTION__); */ update_dc(NULL, FALSE); if(cause == C_HA_MESSAGE && current_input == I_NODE_JOIN) { crm_info("A new node joined the cluster"); } g_hash_table_foreach(crm_peer_cache, join_make_offer, NULL); /* dont waste time by invoking the PE yet; */ crm_info("join-%d: Waiting on %d outstanding join acks", current_join_id, g_hash_table_size(welcomed_nodes)); } /* A_DC_JOIN_OFFER_ONE */ void do_dc_join_offer_one(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { crm_node_t *member; ha_msg_input_t *welcome = NULL; const char *op = NULL; const char *join_to = NULL; if(msg_data->data) { welcome = fsa_typed_data(fsa_dt_ha_msg); } else { crm_info("A new node joined - wait until it contacts us"); return; } if(welcome == NULL) { crm_err("Attempt to send welcome message " "without a message to reply to!"); return; } join_to = crm_element_value(welcome->msg, F_CRM_HOST_FROM); if(join_to == NULL) { crm_err("Attempt to send welcome message " "without a host to reply to!"); return; } member = crm_get_peer(0, join_to); if(member == NULL || crm_is_member_active(member) == FALSE) { crm_err("Attempt to send welcome message " "to a node not part of our partition!"); return; } op = crm_element_value(welcome->msg, F_CRM_TASK); if(join_to != NULL && (cur_state == S_INTEGRATION || cur_state == S_FINALIZE_JOIN)) { /* note: it _is_ possible that a node will have been * sick or starting up when the original offer was made. * however, it will either re-announce itself in due course * _or_ we can re-store the original offer on the client. */ crm_debug("(Re-)offering membership to %s...", join_to); } crm_info("join-%d: Processing %s request from %s in state %s", current_join_id, op, join_to, fsa_state2string(cur_state)); join_make_offer(NULL, member, NULL); /* always offer to the DC (ourselves) * this ensures the correct value for max_generation_from */ member = crm_get_peer(0, fsa_our_uname); join_make_offer(NULL, member, NULL); /* this was a genuine join request, cancel any existing * transition and invoke the PE */ start_transition(fsa_state); /* dont waste time by invoking the pe yet; */ crm_debug("Waiting on %d outstanding join acks for join-%d", g_hash_table_size(welcomed_nodes), current_join_id); } static int compare_int_fields(xmlNode *left, xmlNode *right, const char *field) { const char *elem_l = crm_element_value(left, field); const char *elem_r = crm_element_value(right, field); int int_elem_l = crm_int_helper(elem_l, NULL); int int_elem_r = crm_int_helper(elem_r, NULL); if(int_elem_l < int_elem_r) { return -1; } else if(int_elem_l > int_elem_r) { return 1; } return 0; } /* A_DC_JOIN_PROCESS_REQ */ void do_dc_join_filter_offer(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { xmlNode *generation = NULL; int cmp = 0; int join_id = -1; gboolean ack_nack_bool = TRUE; const char *ack_nack = CRMD_JOINSTATE_MEMBER; ha_msg_input_t *join_ack = fsa_typed_data(fsa_dt_ha_msg); const char *join_from = crm_element_value(join_ack->msg, F_CRM_HOST_FROM); const char *ref = crm_element_value(join_ack->msg, XML_ATTR_REFERENCE); crm_node_t *join_node = crm_get_peer(0, join_from); crm_debug("Processing req from %s", join_from); generation = join_ack->xml; crm_element_value_int(join_ack->msg, F_CRM_JOIN_ID, &join_id); if(max_generation_xml != NULL && generation != NULL) { int lpc = 0; const char *attributes[] = { XML_ATTR_GENERATION_ADMIN, XML_ATTR_GENERATION, XML_ATTR_NUMUPDATES, }; for(lpc = 0; cmp == 0 && lpc < DIMOF(attributes); lpc++) { cmp = compare_int_fields(max_generation_xml, generation, attributes[lpc]); } } if(join_id != current_join_id) { crm_debug("Invalid response from %s: join-%d vs. join-%d", join_from, join_id, current_join_id); check_join_state(cur_state, __FUNCTION__); return; } else if(join_node == NULL || crm_is_member_active(join_node) == FALSE) { crm_err("Node %s is not a member", join_from); ack_nack_bool = FALSE; } else if(generation == NULL) { crm_err("Generation was NULL"); ack_nack_bool = FALSE; } else if(max_generation_xml == NULL) { max_generation_xml = copy_xml(generation); max_generation_from = crm_strdup(join_from); } else if(cmp < 0 || (cmp == 0 && safe_str_eq(join_from, fsa_our_uname))) { crm_debug("%s has a better generation number than" " the current max %s", join_from, max_generation_from); if(max_generation_xml) { crm_log_xml_debug(max_generation_xml, "Max generation"); } crm_log_xml_debug(generation, "Their generation"); crm_free(max_generation_from); free_xml(max_generation_xml); max_generation_from = crm_strdup(join_from); max_generation_xml = copy_xml(join_ack->xml); } if(ack_nack_bool == FALSE) { /* NACK this client */ ack_nack = CRMD_JOINSTATE_NACK; crm_err("join-%d: NACK'ing node %s (ref %s)", join_id, join_from, ref); } else { crm_debug("join-%d: Welcoming node %s (ref %s)", join_id, join_from, ref); } /* add them to our list of CRMD_STATE_ACTIVE nodes */ g_hash_table_insert( integrated_nodes, crm_strdup(join_from), crm_strdup(ack_nack)); crm_debug("%u nodes have been integrated into join-%d", g_hash_table_size(integrated_nodes), join_id); g_hash_table_remove(welcomed_nodes, join_from); if(check_join_state(cur_state, __FUNCTION__) == FALSE) { /* dont waste time by invoking the PE yet; */ crm_debug("join-%d: Still waiting on %d outstanding offers", join_id, g_hash_table_size(welcomed_nodes)); } } /* A_DC_JOIN_FINALIZE */ void do_dc_join_finalize(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { char *sync_from = NULL; enum cib_errors rc = cib_ok; /* This we can do straight away and avoid clients timing us out * while we compute the latest CIB */ crm_debug("Finializing join-%d for %d clients", current_join_id, g_hash_table_size(integrated_nodes)); if(g_hash_table_size(integrated_nodes) == 0) { /* If we don't even have ourself, start again */ register_fsa_error_adv( C_FSA_INTERNAL, I_ELECTION_DC, NULL, NULL, __FUNCTION__); return; } clear_bit_inplace(fsa_input_register, R_HAVE_CIB); if(max_generation_from == NULL || safe_str_eq(max_generation_from, fsa_our_uname)){ set_bit_inplace(fsa_input_register, R_HAVE_CIB); } if(is_set(fsa_input_register, R_IN_TRANSITION)) { crm_warn("join-%d: We are still in a transition." " Delaying until the TE completes.", current_join_id); crmd_fsa_stall(NULL); return; } if(is_set(fsa_input_register, R_HAVE_CIB) == FALSE) { /* ask for the agreed best CIB */ sync_from = crm_strdup(max_generation_from); crm_log_xml_debug(max_generation_xml, "Requesting version"); set_bit_inplace(fsa_input_register, R_CIB_ASKED); } else { /* Send _our_ CIB out to everyone */ sync_from = crm_strdup(fsa_our_uname); } crm_info("join-%d: Syncing the CIB from %s to the rest of the cluster", current_join_id, sync_from); rc = fsa_cib_conn->cmds->sync_from( fsa_cib_conn, sync_from, NULL,cib_quorum_override); fsa_cib_conn->cmds->register_callback( fsa_cib_conn, rc, 60, FALSE, sync_from, "finalize_sync_callback", finalize_sync_callback); } void finalize_sync_callback(xmlNode *msg, int call_id, int rc, xmlNode *output, void *user_data) { CRM_DEV_ASSERT(cib_not_master != rc); clear_bit_inplace(fsa_input_register, R_CIB_ASKED); if(rc != cib_ok) { do_crm_log((rc==cib_old_data?LOG_WARNING:LOG_ERR), "Sync from %s resulted in an error: %s", (char*)user_data, cib_error2string(rc)); /* restart the whole join process */ register_fsa_error_adv( C_FSA_INTERNAL, I_ELECTION_DC,NULL,NULL,__FUNCTION__); } else if(AM_I_DC && fsa_state == S_FINALIZE_JOIN) { set_bit_inplace(fsa_input_register, R_HAVE_CIB); clear_bit_inplace(fsa_input_register, R_CIB_ASKED); /* make sure dc_uuid is re-set to us */ if(check_join_state(fsa_state, __FUNCTION__) == FALSE) { crm_debug("Notifying %d clients of join-%d results", g_hash_table_size(integrated_nodes), current_join_id); g_hash_table_foreach_remove( integrated_nodes, finalize_join_for, NULL); } } else { crm_debug("No longer the DC in S_FINALIZE_JOIN: %s/%s", AM_I_DC?"DC":"CRMd", fsa_state2string(fsa_state)); } crm_free(user_data); } static void join_update_complete_callback(xmlNode *msg, int call_id, int rc, xmlNode *output, void *user_data) { fsa_data_t *msg_data = NULL; if(rc == cib_ok) { crm_debug("Join update %d complete", call_id); check_join_state(fsa_state, __FUNCTION__); } else { crm_err("Join update %d failed", call_id); crm_log_xml(LOG_DEBUG, "failed", msg); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); } } /* A_DC_JOIN_PROCESS_ACK */ void do_dc_join_ack(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { int join_id = -1; int call_id = 0; ha_msg_input_t *join_ack = fsa_typed_data(fsa_dt_ha_msg); const char *join_id_s = NULL; const char *join_state = NULL; const char *op = crm_element_value(join_ack->msg, F_CRM_TASK); const char *join_from = crm_element_value(join_ack->msg, F_CRM_HOST_FROM); if(safe_str_neq(op, CRM_OP_JOIN_CONFIRM)) { crm_debug("Ignoring op=%s message from %s", op, join_from); return; } crm_element_value_int(join_ack->msg, F_CRM_JOIN_ID, &join_id); join_id_s = crm_element_value(join_ack->msg, F_CRM_JOIN_ID); /* now update them to "member" */ crm_debug_2("Processing ack from %s", join_from); join_state = (const char *) g_hash_table_lookup(finalized_nodes, join_from); if(join_state == NULL) { crm_err("Join not in progress: ignoring join-%d from %s", join_id, join_from); return; } else if(safe_str_neq(join_state, CRMD_JOINSTATE_MEMBER)) { crm_err("Node %s wasnt invited to join the cluster",join_from); g_hash_table_remove(finalized_nodes, join_from); return; } else if(join_id != current_join_id) { crm_err("Invalid response from %s: join-%d vs. join-%d", join_from, join_id, current_join_id); g_hash_table_remove(finalized_nodes, join_from); return; } g_hash_table_remove(finalized_nodes, join_from); if(g_hash_table_lookup(confirmed_nodes, join_from) != NULL) { crm_err("join-%d: hash already contains confirmation from %s", join_id, join_from); } g_hash_table_insert( confirmed_nodes, crm_strdup(join_from), crm_strdup(join_id_s)); crm_info("join-%d: Updating node state to %s for %s", join_id, CRMD_JOINSTATE_MEMBER, join_from); /* update CIB with the current LRM status from the node * We dont need to notify the TE of these updates, a transition will * be started in due time */ erase_status_tag(join_from, XML_CIB_TAG_LRM); fsa_cib_update(XML_CIB_TAG_STATUS, join_ack->xml, cib_scope_local|cib_quorum_override|cib_can_create, call_id); add_cib_op_callback( fsa_cib_conn, call_id, FALSE, NULL, join_update_complete_callback); crm_debug("join-%d: Registered callback for LRM update %d", join_id, call_id); } gboolean finalize_join_for(gpointer key, gpointer value, gpointer user_data) { const char *join_to = NULL; const char *join_state = NULL; xmlNode *acknak = NULL; crm_node_t *join_node = NULL; if(key == NULL || value == NULL) { return TRUE; } join_to = (const char *)key; join_state = (const char *)value; /* make sure the node exists in the config section */ create_node_entry(join_to, join_to, NORMALNODE); join_node = crm_get_peer(0, join_to); if(crm_is_member_active(join_node) == FALSE) { /* * NACK'ing nodes that the membership layer doesn't know about yet * simply creates more churn * * Better to leave them waiting and let the join restart when * the new membership event comes in * * All other NACKs (due to versions etc) should still be processed */ return TRUE; } /* send the ack/nack to the node */ acknak = create_request( CRM_OP_JOIN_ACKNAK, NULL, join_to, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL); crm_xml_add_int(acknak, F_CRM_JOIN_ID, current_join_id); /* set the ack/nack */ if(safe_str_eq(join_state, CRMD_JOINSTATE_MEMBER)) { crm_debug("join-%d: ACK'ing join request from %s, state %s", current_join_id, join_to, join_state); crm_xml_add(acknak, CRM_OP_JOIN_ACKNAK, XML_BOOLEAN_TRUE); g_hash_table_insert( finalized_nodes, crm_strdup(join_to), crm_strdup(CRMD_JOINSTATE_MEMBER)); } else { crm_warn("join-%d: NACK'ing join request from %s, state %s", current_join_id, join_to, join_state); crm_xml_add(acknak, CRM_OP_JOIN_ACKNAK, XML_BOOLEAN_FALSE); } - send_msg_via_ha(acknak); + send_cluster_message(join_to, crm_msg_crmd, acknak, TRUE); free_xml(acknak); return TRUE; } void ghash_print_node(gpointer key, gpointer value, gpointer user_data); gboolean check_join_state(enum crmd_fsa_state cur_state, const char *source) { crm_debug("Invoked by %s in state: %s", source, fsa_state2string(cur_state)); if(saved_ccm_membership_id != crm_peer_seq) { crm_info("%s: Membership changed since join started: %llu -> %llu", source, saved_ccm_membership_id, crm_peer_seq); register_fsa_input_before(C_FSA_INTERNAL, I_NODE_JOIN, NULL); } else if(cur_state == S_INTEGRATION) { if(g_hash_table_size(welcomed_nodes) == 0) { crm_debug("join-%d: Integration of %d peers complete: %s", current_join_id, g_hash_table_size(integrated_nodes), source); register_fsa_input_before( C_FSA_INTERNAL, I_INTEGRATED, NULL); return TRUE; } } else if(cur_state == S_FINALIZE_JOIN) { if(is_set(fsa_input_register, R_HAVE_CIB) == FALSE) { crm_debug("join-%d: Delaying I_FINALIZED until we have the CIB", current_join_id); return TRUE; } else if(g_hash_table_size(integrated_nodes) == 0 && g_hash_table_size(finalized_nodes) == 0) { crm_debug("join-%d complete: %s", current_join_id, source); register_fsa_input_later(C_FSA_INTERNAL, I_FINALIZED, NULL); } else if(g_hash_table_size(integrated_nodes) != 0 && g_hash_table_size(finalized_nodes) != 0) { char *msg = NULL; crm_err("join-%d: Waiting on %d integrated nodes" " AND %d finalized nodes", current_join_id, g_hash_table_size(integrated_nodes), g_hash_table_size(finalized_nodes)); msg = crm_strdup("Integrated node"); g_hash_table_foreach(integrated_nodes, ghash_print_node, msg); crm_free(msg); msg = crm_strdup("Finalized node"); g_hash_table_foreach(finalized_nodes, ghash_print_node, msg); crm_free(msg); } else if(g_hash_table_size(integrated_nodes) != 0) { crm_debug("join-%d: Still waiting on %d integrated nodes", current_join_id, g_hash_table_size(integrated_nodes)); } else if(g_hash_table_size(finalized_nodes) != 0) { crm_debug("join-%d: Still waiting on %d finalized nodes", current_join_id, g_hash_table_size(finalized_nodes)); } } return FALSE; } void do_dc_join_final(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { crm_info("Ensuring DC, quorum and node attributes are up-to-date"); update_attrd(NULL, NULL, NULL); crm_update_quorum(crm_have_quorum, TRUE); } diff --git a/crmd/messages.c b/crmd/messages.c index 092d9c8caf..2762cdf671 100644 --- a/crmd/messages.c +++ b/crmd/messages.c @@ -1,1062 +1,1005 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include GListPtr fsa_message_queue = NULL; extern void crm_shutdown(int nsig); void handle_response(xmlNode *stored_msg); enum crmd_fsa_input handle_request(xmlNode *stored_msg); enum crmd_fsa_input handle_shutdown_request(xmlNode *stored_msg); ha_msg_input_t *copy_ha_msg_input(ha_msg_input_t *orig); gboolean ipc_queue_helper(gpointer key, gpointer value, gpointer user_data); #ifdef MSG_LOG # define ROUTER_RESULT(x) crm_debug_3("Router result: %s", x); \ crm_log_xml(LOG_MSG, "router.log", msg); #else # define ROUTER_RESULT(x) crm_debug_3("Router result: %s", x) #endif /* debug only, can wrap all it likes */ int last_data_id = 0; void register_fsa_error_adv( enum crmd_fsa_cause cause, enum crmd_fsa_input input, fsa_data_t *cur_data, void *new_data, const char *raised_from) { /* save the current actions if any */ if(fsa_actions != A_NOTHING) { register_fsa_input_adv( cur_data?cur_data->fsa_cause:C_FSA_INTERNAL, I_NULL, cur_data?cur_data->data:NULL, fsa_actions, TRUE, __FUNCTION__); } /* reset the action list */ fsa_actions = A_NOTHING; /* register the error */ register_fsa_input_adv( cause, input, new_data, A_NOTHING, TRUE, raised_from); } int register_fsa_input_adv( enum crmd_fsa_cause cause, enum crmd_fsa_input input, void *data, long long with_actions, gboolean prepend, const char *raised_from) { unsigned old_len = g_list_length(fsa_message_queue); fsa_data_t *fsa_data = NULL; last_data_id++; CRM_CHECK(raised_from != NULL, raised_from = ""); crm_debug_2("%s %s FSA input %d (%s) (cause=%s) %s data", raised_from, prepend?"prepended":"appended",last_data_id, fsa_input2string(input), fsa_cause2string(cause), data?"with":"without"); if(input == I_WAIT_FOR_EVENT) { do_fsa_stall = TRUE; crm_debug("Stalling the FSA pending further input: cause=%s", fsa_cause2string(cause)); if(old_len > 0) { crm_warn("%s stalled the FSA with pending inputs", raised_from); fsa_dump_queue(LOG_DEBUG); } if(data == NULL) { set_bit_inplace(fsa_actions, with_actions); with_actions = A_NOTHING; return 0; } crm_err("%s stalled the FSA with data - this may be broken", raised_from); } if(input == I_NULL && with_actions == A_NOTHING /* && data == NULL */){ /* no point doing anything */ crm_err("Cannot add entry to queue: no input and no action"); return 0; } crm_malloc0(fsa_data, sizeof(fsa_data_t)); fsa_data->id = last_data_id; fsa_data->fsa_input = input; fsa_data->fsa_cause = cause; fsa_data->origin = raised_from; fsa_data->data = NULL; fsa_data->data_type = fsa_dt_none; fsa_data->actions = with_actions; if(with_actions != A_NOTHING) { crm_debug_3("Adding actions %.16llx to input", with_actions); } if(data != NULL) { switch(cause) { case C_FSA_INTERNAL: case C_CRMD_STATUS_CALLBACK: case C_IPC_MESSAGE: case C_HA_MESSAGE: crm_debug_3("Copying %s data from %s as a HA msg", fsa_cause2string(cause), raised_from); CRM_CHECK(((ha_msg_input_t*)data)->msg != NULL, crm_err("Bogus data from %s", raised_from)); fsa_data->data = copy_ha_msg_input(data); fsa_data->data_type = fsa_dt_ha_msg; break; case C_LRM_OP_CALLBACK: crm_debug_3("Copying %s data from %s as lrm_op_t", fsa_cause2string(cause), raised_from); fsa_data->data = copy_lrm_op((lrm_op_t*)data); fsa_data->data_type = fsa_dt_lrm; break; case C_CCM_CALLBACK: case C_SUBSYSTEM_CONNECT: case C_LRM_MONITOR_CALLBACK: case C_TIMER_POPPED: case C_SHUTDOWN: case C_HEARTBEAT_FAILED: case C_HA_DISCONNECT: case C_ILLEGAL: case C_UNKNOWN: case C_STARTUP: crm_err("Copying %s data (from %s)" " not yet implemented", fsa_cause2string(cause), raised_from); exit(1); break; } crm_debug_4("%s data copied", fsa_cause2string(fsa_data->fsa_cause)); } /* make sure to free it properly later */ if(prepend) { crm_debug_2("Prepending input"); fsa_message_queue = g_list_prepend(fsa_message_queue, fsa_data); } else { fsa_message_queue = g_list_append(fsa_message_queue, fsa_data); } crm_debug_2("Queue len: %d", g_list_length(fsa_message_queue)); fsa_dump_queue(LOG_DEBUG_2); if(old_len == g_list_length(fsa_message_queue)){ crm_err("Couldnt add message to the queue"); } if(fsa_source) { crm_debug_3("Triggering FSA: %s", __FUNCTION__); G_main_set_trigger(fsa_source); } return last_data_id; } void fsa_dump_queue(int log_level) { if(log_level < (int)crm_log_level) { return; } slist_iter( data, fsa_data_t, fsa_message_queue, lpc, do_crm_log(log_level, "queue[%d(%d)]: input %s raised by %s()\t(cause=%s)", lpc, data->id, fsa_input2string(data->fsa_input), data->origin, fsa_cause2string(data->fsa_cause)); ); } ha_msg_input_t * copy_ha_msg_input(ha_msg_input_t *orig) { ha_msg_input_t *copy = NULL; xmlNodePtr data = NULL; if(orig != NULL) { crm_debug_4("Copy msg"); data = copy_xml(orig->msg); } else { crm_debug_3("No message to copy"); } copy = new_ha_msg_input(data); if(orig && orig->msg != NULL) { CRM_CHECK(copy->msg != NULL, crm_err("copy failed")); } return copy; } void delete_fsa_input(fsa_data_t *fsa_data) { lrm_op_t *op = NULL; xmlNode *foo = NULL; if(fsa_data == NULL) { return; } crm_debug_4("About to free %s data", fsa_cause2string(fsa_data->fsa_cause)); if(fsa_data->data != NULL) { switch(fsa_data->data_type) { case fsa_dt_ha_msg: delete_ha_msg_input(fsa_data->data); break; case fsa_dt_xml: foo = fsa_data->data; free_xml(foo); break; case fsa_dt_lrm: op = (lrm_op_t*)fsa_data->data; free_lrm_op(op); break; case fsa_dt_none: if(fsa_data->data != NULL) { crm_err("Dont know how to free %s data from %s", fsa_cause2string(fsa_data->fsa_cause), fsa_data->origin); exit(1); } break; } crm_debug_4("%s data freed", fsa_cause2string(fsa_data->fsa_cause)); } crm_free(fsa_data); } /* returns the next message */ fsa_data_t * get_message(void) { fsa_data_t* message = g_list_nth_data(fsa_message_queue, 0); fsa_message_queue = g_list_remove(fsa_message_queue, message); crm_debug_2("Processing input %d", message->id); return message; } /* returns the current head of the FIFO queue */ gboolean is_message(void) { return (g_list_length(fsa_message_queue) > 0); } void * fsa_typed_data_adv( fsa_data_t *fsa_data, enum fsa_data_type a_type, const char *caller) { void *ret_val = NULL; if(fsa_data == NULL) { do_crm_log(LOG_ERR, "%s: No FSA data available", caller); } else if(fsa_data->data == NULL) { do_crm_log(LOG_ERR, "%s: No message data available. Origin: %s", caller, fsa_data->origin); } else if(fsa_data->data_type != a_type) { do_crm_log(LOG_CRIT, "%s: Message data was the wrong type! %d vs. requested=%d." " Origin: %s", caller, fsa_data->data_type, a_type, fsa_data->origin); CRM_ASSERT(fsa_data->data_type == a_type); } else { ret_val = fsa_data->data; } return ret_val; } /* A_MSG_ROUTE */ void do_msg_route(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { ha_msg_input_t *input = fsa_typed_data(fsa_dt_ha_msg); route_message(msg_data->fsa_cause, input->msg); } void route_message(enum crmd_fsa_cause cause, xmlNode *input) { ha_msg_input_t fsa_input; enum crmd_fsa_input result = I_NULL; fsa_input.msg = input; CRM_CHECK(cause == C_IPC_MESSAGE || cause == C_HA_MESSAGE, return); /* try passing the buck first */ if(relay_message(input, cause==C_IPC_MESSAGE)) { return; } /* handle locally */ result = handle_message(input); /* done or process later? */ switch(result) { case I_NULL: case I_CIB_OP: case I_ROUTER: case I_NODE_JOIN: case I_JOIN_REQUEST: case I_JOIN_RESULT: break; default: /* Defering local processing of message */ register_fsa_input_later(cause, result, &fsa_input); return; } if(result != I_NULL) { /* add to the front of the queue */ register_fsa_input(cause, result, &fsa_input); } } gboolean send_request(xmlNode *msg, char **msg_reference) { if(msg_reference != NULL) { *msg_reference = crm_strdup( crm_element_value(msg, XML_ATTR_REFERENCE)); } if(relay_message(msg, TRUE) == FALSE) { ha_msg_input_t fsa_input; fsa_input.msg = msg; register_fsa_input(C_IPC_MESSAGE, I_ROUTER, &fsa_input); return FALSE; } return TRUE; } gboolean relay_message(xmlNode *msg, gboolean originated_locally) { + int dest = 1; int is_for_dc = 0; int is_for_dcib = 0; int is_for_te = 0; int is_for_crm = 0; int is_for_cib = 0; int is_local = 0; gboolean processing_complete = FALSE; const char *host_to = crm_element_value(msg, F_CRM_HOST_TO); const char *sys_to = crm_element_value(msg, F_CRM_SYS_TO); const char *sys_from= crm_element_value(msg, F_CRM_SYS_FROM); const char *type = crm_element_value(msg, F_TYPE); const char *msg_error = NULL; crm_debug_3("Routing message %s", crm_element_value(msg, XML_ATTR_REFERENCE)); if(msg == NULL) { msg_error = "Cannot route empty message"; } else if(safe_str_eq(CRM_OP_HELLO, crm_element_value(msg, F_CRM_TASK))){ /* quietly ignore */ processing_complete = TRUE; } else if(safe_str_neq(type, T_CRM)) { msg_error = "Bad message type"; } else if(sys_to == NULL) { msg_error = "Bad message destination: no subsystem"; } if(msg_error != NULL) { processing_complete = TRUE; crm_err("%s", msg_error); crm_log_xml(LOG_WARNING, "bad msg", msg); } if(processing_complete) { return TRUE; } processing_complete = TRUE; is_for_dc = (strcasecmp(CRM_SYSTEM_DC, sys_to) == 0); is_for_dcib = (strcasecmp(CRM_SYSTEM_DCIB, sys_to) == 0); is_for_te = (strcasecmp(CRM_SYSTEM_TENGINE, sys_to) == 0); is_for_cib = (strcasecmp(CRM_SYSTEM_CIB, sys_to) == 0); is_for_crm = (strcasecmp(CRM_SYSTEM_CRMD, sys_to) == 0); is_local = 0; if(host_to == NULL || strlen(host_to) == 0) { if(is_for_dc || is_for_te) { is_local = 0; } else if(is_for_crm && originated_locally) { is_local = 0; } else { is_local = 1; } } else if(safe_str_eq(fsa_our_uname, host_to)) { is_local=1; - } + } if(is_for_dc || is_for_dcib || is_for_te) { if(AM_I_DC && is_for_te) { ROUTER_RESULT("Message result: Local relay"); send_msg_via_ipc(msg, sys_to); } else if(AM_I_DC) { ROUTER_RESULT("Message result: DC/CRMd process"); processing_complete = FALSE; /* more to be done by caller */ } else if(originated_locally && safe_str_neq(sys_from, CRM_SYSTEM_PENGINE) && safe_str_neq(sys_from, CRM_SYSTEM_TENGINE)) { /* Neither the TE or PE should be sending messages * to DC's on other nodes * * By definition, if we are no longer the DC, then * the PE or TE's data should be discarded */ +#if SUPPORT_AIS + if(is_openais_cluster()) { + dest = text2msg_type(sys_to); + } +#endif ROUTER_RESULT("Message result: External relay to DC"); - send_msg_via_ha(msg); + send_cluster_message(host_to, dest, msg, TRUE); } else { /* discard */ ROUTER_RESULT("Message result: Discard, not DC"); } } else if(is_local && (is_for_crm || is_for_cib)) { ROUTER_RESULT("Message result: CRMd process"); processing_complete = FALSE; /* more to be done by caller */ } else if(is_local) { ROUTER_RESULT("Message result: Local relay"); send_msg_via_ipc(msg, sys_to); } else { - ROUTER_RESULT("Message result: External relay"); - send_msg_via_ha(msg); +#if SUPPORT_AIS + if(is_openais_cluster()) { + dest = text2msg_type(sys_to); + } +#endif + ROUTER_RESULT("Message result: External relay"); + send_cluster_message(host_to, dest, msg, TRUE); } return processing_complete; } gboolean crmd_authorize_message(xmlNode *client_msg, crmd_client_t *curr_client) { /* check the best case first */ const char *sys_from = crm_element_value(client_msg, F_CRM_SYS_FROM); char *uuid = NULL; char *client_name = NULL; char *major_version = NULL; char *minor_version = NULL; const char *filtered_from; gpointer table_key = NULL; gboolean auth_result = FALSE; struct crm_subsystem_s *the_subsystem = NULL; gboolean can_reply = FALSE; /* no-one has registered with this id */ xmlNode *xml = NULL; const char *op = crm_element_value(client_msg, F_CRM_TASK); if (safe_str_neq(CRM_OP_HELLO, op)) { if(sys_from == NULL) { crm_warn("Message [%s] was had no value for %s... discarding", crm_element_value(client_msg, XML_ATTR_REFERENCE), F_CRM_SYS_FROM); return FALSE; } filtered_from = sys_from; /* The CIB can have two names on the DC */ if(strcasecmp(sys_from, CRM_SYSTEM_DCIB) == 0) filtered_from = CRM_SYSTEM_CIB; if (g_hash_table_lookup (ipc_clients, filtered_from) != NULL) { can_reply = TRUE; /* reply can be routed */ } crm_debug_2("Message reply can%s be routed from %s.", can_reply?"":" not", sys_from); if(can_reply == FALSE) { crm_warn("Message [%s] not authorized", crm_element_value(client_msg, XML_ATTR_REFERENCE)); } return can_reply; } crm_debug_3("received client join msg"); crm_log_xml(LOG_MSG, "join", client_msg); xml = get_message_xml(client_msg, F_CRM_DATA); auth_result = process_hello_message( xml, &uuid, &client_name, &major_version, &minor_version); if (auth_result == TRUE) { if(client_name == NULL || uuid == NULL) { crm_err("Bad client details (client_name=%s, uuid=%s)", crm_str(client_name), crm_str(uuid)); auth_result = FALSE; } } if (auth_result == TRUE) { /* check version */ int mav = atoi(major_version); int miv = atoi(minor_version); crm_debug_3("Checking client version number"); if (mav < 0 || miv < 0) { crm_err("Client version (%d:%d) is not acceptable", mav, miv); auth_result = FALSE; } crm_free(major_version); crm_free(minor_version); } if (safe_str_eq(CRM_SYSTEM_PENGINE, client_name)) { the_subsystem = pe_subsystem; } else if (safe_str_eq(CRM_SYSTEM_TENGINE, client_name)) { the_subsystem = te_subsystem; } /* TODO: Is this code required anymore?? */ if (auth_result == TRUE && the_subsystem != NULL) { /* if we already have one of those clients * only applies to te, pe etc. not admin clients */ crm_err("Checking if %s is required/already connected", client_name); table_key = (gpointer)crm_strdup(client_name); if(is_set(fsa_input_register, the_subsystem->flag_connected)) { auth_result = FALSE; crm_free(table_key); table_key = NULL; crm_warn("Bit\t%.16llx set in %.16llx", the_subsystem->flag_connected, fsa_input_register); crm_err("Client %s is already connected", client_name); } else if(FALSE == is_set(fsa_input_register, the_subsystem->flag_required)) { crm_warn("Bit\t%.16llx not set in %.16llx", the_subsystem->flag_connected, fsa_input_register); crm_warn("Client %s joined but we dont need it", client_name); stop_subsystem(the_subsystem, TRUE); } else { the_subsystem->ipc = curr_client->client_channel; set_bit_inplace(fsa_input_register, the_subsystem->flag_connected); } } else { table_key = (gpointer)generate_hash_key(client_name, uuid); } if (auth_result == TRUE) { crm_debug_2("Accepted client %s", crm_str(table_key)); curr_client->table_key = table_key; curr_client->sub_sys = crm_strdup(client_name); curr_client->uuid = crm_strdup(uuid); g_hash_table_insert (ipc_clients, table_key, curr_client->client_channel); send_hello_message(curr_client->client_channel, "n/a", CRM_SYSTEM_CRMD, "0", "1"); crm_debug_3("Updated client list with %s", crm_str(table_key)); crm_debug_3("Triggering FSA: %s", __FUNCTION__); G_main_set_trigger(fsa_source); if(the_subsystem != NULL) { CRM_CHECK(the_subsystem->client == NULL, process_client_disconnect(the_subsystem->client)); the_subsystem->client = curr_client; } } else { crm_free(table_key); crm_warn("Rejected client logon request"); curr_client->client_channel->ch_status = IPC_DISC_PENDING; } if(uuid != NULL) crm_free(uuid); if(minor_version != NULL) crm_free(minor_version); if(major_version != NULL) crm_free(major_version); if(client_name != NULL) crm_free(client_name); /* hello messages should never be processed further */ return FALSE; } enum crmd_fsa_input handle_message(xmlNode *msg) { const char *type = NULL; CRM_CHECK(msg != NULL, return I_NULL); type = crm_element_value(msg, F_CRM_MSG_TYPE); if(crm_str_eq(type, XML_ATTR_REQUEST, TRUE)) { return handle_request(msg); } else if(crm_str_eq(type, XML_ATTR_RESPONSE, TRUE)) { handle_response(msg); return I_NULL; } crm_err("Unknown message type: %s", type); return I_NULL; } enum crmd_fsa_input handle_request(xmlNode *stored_msg) { xmlNode *msg = NULL; const char *op = crm_element_value(stored_msg, F_CRM_TASK); /* Optimize this for the DC - it has the most to do */ if(op == NULL) { crm_log_xml(LOG_ERR, "Bad message", stored_msg); return I_NULL; } /*========== DC-Only Actions ==========*/ if(AM_I_DC) { if(strcmp(op, CRM_OP_JOIN_ANNOUNCE) == 0) { return I_NODE_JOIN; } else if(strcmp(op, CRM_OP_JOIN_REQUEST) == 0) { return I_JOIN_REQUEST; } else if(strcmp(op, CRM_OP_JOIN_CONFIRM) == 0) { return I_JOIN_RESULT; } else if(strcmp(op, CRM_OP_SHUTDOWN) == 0) { const char *host_from = crm_element_value(stored_msg, F_CRM_HOST_FROM); gboolean dc_match = safe_str_eq(host_from, fsa_our_dc); if(is_set(fsa_input_register, R_SHUTDOWN)) { crm_info("Shutting ourselves down (DC)"); return I_STOP; } else if(dc_match) { crm_err("We didnt ask to be shut down, yet our" " TE is telling us too." " Better get out now!"); return I_TERMINATE; } else if(fsa_state != S_STOPPING) { crm_err("Another node is asking us to shutdown" " but we think we're ok."); return I_ELECTION; } } else if(strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0) { /* a slave wants to shut down */ /* create cib fragment and add to message */ return handle_shutdown_request(stored_msg); } } /*========== common actions ==========*/ if(strcmp(op, CRM_OP_NOVOTE) == 0) { ha_msg_input_t fsa_input; fsa_input.msg = stored_msg; register_fsa_input_adv(C_HA_MESSAGE, I_NULL, &fsa_input, A_ELECTION_COUNT|A_ELECTION_CHECK, FALSE, __FUNCTION__); } else if(strcmp(op, CRM_OP_VOTE) == 0) { /* count the vote and decide what to do after that */ ha_msg_input_t fsa_input; fsa_input.msg = stored_msg; register_fsa_input_adv(C_HA_MESSAGE, I_NULL, &fsa_input, A_ELECTION_COUNT|A_ELECTION_CHECK, FALSE, __FUNCTION__); /* Sometimes we _must_ go into S_ELECTION */ if(fsa_state == S_HALT) { crm_debug("Forcing an election from S_HALT"); return I_ELECTION; #if 0 } else if(AM_I_DC) { /* This is the old way of doing things but what is gained? */ return I_ELECTION; #endif } } else if(strcmp(op, CRM_OP_JOIN_OFFER) == 0) { crm_debug("Raising I_JOIN_OFFER: join-%s", crm_element_value(stored_msg, F_CRM_JOIN_ID)); return I_JOIN_OFFER; } else if(strcmp(op, CRM_OP_JOIN_ACKNAK) == 0) { crm_debug("Raising I_JOIN_RESULT: join-%s", crm_element_value(stored_msg, F_CRM_JOIN_ID)); return I_JOIN_RESULT; } else if(strcmp(op, CRM_OP_LRM_DELETE) == 0 || strcmp(op, CRM_OP_LRM_FAIL) == 0 || strcmp(op, CRM_OP_LRM_REFRESH) == 0 || strcmp(op, CRM_OP_REPROBE) == 0) { crm_xml_add(stored_msg, F_CRM_SYS_TO, CRM_SYSTEM_LRMD); return I_ROUTER; } else if(strcmp(op, CRM_OP_NOOP) == 0) { return I_NULL; } else if(strcmp(op, CRM_OP_LOCAL_SHUTDOWN) == 0) { crm_shutdown(SIGTERM); /*return I_SHUTDOWN; */ return I_NULL; /*========== (NOT_DC)-Only Actions ==========*/ } else if(AM_I_DC == FALSE && strcmp(op, CRM_OP_SHUTDOWN) == 0) { const char *host_from = crm_element_value(stored_msg, F_CRM_HOST_FROM); gboolean dc_match = safe_str_eq(host_from, fsa_our_dc); if(dc_match || fsa_our_dc == NULL) { return I_STOP; } else { crm_warn("Discarding %s op from %s", op, host_from); } } else if(strcmp(op, CRM_OP_PING) == 0) { /* eventually do some stuff to figure out * if we /are/ ok */ const char *sys_to = crm_element_value(stored_msg, F_CRM_SYS_TO); xmlNode *ping = createPingAnswerFragment(sys_to, "ok"); crm_xml_add(ping, "crmd_state", fsa_state2string(fsa_state)); crm_info("Current ping state: %s", fsa_state2string(fsa_state)); msg = create_reply(stored_msg, ping); relay_message(msg, TRUE); free_xml(ping); free_xml(msg); /* probably better to do this via signals on the * local node */ } else if(strcmp(op, CRM_OP_DEBUG_UP) == 0) { alter_debug(DEBUG_INC); crm_info("Debug set to %d", get_crm_log_level()); } else if(strcmp(op, CRM_OP_DEBUG_DOWN) == 0) { alter_debug(DEBUG_DEC); crm_info("Debug set to %d", get_crm_log_level()); } else { crm_err("Unexpected request (%s) sent to %s", op, AM_I_DC?"the DC":"non-DC node"); crm_log_xml(LOG_ERR, "Unexpected", stored_msg); } return I_NULL; } void handle_response(xmlNode *stored_msg) { const char *op = crm_element_value(stored_msg, F_CRM_TASK); if(op == NULL) { crm_log_xml(LOG_ERR, "Bad message", stored_msg); } else if(AM_I_DC && strcmp(op, CRM_OP_PECALC) == 0) { /* Check if the PE answer been superceeded by a subsequent request? */ const char *msg_ref = crm_element_value(stored_msg, XML_ATTR_REFERENCE); if(msg_ref == NULL) { crm_err("%s - Ignoring calculation with no reference", op); } else if(safe_str_eq(msg_ref, fsa_pe_ref)) { ha_msg_input_t fsa_input; fsa_input.msg = stored_msg; register_fsa_input_later(C_IPC_MESSAGE, I_PE_SUCCESS, &fsa_input); crm_debug_2("Completed: %s...", fsa_pe_ref); crm_free(fsa_pe_ref); fsa_pe_ref = NULL; } else { crm_info("%s calculation %s is obsolete", op, msg_ref); } } else if(strcmp(op, CRM_OP_VOTE) == 0 || strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0 || strcmp(op, CRM_OP_SHUTDOWN) == 0) { } else { const char *host_from = crm_element_value(stored_msg, F_CRM_HOST_FROM); crm_err("Unexpected response (op=%s, src=%s) sent to the %s", op, host_from, AM_I_DC?"DC":"CRMd"); } } enum crmd_fsa_input handle_shutdown_request(xmlNode *stored_msg) { /* handle here to avoid potential version issues * where the shutdown message/proceedure may have * been changed in later versions. * * This way the DC is always in control of the shutdown */ char *when = NULL; time_t now = time(NULL); xmlNode *node_state = NULL; const char *host_from = crm_element_value(stored_msg, F_CRM_HOST_FROM); if(host_from == NULL) { /* we're shutting down and the DC */ host_from = fsa_our_uname; } crm_info("Creating shutdown request for %s (state=%s)", host_from, fsa_state2string(fsa_state)); crm_log_xml(LOG_MSG, "message", stored_msg); node_state = create_node_state( host_from, NULL, NULL, NULL, NULL, CRMD_STATE_INACTIVE, FALSE, __FUNCTION__); fsa_cib_anon_update(XML_CIB_TAG_STATUS, node_state, cib_quorum_override); crm_log_xml_debug_2(node_state, "Shutdown update"); free_xml(node_state); when = crm_itoa(now); update_attrd(host_from, XML_CIB_ATTR_SHUTDOWN, when); crm_free(when); /* will be picked up by the TE as long as its running */ start_transition(fsa_state); return I_NULL; } -/* frees msg upon completion */ -gboolean -send_msg_via_ha(xmlNode *msg) -{ - int log_level = LOG_DEBUG_3; - gboolean broadcast = FALSE; - gboolean all_is_good = TRUE; - - const char *op = crm_element_value(msg, F_CRM_TASK); - const char *sys_to = crm_element_value(msg, F_CRM_SYS_TO); - const char *host_to = crm_element_value(msg, F_CRM_HOST_TO); - enum crm_ais_msg_types dest = 0; - - if(is_openais_cluster()) { - dest = 1; -#if SUPPORT_AIS - dest = text2msg_type(sys_to); -#endif - } - - if (msg == NULL) { - crm_err("Attempt to send NULL Message via HA failed."); - all_is_good = FALSE; - } else { - crm_debug_4("Relaying message to (%s) via HA", host_to); - } - - if (all_is_good) { - if (sys_to == NULL || strlen(sys_to) == 0) { - crm_err("You did not specify a destination sub-system" - " for this message."); - all_is_good = FALSE; - } - } - - /* There are a number of messages may not need to be ordered. - * At a later point perhaps we should detect them and send them - * as unordered messages. - */ - if (all_is_good) { - if (host_to == NULL - || strlen(host_to) == 0 - || safe_str_eq(sys_to, CRM_SYSTEM_DC)) { - broadcast = TRUE; - all_is_good = send_cluster_message(NULL, dest, msg, FALSE); - } else { - all_is_good = send_cluster_message(host_to, dest, msg, FALSE); - } - } - - if(all_is_good == FALSE) { - log_level = LOG_WARNING; - } - - if(log_level == LOG_WARNING - || (safe_str_neq(op, CRM_OP_HBEAT))) { - do_crm_log(log_level, - "Sending %sHA message (ref=%s) to %s@%s %s.", - broadcast?"broadcast ":"directed ", - crm_element_value(msg, XML_ATTR_REFERENCE), - crm_str(sys_to), host_to==NULL?"":host_to, - all_is_good?"succeeded":"failed"); - } - - return all_is_good; -} - - /* msg is deleted by the time this returns */ extern gboolean process_te_message(xmlNode *msg, xmlNode *xml_data); gboolean send_msg_via_ipc(xmlNode *msg, const char *sys) { gboolean send_ok = TRUE; IPC_Channel *client_channel; client_channel = (IPC_Channel*)g_hash_table_lookup(ipc_clients, sys); if(crm_element_value(msg, F_CRM_HOST_FROM) == NULL) { crm_xml_add(msg, F_CRM_HOST_FROM, fsa_our_uname); } if (client_channel != NULL) { crm_debug_3("Sending message via channel %s.", sys); send_ok = send_ipc_message(client_channel, msg); } else if(sys != NULL && strcmp(sys, CRM_SYSTEM_TENGINE) == 0) { xmlNode *data = get_message_xml(msg, F_CRM_DATA); process_te_message(msg, data); } else if(sys != NULL && strcmp(sys, CRM_SYSTEM_LRMD) == 0) { fsa_data_t fsa_data; ha_msg_input_t fsa_input; fsa_input.msg = msg; fsa_input.xml = get_message_xml(msg, F_CRM_DATA); fsa_data.id = 0; fsa_data.actions = 0; fsa_data.data = &fsa_input; fsa_data.fsa_input = I_MESSAGE; fsa_data.fsa_cause = C_IPC_MESSAGE; fsa_data.origin = __FUNCTION__; fsa_data.data_type = fsa_dt_ha_msg; #ifdef FSA_TRACE crm_debug_2("Invoking action A_LRM_INVOKE (%.16llx)", A_LRM_INVOKE); #endif do_lrm_invoke(A_LRM_INVOKE, C_IPC_MESSAGE, fsa_state, I_MESSAGE, &fsa_data); } else { crm_err("Unknown Sub-system (%s)... discarding message.", crm_str(sys)); send_ok = FALSE; } return send_ok; } void msg_queue_helper(void) { #if SUPPORT_HEARTBEAT IPC_Channel *ipc = NULL; if(fsa_cluster_conn != NULL) { ipc = fsa_cluster_conn->llc_ops->ipcchan( fsa_cluster_conn); } if(ipc != NULL) { ipc->ops->resume_io(ipc); } /* g_hash_table_foreach_remove(ipc_clients, ipc_queue_helper, NULL); */ #endif } gboolean ipc_queue_helper(gpointer key, gpointer value, gpointer user_data) { crmd_client_t *ipc_client = value; if(ipc_client->client_channel != NULL) { ipc_client->client_channel->ops->is_message_pending(ipc_client->client_channel); } return FALSE; }