diff --git a/crm/crmd/join_client.c b/crm/crmd/join_client.c index e61a979645..4507e13c08 100644 --- a/crm/crmd/join_client.c +++ b/crm/crmd/join_client.c @@ -1,306 +1,296 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include int reannounce_count = 0; void join_query_callback(const HA_Message *msg, int call_id, int rc, crm_data_t *output, void *user_data); extern ha_msg_input_t *copy_ha_msg_input(ha_msg_input_t *orig); extern gboolean process_join_ack_msg( const char *join_from, crm_data_t *lrm_update); /* A_CL_JOIN_QUERY */ /* is there a DC out there? */ enum crmd_fsa_input do_cl_join_query(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { HA_Message *req = create_request(CRM_OP_JOIN_ANNOUNCE, NULL, NULL, CRM_SYSTEM_DC, CRM_SYSTEM_CRMD, NULL); crm_debug("c0) query"); send_msg_via_ha(fsa_cluster_conn, req); return I_NULL; } /* A_CL_JOIN_ANNOUNCE */ /* this is kind of a workaround for the the fact that we may not be around * or are otherwise unable to reply when the DC sends out A_WELCOME_ALL */ enum crmd_fsa_input do_cl_join_announce(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { ha_msg_input_t *input = fsa_typed_data(fsa_dt_ha_msg); /* Once we hear from the DC, we can stop the timer * * This timer was started either on startup or when a node * left the CCM list */ /* dont announce if we're in one of these states */ if(cur_state != S_PENDING) { crm_warn("Do not announce ourselves in state %s", fsa_state2string(cur_state)); return I_NULL; } if(AM_I_OPERATIONAL) { const char *hb_from = cl_get_string( input->msg, F_CRM_HOST_FROM); crm_debug("c0) announce"); if(hb_from == NULL) { crm_err("Failed to determin origin of hb message"); register_fsa_error(C_FSA_INTERNAL, I_FAIL, NULL); return I_NULL; } if(fsa_our_dc == NULL) { crm_info("Set DC to %s", hb_from); fsa_our_dc = crm_strdup(hb_from); } else if(safe_str_neq(fsa_our_dc, hb_from)) { /* reset the fsa_our_dc to NULL */ crm_warn("Resetting our DC to NULL after DC_HB" " from unrecognised node."); crm_free(fsa_our_dc); fsa_our_dc = NULL; return I_NULL; /* for now, wait for the DC's * to settle down */ } /* send as a broadcast */ { HA_Message *req = create_request( CRM_OP_JOIN_ANNOUNCE, NULL, NULL, CRM_SYSTEM_DC, CRM_SYSTEM_CRMD, NULL); send_msg_via_ha(fsa_cluster_conn, req); } } else { /* Delay announce until we have finished local startup */ crm_warn("Delaying announce until local startup is complete"); return I_NULL; } return I_NULL; } static int query_call_id = 0; /* A_CL_JOIN_REQUEST */ /* aka. accept the welcome offer */ enum crmd_fsa_input do_cl_join_request(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { ha_msg_input_t *input = fsa_typed_data(fsa_dt_ha_msg); const char *welcome_from = cl_get_string(input->msg, F_CRM_HOST_FROM); #if 0 if(we are sick) { log error ; /* save the request for later? */ return I_NULL; } #endif crm_debug("c1) processing join offer: %s", cl_get_string(input->msg, F_CRM_TASK)); /* we only ever want the last one */ if(query_call_id > 0) { crm_debug("Cancelling previous join query"); remove_cib_op_callback(query_call_id, FALSE); } if(fsa_our_dc == NULL) { crm_info("Set DC to %s", welcome_from); fsa_our_dc = crm_strdup(welcome_from); } else if(safe_str_neq(welcome_from, fsa_our_dc)) { /* dont do anything until DC's sort themselves out */ crm_err("Expected a welcome from %s, but %s replied", fsa_our_dc, welcome_from); return I_NULL; } CRM_DEV_ASSERT(input != NULL); query_call_id = fsa_cib_conn->cmds->query( fsa_cib_conn, NULL, NULL, cib_scope_local); add_cib_op_callback( query_call_id, TRUE, copy_ha_msg_input(input), join_query_callback); fsa_actions |= A_DC_TIMER_STOP; return I_NULL; } void join_query_callback(const HA_Message *msg, int call_id, int rc, crm_data_t *output, void *user_data) { crm_data_t *local_cib = NULL; ha_msg_input_t *input = user_data; crm_data_t *generation = create_xml_node( NULL, XML_CIB_TAG_GENERATION_TUPPLE); CRM_DEV_ASSERT(input != NULL); query_call_id = 0; if(rc == cib_ok) { local_cib = find_xml_node(output, XML_TAG_CIB, TRUE); } if(local_cib != NULL) { HA_Message *reply = NULL; + const char *join_id = ha_msg_value(input->msg, F_CRM_JOIN_ID); crm_debug("c2) respond to join offer"); crm_debug("Acknowledging %s as our DC", cl_get_string(input->msg, F_CRM_HOST_FROM)); copy_in_properties(generation, local_cib); reply = create_request( CRM_OP_JOIN_REQUEST, generation, fsa_our_dc, CRM_SYSTEM_DC, CRM_SYSTEM_CRMD, NULL); + ha_msg_add(reply, F_CRM_JOIN_ID, join_id); + send_msg_via_ha(fsa_cluster_conn, reply); } else { crm_err("Could not retrieve Generation to attach to our" " join acknowledgement: %s", cib_error2string(rc)); register_fsa_error_adv( C_FSA_INTERNAL, I_ERROR, NULL, NULL, __FUNCTION__); } delete_ha_msg_input(input); free_xml(generation); } /* A_CL_JOIN_RESULT */ /* aka. this is notification that we have (or have not) been accepted */ enum crmd_fsa_input do_cl_join_result(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { crm_data_t *tmp1 = NULL; gboolean was_nack = TRUE; ha_msg_input_t *input = fsa_typed_data(fsa_dt_ha_msg); const char *op = cl_get_string(input->msg,F_CRM_TASK); const char *ack_nack = cl_get_string(input->msg,CRM_OP_JOIN_ACKNAK); const char *welcome_from = cl_get_string(input->msg,F_CRM_HOST_FROM); if(safe_str_neq(op, CRM_OP_JOIN_ACKNAK)) { crm_debug("Ignoring op=%s message", op); return I_NULL; } /* calculate if it was an ack or a nack */ if(crm_is_true(ack_nack)) { was_nack = FALSE; } if(was_nack) { crm_err("Join with %s failed. NACK'd", welcome_from); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); return I_NULL; } if(AM_I_DC == FALSE && safe_str_eq(welcome_from, fsa_our_uname)) { crm_warn("Discarding our own welcome - we're no longer the DC"); return I_NULL; } else if(fsa_our_dc == NULL) { crm_info("Set DC to %s", welcome_from); fsa_our_dc = crm_strdup(welcome_from); } /* send our status section to the DC */ crm_debug("c3) confirming join: %s", cl_get_string(input->msg, F_CRM_TASK)); crm_debug("Discovering local LRM status"); tmp1 = do_lrm_query(TRUE); if(tmp1 != NULL) { -#if 0 - if(AM_I_DC) { - process_join_ack_msg(fsa_our_uname, tmp1); - - } else { - HA_Message *reply = create_request( - CRM_OP_JOIN_CONFIRM, tmp1, fsa_our_dc, - CRM_SYSTEM_DC, CRM_SYSTEM_CRMD, NULL); - - crm_debug("Sending local LRM status"); - send_msg_via_ha(fsa_cluster_conn, reply); - register_fsa_input(cause, I_NOT_DC, NULL); - } -#else + const char *join_id = ha_msg_value(input->msg, F_CRM_JOIN_ID); HA_Message *reply = create_request( CRM_OP_JOIN_CONFIRM, tmp1, fsa_our_dc, CRM_SYSTEM_DC, CRM_SYSTEM_CRMD, NULL); + ha_msg_add(reply, F_CRM_JOIN_ID, join_id); crm_debug("Sending local LRM status"); send_msg_via_ha(fsa_cluster_conn, reply); if(AM_I_DC == FALSE) { register_fsa_input(cause, I_NOT_DC, NULL); } -#endif free_xml(tmp1); } else { crm_err("Could send our LRM state to the DC"); register_fsa_error(C_FSA_INTERNAL, I_FAIL, NULL); } return I_NULL; } diff --git a/crm/crmd/join_dc.c b/crm/crmd/join_dc.c index 5565c20e74..6d9fed5695 100644 --- a/crm/crmd/join_dc.c +++ b/crm/crmd/join_dc.c @@ -1,576 +1,603 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include GHashTable *welcomed_nodes = NULL; GHashTable *integrated_nodes = NULL; GHashTable *finalized_nodes = NULL; GHashTable *confirmed_nodes = NULL; char *max_epoche = NULL; char *max_generation_from = NULL; crm_data_t *max_generation_xml = NULL; void initialize_join(gboolean before); gboolean finalize_join_for(gpointer key, gpointer value, gpointer user_data); void join_send_offer(gpointer key, gpointer value, gpointer user_data); void finalize_sync_callback(const HA_Message *msg, int call_id, int rc, crm_data_t *output, void *user_data); -gboolean process_join_ack_msg(const char *join_from, crm_data_t *lrm_update); +gboolean process_join_ack_msg( + const char *join_from, crm_data_t *lrm_update, int join_id); gboolean check_join_state(enum crmd_fsa_state cur_state, const char *source); +static int current_join_id = 0; + /* A_DC_JOIN_OFFER_ALL */ enum crmd_fsa_input do_dc_join_offer_all(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { /* reset everyones status back to down or in_ccm in the CIB */ crm_data_t *fragment = create_cib_fragment(NULL, NULL); crm_data_t *update = find_xml_node(fragment, XML_TAG_CIB, TRUE); /* any nodes that are active in the CIB but not in the CCM list * will be seen as offline by the PE anyway */ update = get_object_root(XML_CIB_TAG_STATUS, update); CRM_DEV_ASSERT(update != NULL); /* now process the CCM data */ do_update_cib_nodes(fragment, TRUE); crm_debug("0) Offering membership to %d clients", fsa_membership_copy->members_size); initialize_join(TRUE); + current_join_id++; + g_hash_table_foreach( fsa_membership_copy->members, join_send_offer, NULL); /* dont waste time by invoking the PE yet; */ crm_debug("1) Waiting on %d outstanding join acks", g_hash_table_size(welcomed_nodes)); return I_NULL; } /* A_DC_JOIN_OFFER_ONE */ enum crmd_fsa_input do_dc_join_offer_one(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { oc_node_t member; gpointer a_node = NULL; ha_msg_input_t *welcome = fsa_typed_data(fsa_dt_ha_msg); const char *join_to = NULL; if(welcome == NULL) { crm_err("Attempt to send welcome message " "without a message to reply to!"); return I_NULL; } join_to = cl_get_string(welcome->msg, F_CRM_HOST_FROM); if(a_node != NULL && (cur_state == S_INTEGRATION || cur_state == S_FINALIZE_JOIN)) { /* note: it _is_ possible that a node will have been * sick or starting up when the original offer was made. * however, it will either re-announce itself in due course * _or_ we can re-store the original offer on the client. */ crm_info("Re-offering membership to %s...", join_to); } crm_debug("Processing annouce request from %s in state %s", join_to, fsa_state2string(cur_state)); /* always offer to the DC (ourselves) * this ensures the correct value for max_generation_from */ member.node_uname = crm_strdup(fsa_our_uname); join_send_offer(NULL, &member, NULL); crm_free(member.node_uname); member.node_uname = crm_strdup(join_to); join_send_offer(NULL, &member, NULL); crm_free(member.node_uname); /* this was a genuine join request, cancel any existing * transition and invoke the PE */ if(need_transition(fsa_state)) { register_fsa_action(A_TE_CANCEL); } /* dont waste time by invoking the pe yet; */ crm_debug("1) Waiting on %d outstanding join acks", g_hash_table_size(welcomed_nodes)); return I_NULL; } /* A_DC_JOIN_PROCESS_REQ */ enum crmd_fsa_input do_dc_join_req(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { crm_data_t *generation = NULL; - gboolean is_a_member = FALSE; + int join_id = -1; + gboolean ack_nack_bool = TRUE; const char *ack_nack = CRMD_JOINSTATE_MEMBER; ha_msg_input_t *join_ack = fsa_typed_data(fsa_dt_ha_msg); const char *join_from = cl_get_string(join_ack->msg,F_CRM_HOST_FROM); const char *ref = cl_get_string(join_ack->msg,XML_ATTR_REFERENCE); - + gpointer join_node = g_hash_table_lookup(fsa_membership_copy->members, join_from); - crm_devel("Processing req from %s", join_from); - - if(join_node != NULL) { - is_a_member = TRUE; - } + crm_devel("2) Processing req from %s", join_from); generation = join_ack->xml; - + ha_msg_value_int(join_ack->msg, F_CRM_JOIN_ID, &join_id); crm_xml_debug(max_generation_xml, "Max generation"); crm_xml_debug(generation, "Their generation"); - if(max_generation_xml == NULL) { + if(join_node == NULL) { + crm_err("Node %s is not a member", join_from); + ack_nack_bool = FALSE; + + } else if(generation == NULL) { + crm_err("Generation was NULL"); + ack_nack_bool = FALSE; + + } else if(join_id != current_join_id) { + crm_debug("Response from %s was for invalid join: %d vs. %d", + join_from, join_id, current_join_id); + return I_NULL; + + } else if(max_generation_xml == NULL) { max_generation_xml = copy_xml_node_recursive(generation); max_generation_from = crm_strdup(join_from); } else if(cib_compare_generation(max_generation_xml, generation) < 0) { crm_debug("%s has a better generation number than" " the current max %s", join_from, max_generation_from); crm_free(max_generation_from); free_xml(max_generation_xml); max_generation_from = crm_strdup(join_from); max_generation_xml = copy_xml_node_recursive(join_ack->xml); } - crm_xml_debug(max_generation_xml, "Current max generation"); - - crm_debug("2) Welcoming node %s after ACK (ref %s)", join_from, ref); - - /* add them to our list of CRMD_STATE_ACTIVE nodes */ + crm_xml_debug(max_generation_xml, "Current max generation"); - if(/* some reason */ 0) { + if(ack_nack_bool == FALSE) { /* NACK this client */ ack_nack = CRMD_JOINSTATE_DOWN; - } - + crm_err("2) NACK'ing node %s (ref %s)", join_from, ref); + } else { + crm_debug("2) Welcoming node %s after ACK (ref %s)", + join_from, ref); + } + + /* add them to our list of CRMD_STATE_ACTIVE nodes */ g_hash_table_insert( integrated_nodes, crm_strdup(join_from), crm_strdup(ack_nack)); crm_debug("%u nodes have been integrated", g_hash_table_size(integrated_nodes)); g_hash_table_remove(welcomed_nodes, join_from); if(check_join_state(cur_state, __FUNCTION__) == FALSE) { /* dont waste time by invoking the PE yet; */ crm_debug("Still waiting on %d outstanding join acks", g_hash_table_size(welcomed_nodes)); } return I_NULL; } #define JOIN_AFTER_SYNC 1 /* A_DC_JOIN_FINALIZE */ enum crmd_fsa_input do_dc_join_finalize(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { enum cib_errors rc = cib_ok; /* This we can do straight away and avoid clients timing us out * while we compute the latest CIB */ #if JOIN_AFTER_SYNC crm_debug("Finializing join for %d clients", g_hash_table_size(integrated_nodes)); #else crm_debug("Notifying %d clients of join results", g_hash_table_size(integrated_nodes)); g_hash_table_foreach_remove( integrated_nodes, finalize_join_for, NULL); #endif clear_bit_inplace(fsa_input_register, R_HAVE_CIB); if(max_generation_from == NULL || safe_str_eq(max_generation_from, fsa_our_uname)){ set_bit_inplace(fsa_input_register, R_HAVE_CIB); } if(is_set(fsa_input_register, R_HAVE_CIB) == FALSE) { /* ask for the agreed best CIB */ crm_info("Asking %s for its copy of the CIB", crm_str(max_generation_from)); set_bit_inplace(fsa_input_register, R_CIB_ASKED); fsa_cib_conn->call_timeout = 10; rc = fsa_cib_conn->cmds->sync_from( fsa_cib_conn, max_generation_from, NULL, cib_quorum_override); fsa_cib_conn->call_timeout = 0; /* back to the default */ add_cib_op_callback(rc, FALSE, crm_strdup(max_generation_from), finalize_sync_callback); return I_NULL; } clear_bit_inplace(fsa_input_register, R_CIB_ASKED); crm_devel("Bumping the epoche and syncing to %d clients", g_hash_table_size(finalized_nodes)); fsa_cib_conn->cmds->bump_epoch( fsa_cib_conn, cib_scope_local|cib_inhibit_bcast|cib_quorum_override); #if JOIN_AFTER_SYNC crm_debug("Notifying %d clients of join results", g_hash_table_size(integrated_nodes)); if(check_join_state(cur_state, __FUNCTION__) == FALSE) { crm_debug("Notifying %d clients of join results", g_hash_table_size(integrated_nodes)); g_hash_table_foreach_remove( integrated_nodes, finalize_join_for, NULL); } #endif rc = fsa_cib_conn->cmds->sync(fsa_cib_conn, NULL, cib_quorum_override); return I_NULL; } void finalize_sync_callback(const HA_Message *msg, int call_id, int rc, crm_data_t *output, void *user_data) { CRM_DEV_ASSERT(cib_not_master != rc); clear_bit_inplace(fsa_input_register, R_CIB_ASKED); if(rc == cib_remote_timeout) { crm_err("Sync from %s resulted in an error: %s." " Use what we have...", (char*)user_data, cib_error2string(rc)); #if 0 /* restart the whole join process */ register_fsa_error_adv(C_FSA_INTERNAL, I_ELECTION_DC, NULL, NULL, __FUNCTION__); return; #else rc = cib_ok; #endif } if(rc < cib_ok) { crm_err("Sync from %s resulted in an error: %s", (char*)user_data, cib_error2string(rc)); register_fsa_error_adv( C_FSA_INTERNAL, I_ERROR, NULL, NULL, __FUNCTION__); } else if(AM_I_DC) { set_bit_inplace(fsa_input_register, R_HAVE_CIB); fsa_cib_conn->cmds->bump_epoch( fsa_cib_conn, cib_quorum_override); #if JOIN_AFTER_SYNC crm_debug("Notifying %d clients of join results", g_hash_table_size(integrated_nodes)); g_hash_table_foreach_remove( integrated_nodes, finalize_join_for, NULL); #else check_join_state(cur_state, __FUNCTION__); #endif } crm_free(user_data); } /* A_DC_JOIN_PROCESS_ACK */ enum crmd_fsa_input do_dc_join_ack(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { ha_msg_input_t *join_ack = fsa_typed_data(fsa_dt_ha_msg); const char *join_from = cl_get_string(join_ack->msg, F_CRM_HOST_FROM); const char *op = cl_get_string(join_ack->msg, F_CRM_TASK); if(safe_str_neq(op, CRM_OP_JOIN_CONFIRM)) { crm_debug("Ignoring op=%s message", op); } else { - process_join_ack_msg(join_from, join_ack->xml); + int join_id = -1; + ha_msg_value_int(join_ack->msg, F_CRM_JOIN_ID, &join_id); + process_join_ack_msg(join_from, join_ack->xml, join_id); } return I_NULL; } gboolean -process_join_ack_msg(const char *join_from, crm_data_t *lrm_update) +process_join_ack_msg(const char *join_from, crm_data_t *lrm_update, int join_id) { /* now update them to "member" */ crm_data_t *update = NULL; const char *join_state = NULL; crm_debug("Processing ack from %s", join_from); join_state = (const char *) g_hash_table_lookup(finalized_nodes, join_from); if(join_state == NULL) { crm_err("Join not in progress: ignoring join from %s", join_from); return FALSE; } else if(safe_str_neq(join_state, CRMD_JOINSTATE_MEMBER)) { crm_err("Node %s wasnt invited to join the cluster",join_from); g_hash_table_remove(finalized_nodes, join_from); return FALSE; - + + } else if(join_id != current_join_id) { + crm_err("Node %s responded to an invalid join: %d vs. %d", + join_from, join_id, current_join_id); + g_hash_table_remove(finalized_nodes, join_from); + return FALSE; + } else { g_hash_table_remove(finalized_nodes, join_from); } if(g_hash_table_lookup(confirmed_nodes, join_from) != NULL) { crm_err("hash already contains confirmation from %s",join_from); } g_hash_table_insert(confirmed_nodes, crm_strdup(join_from), crm_strdup(CRMD_JOINSTATE_MEMBER)); /* the updates will actually occur in reverse order because of * the LIFO nature of the fsa input queue */ /* update CIB with the current LRM status from the node */ update_local_cib(copy_xml_node_recursive(lrm_update)); /* update node entry in the status section */ crm_info("4) Updating node state to %s for %s", join_state, join_from); update = create_node_state( join_from, join_from, ACTIVESTATUS, NULL, ONLINESTATUS, join_state, join_state); set_xml_property_copy(update,XML_CIB_ATTR_EXPSTATE, CRMD_STATE_ACTIVE); update_local_cib(create_cib_fragment(update, NULL)); check_join_state(fsa_state, __FUNCTION__); return TRUE; } gboolean finalize_join_for(gpointer key, gpointer value, gpointer user_data) { const char *join_to = NULL; const char *join_state = NULL; HA_Message *acknak = NULL; if(key == NULL || value == NULL) { return TRUE; } join_to = (const char *)key; join_state = (const char *)value; /* make sure the node exists in the config section */ create_node_entry(join_to, join_to, CRMD_JOINSTATE_MEMBER); /* send the ack/nack to the node */ acknak = create_request( CRM_OP_JOIN_ACKNAK, NULL, join_to, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL); + ha_msg_add_int(acknak, F_CRM_JOIN_ID, current_join_id); /* set the ack/nack */ if(safe_str_eq(join_state, CRMD_JOINSTATE_MEMBER)) { crm_info("3) ACK'ing join request from %s, state %s", join_to, join_state); ha_msg_add(acknak, CRM_OP_JOIN_ACKNAK, XML_BOOLEAN_TRUE); g_hash_table_insert( finalized_nodes, crm_strdup(join_to), crm_strdup(CRMD_JOINSTATE_MEMBER)); } else { crm_warn("3) NACK'ing join request from %s, state %s", join_to, join_state); ha_msg_add(acknak, CRM_OP_JOIN_ACKNAK, XML_BOOLEAN_FALSE); } send_msg_via_ha(fsa_cluster_conn, acknak); return TRUE; } void initialize_join(gboolean before) { /* clear out/reset a bunch of stuff */ crm_debug("Initializing join data"); g_hash_table_destroy(welcomed_nodes); g_hash_table_destroy(integrated_nodes); g_hash_table_destroy(finalized_nodes); g_hash_table_destroy(confirmed_nodes); if(before) { if(max_generation_from != NULL) { crm_free(max_generation_from); max_generation_from = NULL; } if(max_generation_xml != NULL) { free_xml(max_generation_xml); max_generation_xml = NULL; } clear_bit_inplace(fsa_input_register, R_HAVE_CIB); clear_bit_inplace(fsa_input_register, R_CIB_ASKED); } welcomed_nodes = g_hash_table_new_full( g_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); integrated_nodes = g_hash_table_new_full( g_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); finalized_nodes = g_hash_table_new_full( g_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); confirmed_nodes = g_hash_table_new_full( g_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); } void join_send_offer(gpointer key, gpointer value, gpointer user_data) { const char *join_to = NULL; const char *crm_online = NULL; const oc_node_t *member = (const oc_node_t*)value; if(member != NULL) { join_to = member->node_uname; } if(join_to == NULL) { crm_err("No recipient for welcome message"); return; } g_hash_table_remove(confirmed_nodes, join_to); g_hash_table_remove(finalized_nodes, join_to); g_hash_table_remove(integrated_nodes, join_to); g_hash_table_remove(welcomed_nodes, join_to); crm_online = g_hash_table_lookup(crmd_peer_state, join_to); if(safe_str_eq(crm_online, ONLINESTATUS)) { HA_Message *offer = create_request( CRM_OP_JOIN_OFFER, NULL, join_to, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL); + ha_msg_add_int(offer, F_CRM_JOIN_ID, current_join_id); /* send the welcome */ - crm_info("Sending %s to %s", CRM_OP_JOIN_OFFER, join_to); + crm_info("Sending %s(%d) to %s", + CRM_OP_JOIN_OFFER, current_join_id, join_to); send_msg_via_ha(fsa_cluster_conn, offer); g_hash_table_insert(welcomed_nodes, crm_strdup(join_to), crm_strdup(CRMD_JOINSTATE_PENDING)); } else { crm_debug("Peer process on %s is not active", join_to); } } gboolean check_join_state(enum crmd_fsa_state cur_state, const char *source) { crm_debug("Invoked by %s in state: %s", source, fsa_state2string(cur_state)); if(cur_state == S_INTEGRATION) { if(g_hash_table_size(welcomed_nodes) == 0) { crm_info("Integration of %d peers complete: %s", g_hash_table_size(integrated_nodes), source); register_fsa_input_later( C_FSA_INTERNAL, I_INTEGRATED, NULL); return TRUE; } } else if(cur_state == S_FINALIZE_JOIN) { if(is_set(fsa_input_register, R_HAVE_CIB) == FALSE) { crm_debug("Delaying I_FINALIZED until we have the CIB"); return TRUE; } else if(g_hash_table_size(integrated_nodes) == 0 && g_hash_table_size(finalized_nodes) == 0) { crm_info("Join process complete: %s", source); register_fsa_input_later( C_FSA_INTERNAL, I_FINALIZED, NULL); } else if(g_hash_table_size(integrated_nodes) != 0 && g_hash_table_size(finalized_nodes) != 0) { crm_err("Waiting on %d integrated nodes" " AND %d confirmations", g_hash_table_size(integrated_nodes), g_hash_table_size(finalized_nodes)); } else if(g_hash_table_size(integrated_nodes) != 0) { crm_debug("Still waiting on %d integrated nodes", g_hash_table_size(integrated_nodes)); } else if(g_hash_table_size(finalized_nodes) != 0) { crm_debug("Still waiting on %d confirmations", g_hash_table_size(finalized_nodes)); } } return FALSE; }