diff --git a/crm/crmd/callbacks.c b/crm/crmd/callbacks.c index b2d6d28d3d..757c10ff56 100644 --- a/crm/crmd/callbacks.c +++ b/crm/crmd/callbacks.c @@ -1,616 +1,618 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include GHashTable *crmd_peer_state = NULL; crm_data_t *find_xml_in_hamessage(const HA_Message * msg); void crmd_ha_connection_destroy(gpointer user_data); /* From join_dc... */ extern gboolean check_join_state( enum crmd_fsa_state cur_state, const char *source); /* #define MAX_EMPTY_CALLBACKS 20 */ /* int empty_callbacks = 0; */ gboolean crmd_ha_msg_dispatch(IPC_Channel *channel, gpointer user_data) { int lpc = 0; ll_cluster_t *hb_cluster = (ll_cluster_t*)user_data; while(lpc < 2 && hb_cluster->llc_ops->msgready(hb_cluster)) { if(channel->ch_status != IPC_CONNECT) { /* there really is no point continuing */ break; } lpc++; /* invoke the callbacks but dont block */ hb_cluster->llc_ops->rcvmsg(hb_cluster, 0); } crm_devel("%d HA messages dispatched", lpc); G_main_set_trigger(fsa_source); if (channel && (channel->ch_status != IPC_CONNECT)) { crm_crit("Lost connection to heartbeat service."); return FALSE; } return TRUE; } void crmd_ha_msg_callback(const HA_Message * msg, void* private_data) { ha_msg_input_t *new_input = NULL; oc_node_t *from_node = NULL; const char *from = ha_msg_value(msg, F_ORIG); const char *seq = ha_msg_value(msg, F_SEQ); const char *op = ha_msg_value(msg, F_CRM_TASK); const char *sys_to = ha_msg_value(msg, F_CRM_SYS_TO); const char *sys_from = ha_msg_value(msg, F_CRM_SYS_FROM); CRM_DEV_ASSERT(from != NULL); if(fsa_membership_copy == NULL) { crm_debug("Ignoring HA messages until we are" " connected to the CCM (%s op from %s)", op, from); crm_log_message_adv( LOG_MSG, "HA[inbound]: Ignore (No CCM)", msg); return; } from_node = g_hash_table_lookup(fsa_membership_copy->members, from); if(from_node == NULL) { int level = LOG_DEBUG; if(safe_str_eq(op, CRM_OP_VOTE)) { level = LOG_WARNING; } else if(AM_I_DC && safe_str_eq(op, CRM_OP_JOIN_ANNOUNCE)) { level = LOG_WARNING; } else if(safe_str_eq(sys_from, CRM_SYSTEM_DC)) { level = LOG_WARNING; } do_crm_log(level, __FILE__, __FUNCTION__, "Ignoring HA message (op=%s) from %s: not in our" " membership list (size=%d)", op, from, g_hash_table_size(fsa_membership_copy->members)); crm_log_message_adv(LOG_MSG, "HA[inbound]: CCM Discard", msg); } else if(AM_I_DC && safe_str_eq(sys_from, CRM_SYSTEM_DC) && safe_str_neq(from, fsa_our_uname)) { crm_err("Another DC detected: %s (op=%s)", from, op); crm_log_message_adv( LOG_WARNING, "HA[inbound]: Duplicate DC", msg); new_input = new_ha_msg_input(msg); /* make sure the election happens NOW */ register_fsa_error_adv(C_FSA_INTERNAL, I_ELECTION, NULL, new_input, __FUNCTION__); #if 0 /* still thinking about this one... * could create a timing issue if we dont notice the * election before a new DC is elected. */ } else if(fsa_our_dc != NULL && safe_str_eq(sys_from, CRM_SYSTEM_DC) && safe_str_neq(from, fsa_our_dc)) { crm_warn("Ignoring message from wrong DC: %s vs. %s ", from, fsa_our_dc); crm_log_message_adv(LOG_WARNING, "HA[inbound]: wrong DC", msg); #endif } else if(safe_str_eq(sys_to, CRM_SYSTEM_DC) && AM_I_DC == FALSE) { crm_verbose("Ignoring message for the DC [F_SEQ=%s]", seq); crm_log_message_adv(LOG_TRACE, "HA[inbound]: ignore", msg); return; } else if(safe_str_eq(from, fsa_our_uname) && safe_str_eq(op, CRM_OP_VOTE)) { crm_log_message_adv(LOG_TRACE, "HA[inbound]", msg); crm_verbose("Ignoring our own vote [F_SEQ=%s]: own vote", seq); return; } else if(AM_I_DC && safe_str_eq(op, CRM_OP_HBEAT)) { crm_verbose("Ignoring our own heartbeat [F_SEQ=%s]", seq); crm_log_message_adv(LOG_TRACE, "HA[inbound]: own heartbeat", msg); return; } else { crm_devel("Processing message"); crm_log_message_adv(LOG_MSG, "HA[inbound]", msg); new_input = new_ha_msg_input(msg); register_fsa_input(C_HA_MESSAGE, I_ROUTER, new_input); } #if 0 if(ha_msg_value(msg, XML_ATTR_REFERENCE) == NULL) { ha_msg_add(new_input->msg, XML_ATTR_REFERENCE, seq); } #endif delete_ha_msg_input(new_input); return; } /* * Apparently returning TRUE means "stay connected, keep doing stuff". * Returning FALSE means "we're all done, close the connection" */ gboolean crmd_ipc_msg_callback(IPC_Channel *client, gpointer user_data) { int lpc = 0; IPC_Message *msg = NULL; ha_msg_input_t *new_input = NULL; crmd_client_t *curr_client = (crmd_client_t*)user_data; gboolean stay_connected = TRUE; crm_verbose("Processing IPC message from %s", curr_client->table_key); while(lpc == 0 && client->ops->is_message_pending(client)) { if (client->ch_status != IPC_CONNECT) { /* The message which was pending for us is that * the IPC status is now IPC_DISCONNECT */ break; } if (client->ops->recv(client, &msg) != IPC_OK) { perror("Receive failure:"); crm_err("[%s] [receive failure]", curr_client->table_key); stay_connected = FALSE; break; } else if (msg == NULL) { crm_err("No message from %s this time", curr_client->table_key); continue; } lpc++; new_input = new_ipc_msg_input(msg); msg->msg_done(msg); crm_verbose("Processing msg from %s", curr_client->table_key); crm_log_message_adv(LOG_MSG, "CRMd[inbound]", new_input->msg); crmd_authorize_message(new_input, curr_client); delete_ha_msg_input(new_input); msg = NULL; new_input = NULL; } crm_verbose("Processed %d messages", lpc); if (client->ch_status != IPC_CONNECT) { stay_connected = FALSE; crm_verbose("received HUP from %s", curr_client->table_key); if (curr_client != NULL) { struct crm_subsystem_s *the_subsystem = NULL; if (curr_client->sub_sys == NULL) { crm_debug("Client hadn't registered with us yet"); } else if (strcmp(CRM_SYSTEM_PENGINE, curr_client->sub_sys) == 0) { the_subsystem = pe_subsystem; } else if (strcmp(CRM_SYSTEM_TENGINE, curr_client->sub_sys) == 0) { the_subsystem = te_subsystem; } else if (strcmp(CRM_SYSTEM_CIB, curr_client->sub_sys) == 0){ the_subsystem = cib_subsystem; } if(the_subsystem != NULL) { cleanup_subsystem(the_subsystem); } /* else that was a transient client */ if (curr_client->table_key != NULL) { /* * Key is destroyed below: * curr_client->table_key * Value is cleaned up by: * G_main_del_IPC_Channel */ g_hash_table_remove( ipc_clients, curr_client->table_key); } #if 0 if(curr_client->client_source != NULL) { gboolean det = G_main_del_IPC_Channel( curr_client->client_source); crm_verbose("crm_client was %s detached", det?"successfully":"not"); } #endif crm_free(curr_client->table_key); crm_free(curr_client->sub_sys); crm_free(curr_client->uuid); crm_free(curr_client); } } G_main_set_trigger(fsa_source); return stay_connected; } gboolean lrm_dispatch(IPC_Channel*src_not_used, gpointer user_data) { int num_msgs = 0; ll_lrm_t *lrm = (ll_lrm_t*)user_data; crm_devel("received callback"); num_msgs = lrm->lrm_ops->rcvmsg(lrm, FALSE); if(num_msgs < 1) { crm_err("lrm->lrm_ops->rcvmsg() failed, connection lost?"); clear_bit_inplace(fsa_input_register, R_LRM_CONNECTED); register_fsa_input(C_FSA_INTERNAL, I_ERROR, NULL); G_main_set_trigger(fsa_source); return FALSE; } return TRUE; } void lrm_op_callback(lrm_op_t* op) { CRM_DEV_ASSERT(op != NULL); if(crm_assert_failed) { return; } crm_debug("received callback: %s/%s (%s)", op->op_type, op->rsc_id, op_status2text(op->op_status)); /* Make sure the LRM events are received in order */ register_fsa_input_later(C_LRM_OP_CALLBACK, I_LRM_EVENT, op); G_main_set_trigger(fsa_source); } void crmd_ha_status_callback( const char *node, const char * status, void* private_data) { crm_data_t *update = NULL; crm_devel("received callback"); crm_notice("Status update: Node %s now has status [%s]",node,status); if(safe_str_neq(status, DEADSTATUS)) { crm_devel("nstatus callback was not for a dead node"); return; } /* this node is taost */ update = create_node_state( node, node, status, NULL, NULL, NULL, NULL); set_xml_property_copy( update, XML_CIB_ATTR_CLEAR_SHUTDOWN, XML_BOOLEAN_TRUE); update_local_cib(create_cib_fragment(update, NULL)); G_main_set_trigger(fsa_source); free_xml(update); } void crmd_client_status_callback(const char * node, const char * client, const char * status, void * private) { const char *join = NULL; const char *extra = NULL; crm_data_t * update = NULL; crm_devel("received callback"); if(safe_str_neq(client, CRM_SYSTEM_CRMD)) { return; } if(safe_str_eq(status, JOINSTATUS)){ status = ONLINESTATUS; extra = XML_CIB_ATTR_CLEAR_SHUTDOWN; } else if(safe_str_eq(status, LEAVESTATUS)){ status = OFFLINESTATUS; join = CRMD_JOINSTATE_DOWN; extra = XML_CIB_ATTR_CLEAR_SHUTDOWN; } set_bit_inplace(fsa_input_register, R_PEER_DATA); g_hash_table_replace( crmd_peer_state, crm_strdup(node), crm_strdup(status)); if(fsa_state == S_STARTING || fsa_state == S_STOPPING) { return; } crm_notice("Status update: Client %s/%s now has status [%s]", node, client, status); if(safe_str_eq(node, fsa_our_dc) && safe_str_eq(status, OFFLINESTATUS)) { /* did our DC leave us */ crm_info("Got client status callback - our DC is dead"); register_fsa_input(C_CRMD_STATUS_CALLBACK, I_ELECTION, NULL); } else { crm_devel("Got client status callback"); update = create_node_state( node, node, NULL, NULL, status, join, NULL); set_xml_property_copy(update, extra, XML_BOOLEAN_TRUE); update_local_cib(create_cib_fragment(update, NULL)); free_xml(update); if(AM_I_DC && safe_str_eq(status, OFFLINESTATUS)) { g_hash_table_remove(confirmed_nodes, node); g_hash_table_remove(finalized_nodes, node); g_hash_table_remove(integrated_nodes, node); g_hash_table_remove(welcomed_nodes, node); check_join_state(fsa_state, __FUNCTION__); } } G_main_set_trigger(fsa_source); } void crmd_ha_connection_destroy(gpointer user_data) { crm_crit("Heartbeat has left us"); /* this is always an error */ /* feed this back into the FSA */ register_fsa_input(C_HA_DISCONNECT, I_ERROR, NULL); G_main_set_trigger(fsa_source); } gboolean crmd_client_connect(IPC_Channel *client_channel, gpointer user_data) { if (client_channel == NULL) { crm_err("Channel was NULL"); } else if (client_channel->ch_status == IPC_DISCONNECT) { crm_err("Channel was disconnected"); } else { crmd_client_t *blank_client = NULL; crm_devel("Channel connected"); crm_malloc0(blank_client, sizeof(crmd_client_t)); if (blank_client == NULL) { return FALSE; } client_channel->ops->set_recv_qlen(client_channel, 100); client_channel->ops->set_send_qlen(client_channel, 100); blank_client->client_channel = client_channel; blank_client->sub_sys = NULL; blank_client->uuid = NULL; blank_client->table_key = NULL; blank_client->client_source = G_main_add_IPC_Channel( G_PRIORITY_LOW, client_channel, FALSE, crmd_ipc_msg_callback, blank_client, default_ipc_connection_destroy); } return TRUE; } gboolean ccm_dispatch(int fd, gpointer user_data) { int rc = 0; oc_ev_t *ccm_token = (oc_ev_t*)user_data; gboolean was_error = FALSE; crm_devel("received callback"); rc = oc_ev_handle_event(ccm_token); if(rc != 0) { crm_err("CCM connection appears to have failed: rc=%d.", rc); register_fsa_input(C_CCM_CALLBACK, I_ERROR, NULL); was_error = TRUE; } G_main_set_trigger(fsa_source); return !was_error; } static gboolean fsa_have_quorum = FALSE; void crmd_ccm_msg_callback( oc_ed_t event, void *cookie, size_t size, const void *data) { int instance = -1; gboolean update_cache = FALSE; struct crmd_ccm_data_s *event_data = NULL; const oc_ev_membership_t *membership = data; gboolean update_quorum = FALSE; gboolean trigger_transition = FALSE; crm_devel("received callback"); if(data != NULL) { instance = membership->m_instance; } crm_info("Quorum %s after event=%s (id=%d)", ccm_have_quorum(event)?"(re)attained":"lost", ccm_event_name(event), instance); switch(event) { case OC_EV_MS_NEW_MEMBERSHIP: case OC_EV_MS_INVALID: update_cache = TRUE; update_quorum = TRUE; if(AM_I_DC && need_transition(fsa_state)) { trigger_transition = TRUE; } break; case OC_EV_MS_NOT_PRIMARY: #if UNTESTED if(AM_I_DC == FALSE) { break; } /* tell the TE to pretend it completed and stop */ /* side effect: we'll end up in S_IDLE */ register_fsa_action(A_TE_HALT, TRUE); #endif break; case OC_EV_MS_PRIMARY_RESTORED: if(AM_I_DC && need_transition(fsa_state)) { fsa_membership_copy->id = instance; trigger_transition = TRUE; } break; case OC_EV_MS_EVICTED: update_quorum = TRUE; register_fsa_input(C_FSA_INTERNAL, I_STOP, NULL); break; default: crm_err("Unknown CCM event: %d", event); } if(update_quorum && ccm_have_quorum(event) == FALSE) { /* did we just loose quorum? */ if(fsa_have_quorum && need_transition(fsa_state)) { crm_info("Quorum lost: triggering transition (%s)", ccm_event_name(event)); trigger_transition = TRUE; } fsa_have_quorum = FALSE; } else if(update_quorum) { crm_debug("Updating quorum after event %s", ccm_event_name(event)); fsa_have_quorum = TRUE; } if(update_cache) { crm_debug("Updating cache after event %s", ccm_event_name(event)); crm_malloc0(event_data, sizeof(struct crmd_ccm_data_s)); if(event_data == NULL) { return; } event_data->event = event; if(data != NULL) { event_data->oc = copy_ccm_oc_data(data); } - register_fsa_input_later( - C_CCM_CALLBACK, I_CCM_EVENT, event_data); + register_fsa_input_adv( + C_CCM_CALLBACK, I_CCM_EVENT, event_data, + trigger_transition?A_TE_CANCEL:A_NOTHING, + FALSE, __FUNCTION__); if (event_data->oc) { crm_free(event_data->oc); event_data->oc = NULL; } crm_free(event_data); - } - if(trigger_transition) { + } else if(trigger_transition) { crm_debug("Scheduling transition after event %s", ccm_event_name(event)); - register_fsa_action(A_TE_CANCEL, FALSE); + register_fsa_action(A_TE_CANCEL); } oc_ev_callback_done(cookie); return; } void crmd_cib_connection_destroy(gpointer user_data) { if(is_set(fsa_input_register, R_SHUTDOWN)) { crm_info("Connection to the CIB terminated..."); return; } /* eventually this will trigger a reconnect, not a shutdown */ crm_err("Connection to the CIB terminated..."); register_fsa_input(C_FSA_INTERNAL, I_ERROR, NULL); clear_bit_inplace(fsa_input_register, R_CIB_CONNECTED); G_main_set_trigger(fsa_source); return; } longclock_t fsa_start = 0; longclock_t fsa_stop = 0; longclock_t fsa_diff = 0; -int fsa_diff_ms = 0; gboolean crm_fsa_trigger(gpointer user_data) { + unsigned int fsa_diff_ms = 0; if(fsa_diff_max_ms > 0) { fsa_start = time_longclock(); } s_crmd_fsa(C_FSA_INTERNAL); if(fsa_diff_max_ms > 0) { fsa_stop = time_longclock(); - fsa_diff = sub_longclock(fsa_start, fsa_stop); + fsa_diff = sub_longclock(fsa_stop, fsa_start); fsa_diff_ms = longclockto_ms(fsa_diff); - if(fsa_diff_ms > fsa_diff_max_ms) { + if(fsa_diff_ms < 0 || fsa_diff_ms > fsa_diff_max_ms) { crm_err("FSA took %dms to complete", fsa_diff_ms); } + } return TRUE; } diff --git a/crm/crmd/fsa.c b/crm/crmd/fsa.c index bc096e7515..41c5deb9b1 100644 --- a/crm/crmd/fsa.c +++ b/crm/crmd/fsa.c @@ -1,694 +1,701 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include extern uint highest_born_on; extern int num_join_invites; extern GHashTable *welcomed_nodes; extern GHashTable *integrated_nodes; extern GHashTable *finalized_nodes; extern GHashTable *confirmed_nodes; extern void initialize_join(gboolean before); long long do_state_transition(long long actions, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_state next_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data); long long clear_flags(long long actions, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input cur_input); void dump_rsc_info(void); void dump_rsc_info_callback(const HA_Message *msg, int call_id, int rc, crm_data_t *output, void *user_data); void ghash_print_node(gpointer key, gpointer value, gpointer user_data); #define DOT_PREFIX "live.dot: " #define DOT_LOG LOG_DEBUG #define do_dot_log(fmt...) do_crm_log(DOT_LOG, NULL, NULL, fmt) #define do_dot_action(fmt...) do_crm_log(DOT_LOG+1, NULL, NULL, fmt) longclock_t action_start = 0; longclock_t action_stop = 0; longclock_t action_diff = 0; -int action_diff_ms = 0; +unsigned int action_diff_ms = 0; #define IF_FSA_ACTION(x,y) \ if(is_set(fsa_actions,x)) { \ last_action = x; \ fsa_actions = clear_bit(fsa_actions, x); \ crm_verbose("Invoking action %s (%.16llx)", \ fsa_action2string(x), x); \ if(action_diff_max_ms > 0) { \ action_start = time_longclock(); \ } \ next_input = y(x, cause, fsa_state, last_input, fsa_data); \ if(action_diff_max_ms > 0) { \ action_stop = time_longclock(); \ - action_diff = sub_longclock(action_start, action_stop); \ + action_diff = sub_longclock(action_stop, action_start); \ action_diff_ms = longclockto_ms(action_diff); \ if(action_diff_ms > action_diff_max_ms) { \ crm_err("Action %s took %dms to complete", \ fsa_action2string(x), \ action_diff_ms); \ } \ } \ crm_verbose("Action complete: %s (%.16llx)", \ fsa_action2string(x), x); \ CRM_DEV_ASSERT(next_input == I_NULL); \ do_dot_action(DOT_PREFIX"\t// %s", fsa_action2string(x)); \ } /* #define ELSEIF_FSA_ACTION(x,y) else IF_FSA_ACTION(x,y) */ void init_dotfile(void); void init_dotfile(void) { do_dot_log(DOT_PREFIX"digraph \"g\" {"); do_dot_log(DOT_PREFIX" size = \"30,30\""); do_dot_log(DOT_PREFIX" graph ["); do_dot_log(DOT_PREFIX" fontsize = \"12\""); do_dot_log(DOT_PREFIX" fontname = \"Times-Roman\""); do_dot_log(DOT_PREFIX" fontcolor = \"black\""); do_dot_log(DOT_PREFIX" bb = \"0,0,398.922306,478.927856\""); do_dot_log(DOT_PREFIX" color = \"black\""); do_dot_log(DOT_PREFIX" ]"); do_dot_log(DOT_PREFIX" node ["); do_dot_log(DOT_PREFIX" fontsize = \"12\""); do_dot_log(DOT_PREFIX" fontname = \"Times-Roman\""); do_dot_log(DOT_PREFIX" fontcolor = \"black\""); do_dot_log(DOT_PREFIX" shape = \"ellipse\""); do_dot_log(DOT_PREFIX" color = \"black\""); do_dot_log(DOT_PREFIX" ]"); do_dot_log(DOT_PREFIX" edge ["); do_dot_log(DOT_PREFIX" fontsize = \"12\""); do_dot_log(DOT_PREFIX" fontname = \"Times-Roman\""); do_dot_log(DOT_PREFIX" fontcolor = \"black\""); do_dot_log(DOT_PREFIX" color = \"black\""); do_dot_log(DOT_PREFIX" ]"); do_dot_log(DOT_PREFIX"// special nodes"); do_dot_log(DOT_PREFIX" \"S_PENDING\" "); do_dot_log(DOT_PREFIX" ["); do_dot_log(DOT_PREFIX" color = \"blue\""); do_dot_log(DOT_PREFIX" fontcolor = \"blue\""); do_dot_log(DOT_PREFIX" ]"); do_dot_log(DOT_PREFIX" \"S_TERMINATE\" "); do_dot_log(DOT_PREFIX" ["); do_dot_log(DOT_PREFIX" color = \"red\""); do_dot_log(DOT_PREFIX" fontcolor = \"red\""); do_dot_log(DOT_PREFIX" ]"); do_dot_log(DOT_PREFIX"// DC only nodes"); do_dot_log(DOT_PREFIX" \"S_INTEGRATION\" [ fontcolor = \"green\" ]"); do_dot_log(DOT_PREFIX" \"S_POLICY_ENGINE\" [ fontcolor = \"green\" ]"); do_dot_log(DOT_PREFIX" \"S_TRANSITION_ENGINE\" [ fontcolor = \"green\" ]"); do_dot_log(DOT_PREFIX" \"S_RELEASE_DC\" [ fontcolor = \"green\" ]"); do_dot_log(DOT_PREFIX" \"S_IDLE\" [ fontcolor = \"green\" ]"); } volatile enum crmd_fsa_state fsa_state = S_STARTING; oc_node_list_t *fsa_membership_copy; ll_cluster_t *fsa_cluster_conn; ll_lrm_t *fsa_lrm_conn; volatile long long fsa_input_register; volatile long long fsa_actions = A_NOTHING; const char *fsa_our_uname; char *fsa_our_dc; cib_t *fsa_cib_conn = NULL; fsa_timer_t *election_trigger = NULL; /* */ fsa_timer_t *election_timeout = NULL; /* */ fsa_timer_t *shutdown_escalation_timer = NULL; /* */ fsa_timer_t *shutdown_timer = NULL; /* */ fsa_timer_t *integration_timer = NULL; fsa_timer_t *finalization_timer = NULL; fsa_timer_t *dc_heartbeat = NULL; fsa_timer_t *wait_timer = NULL; volatile gboolean do_fsa_stall = FALSE; enum crmd_fsa_state s_crmd_fsa(enum crmd_fsa_cause cause) { time_t now; fsa_data_t *fsa_data = NULL; long long register_copy = fsa_input_register; long long new_actions = A_NOTHING; long long last_action = A_NOTHING; enum crmd_fsa_input last_input = I_NULL; enum crmd_fsa_input cur_input = I_NULL; enum crmd_fsa_input next_input = I_NULL; enum crmd_fsa_state starting_state = fsa_state; enum crmd_fsa_state last_state = starting_state; enum crmd_fsa_state next_state = starting_state; crm_verbose("FSA invoked with Cause: %s\tState: %s", fsa_cause2string(cause), fsa_state2string(fsa_state)); /* * Process actions in order of priority but do only one * action at a time to avoid complicating the ordering. * * Actions may result in a new I_ event, these are added to * (not replace) existing actions before the next iteration. * */ do_fsa_stall = FALSE; - while(next_input != I_NULL || fsa_actions != A_NOTHING || is_message()) { + while(next_input != I_NULL || fsa_actions != A_NOTHING || is_message()){ msg_queue_helper(); if(do_fsa_stall) { /* we may be waiting for an a-sync task to "happen" * and until it does, we cant do anything else */ crm_info("Wait until something else happens"); break; } else if((is_message() && fsa_data == NULL) || (is_message() && fsa_actions == A_NOTHING && next_input == I_NULL)) { fsa_data_t *stored_msg = NULL; crm_devel("Finished with current input..." " Checking messages (%d remaining)", g_list_length(fsa_message_queue)); next_input = I_NULL; stored_msg = get_message(); + crm_debug("Processing queued input %d", stored_msg->id); if(stored_msg == NULL) { crm_crit("Invalid stored message"); exit(1); } delete_fsa_input(fsa_data); if(stored_msg->fsa_cause == C_CCM_CALLBACK) { crm_devel("FSA processing CCM callback from %s", stored_msg->origin); } else if(stored_msg->fsa_cause == C_LRM_OP_CALLBACK) { crm_devel("FSA processing LRM callback from %s", stored_msg->origin); } else if(stored_msg->data == NULL) { crm_devel("FSA processing input from %s", stored_msg->origin); } else { ha_msg_input_t *ha_input = fsa_typed_data_adv( stored_msg, fsa_dt_ha_msg, __FUNCTION__); crm_devel("FSA processing XML message from %s", stored_msg->origin); crm_log_message(LOG_MSG, ha_input->msg); crm_xml_devel(ha_input->xml, "FSA message data"); } fsa_data = stored_msg; /* set up the input */ next_input = fsa_data->fsa_input; /* add any actions back to the queue */ fsa_actions |= fsa_data->actions; /* update the cause */ cause = fsa_data->fsa_cause; fsa_dump_actions(fsa_data->actions, "\tadded back"); do_dot_log(DOT_PREFIX"\t// FSA input: State=%s \tCause=%s" - " \tInput=%s \tOrigin=%s()", + " \tInput=%s \tOrigin=%s() \tid=%d", fsa_state2string(fsa_state), fsa_cause2string(fsa_data->fsa_cause), fsa_input2string(fsa_data->fsa_input), - fsa_data->origin); + fsa_data->origin, fsa_data->id); } else if(fsa_data == NULL) { crm_malloc0(fsa_data, sizeof(fsa_data_t)); fsa_data->fsa_input = I_NULL; fsa_data->fsa_cause = cause; fsa_data->actions = A_NOTHING; fsa_data->origin = "s_crmd_fsa (enter)"; fsa_data->data = NULL; fsa_data->data_type = fsa_dt_none; if(fsa_data->origin == NULL) { crm_crit("Out of memory"); exit(1); } } /* update input variables */ cur_input = next_input; if(cur_input != I_NULL) { /* record the most recent non I_NULL input */ crm_devel("Updating last_input to %s", fsa_input2string(cur_input)); last_input = cur_input; } /* get the next batch of actions */ new_actions = crmd_fsa_actions[cur_input][fsa_state]; if(new_actions != A_NOTHING) { #ifdef FSA_TRACE crm_verbose("Adding actions %.16llx for %s/%s", new_actions, fsa_input2string(cur_input), fsa_state2string(fsa_state)); fsa_dump_actions(new_actions, "\tscheduled"); #endif fsa_actions |= new_actions; } if(fsa_data == NULL) { crm_err("No input for FSA.... terminating"); exit(1); } #ifdef FSA_TRACE crm_verbose("FSA while loop:\tState: %s, Cause: %s," " Input: %s, Origin=%s", fsa_state2string(crmd_fsa_state[cur_input][fsa_state]), fsa_cause2string(fsa_data->fsa_cause), fsa_input2string(cur_input), fsa_data->origin); #endif /* logging : *before* the state is changed */ IF_FSA_ACTION(A_ERROR, do_log) else IF_FSA_ACTION(A_WARN, do_log) else IF_FSA_ACTION(A_LOG, do_log) /* this is always run, some inputs/states may make various * actions irrelevant/invalid */ fsa_actions = clear_flags(fsa_actions, cause, fsa_state, cur_input); /* update state variables */ next_state = crmd_fsa_state[cur_input][fsa_state]; last_state = fsa_state; fsa_state = next_state; /* start doing things... */ /* * Hook for change of state. * Allows actions to be added or removed when entering a state */ if(last_state != fsa_state){ fsa_actions = do_state_transition( fsa_actions, cause, last_state, fsa_state, cur_input, fsa_data); } /* regular action processing in order of action priority * * Make sure all actions that connect to required systems * are performed first */ /* get out of here NOW! before anything worse happens */ IF_FSA_ACTION(A_EXIT_1, do_exit) else IF_FSA_ACTION(A_ERROR, do_log) else IF_FSA_ACTION(A_WARN, do_log) else IF_FSA_ACTION(A_LOG, do_log) /* essential start tasks */ else IF_FSA_ACTION(A_HA_CONNECT, do_ha_control) else IF_FSA_ACTION(A_STARTUP, do_startup) else IF_FSA_ACTION(A_CIB_START, do_cib_control) else IF_FSA_ACTION(A_READCONFIG, do_read_config) /* sub-system start/connect */ else IF_FSA_ACTION(A_LRM_CONNECT, do_lrm_control) else IF_FSA_ACTION(A_CCM_CONNECT, do_ccm_control) else IF_FSA_ACTION(A_TE_START, do_te_control) else IF_FSA_ACTION(A_PE_START, do_pe_control) /* sub-system restart */ else IF_FSA_ACTION(O_CIB_RESTART, do_cib_control) else IF_FSA_ACTION(O_PE_RESTART, do_pe_control) else IF_FSA_ACTION(O_TE_RESTART, do_te_control) /* Timers */ /* else IF_FSA_ACTION(O_DC_TIMER_RESTART, do_timer_control) */ else IF_FSA_ACTION(A_DC_TIMER_STOP, do_timer_control) else IF_FSA_ACTION(A_INTEGRATE_TIMER_STOP, do_timer_control) else IF_FSA_ACTION(A_INTEGRATE_TIMER_START, do_timer_control) else IF_FSA_ACTION(A_FINALIZE_TIMER_STOP, do_timer_control) else IF_FSA_ACTION(A_FINALIZE_TIMER_START, do_timer_control) /* * Highest priority actions */ else IF_FSA_ACTION(A_CIB_BUMPGEN, do_cib_invoke) else IF_FSA_ACTION(A_MSG_ROUTE, do_msg_route) else IF_FSA_ACTION(A_RECOVER, do_recover) else IF_FSA_ACTION(A_CL_JOIN_REQUEST, do_cl_join_request) else IF_FSA_ACTION(A_CL_JOIN_RESULT, do_cl_join_result) else IF_FSA_ACTION(A_SHUTDOWN_REQ, do_shutdown_req) else IF_FSA_ACTION(A_ELECTION_VOTE, do_election_vote) else IF_FSA_ACTION(A_ELECTION_COUNT, do_election_count_vote) /* * High priority actions * Update the cache first */ else IF_FSA_ACTION(A_CCM_UPDATE_CACHE, do_ccm_update_cache) else IF_FSA_ACTION(A_CCM_EVENT, do_ccm_event) else IF_FSA_ACTION(A_STARTED, do_started) else IF_FSA_ACTION(A_CL_JOIN_QUERY, do_cl_join_query) else IF_FSA_ACTION(A_DC_TIMER_START, do_timer_control) /* * Medium priority actions */ else IF_FSA_ACTION(A_DC_TAKEOVER, do_dc_takeover) else IF_FSA_ACTION(A_DC_RELEASE, do_dc_release) else IF_FSA_ACTION(A_ELECTION_START, do_election_vote) else IF_FSA_ACTION(A_DC_JOIN_OFFER_ALL, do_dc_join_offer_all) else IF_FSA_ACTION(A_DC_JOIN_OFFER_ONE, do_dc_join_offer_all) else IF_FSA_ACTION(A_DC_JOIN_PROCESS_REQ,do_dc_join_req) else IF_FSA_ACTION(A_DC_JOIN_PROCESS_ACK,do_dc_join_ack) /* * Low(er) priority actions * Make sure the CIB is always updated before invoking the * PE, and the PE before the TE */ else IF_FSA_ACTION(A_CIB_INVOKE_LOCAL, do_cib_invoke) else IF_FSA_ACTION(A_CIB_INVOKE, do_cib_invoke) else IF_FSA_ACTION(A_DC_JOIN_FINALIZE, do_dc_join_finalize) else IF_FSA_ACTION(A_LRM_INVOKE, do_lrm_invoke) else IF_FSA_ACTION(A_LRM_EVENT, do_lrm_event) else IF_FSA_ACTION(A_TE_HALT, do_te_invoke) else IF_FSA_ACTION(A_TE_CANCEL, do_te_invoke) else IF_FSA_ACTION(A_PE_INVOKE, do_pe_invoke) else IF_FSA_ACTION(A_TE_INVOKE, do_te_invoke) else IF_FSA_ACTION(A_CL_JOIN_ANNOUNCE, do_cl_join_announce) /* sub-system stop */ else IF_FSA_ACTION(A_DC_RELEASED, do_dc_release) else IF_FSA_ACTION(A_PE_STOP, do_pe_control) else IF_FSA_ACTION(A_TE_STOP, do_te_control) else IF_FSA_ACTION(A_SHUTDOWN, do_shutdown) else IF_FSA_ACTION(A_STOP, do_stop) else IF_FSA_ACTION(A_CCM_DISCONNECT, do_ccm_control) else IF_FSA_ACTION(A_LRM_DISCONNECT, do_lrm_control) else IF_FSA_ACTION(A_HA_DISCONNECT, do_ha_control) else IF_FSA_ACTION(A_CIB_STOP, do_cib_control) /* exit gracefully */ else IF_FSA_ACTION(A_EXIT_0, do_exit) /* else IF_FSA_ACTION(A_, do_) */ /* Error checking and reporting */ else if(cur_input != I_NULL && fsa_actions == A_NOTHING) { crm_debug( "No action specified for input,state (%s,%s)", fsa_input2string(cur_input), fsa_state2string(fsa_state)); next_input = I_NULL; } else if(cur_input == I_NULL && fsa_actions == A_NOTHING) { #ifdef FSA_TRACE crm_info("Nothing left to do..."); fsa_dump_actions(fsa_actions, "still here"); #endif break; } else { crm_err("Action %s (0x%llx) not supported ", fsa_action2string(fsa_actions), fsa_actions); next_input = I_ERROR; } } now = time(NULL); do_dot_log(DOT_PREFIX"\t// ### Exiting the FSA (%s%s): %s", fsa_state2string(fsa_state), do_fsa_stall?": paused":"", asctime(localtime(&now))); + crm_debug("Exiting the FSA: is_message=%s, queue=%d, fsa_data=%p," + " fsa_actions=0x%llx, next_input=%s, stalled=%s", + is_message()?"true":"false", g_list_length(fsa_message_queue), + fsa_data, fsa_actions, fsa_input2string(next_input), + do_fsa_stall?"true":"false"); + /* cleanup inputs? */ delete_fsa_input(fsa_data); if(register_copy != fsa_input_register) { fsa_dump_inputs(LOG_DEBUG, fsa_input_register); } fsa_dump_queue(LOG_VERBOSE); return fsa_state; } long long do_state_transition(long long actions, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_state next_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { gboolean clear_recovery_bit = TRUE; long long tmp = actions; const char *state_from = fsa_state2string(cur_state); const char *state_to = fsa_state2string(next_state); const char *input = fsa_input2string(current_input); time_t now = time(NULL); if(cur_state == next_state) { crm_err("%s called in state %s with no transtion", __FUNCTION__, state_from); return A_NOTHING; } do_dot_log(DOT_PREFIX"\t%s -> %s [ label=%s cause=%s origin=%s ] // %s", state_from, state_to, input, fsa_cause2string(cause), msg_data->origin, asctime(localtime(&now))); crm_info("State transition %s -> %s [ input=%s cause=%s origin=%s ]", state_from, state_to, input, fsa_cause2string(cause), msg_data->origin); /* the last two clauses might cause trouble later */ if(election_timeout != NULL && next_state != S_ELECTION && next_state != S_RELEASE_DC && next_state != S_PENDING) { crm_timer_stop(election_timeout); /* } else { */ /* crm_timer_start(election_timeout); */ } #if 0 if(is_set(fsa_input_register, R_SHUTDOWN)){ set_bit_inplace(tmp, A_DC_TIMER_STOP); } #endif if(next_state == S_INTEGRATION) { set_bit_inplace(tmp, A_INTEGRATE_TIMER_START); } else { set_bit_inplace(tmp, A_INTEGRATE_TIMER_STOP); } if(next_state == S_FINALIZE_JOIN) { set_bit_inplace(tmp, A_FINALIZE_TIMER_START); } else { set_bit_inplace(tmp, A_FINALIZE_TIMER_STOP); } if(next_state != S_PENDING) { set_bit_inplace(tmp, A_DC_TIMER_STOP); } if(next_state != S_ELECTION) { highest_born_on = 0; } switch(next_state) { case S_PENDING: case S_ELECTION: crm_info("Resetting our DC to NULL on transition to %s", fsa_state2string(next_state)); crm_free(fsa_our_dc); fsa_our_dc = NULL; break; case S_NOT_DC: if(is_set(fsa_input_register, R_SHUTDOWN)){ crm_info("(Re)Issuing shutdown request now" " that we have a new DC"); set_bit_inplace(tmp, A_SHUTDOWN_REQ); } CRM_DEV_ASSERT(fsa_our_dc != NULL); if(fsa_our_dc == NULL) { crm_err("Reached S_NOT_DC without a DC" " being recorded"); } break; case S_RECOVERY: clear_recovery_bit = FALSE; break; case S_FINALIZE_JOIN: if(cause == C_TIMER_POPPED) { crm_warn("Progressed to state %s after %s", fsa_state2string(next_state), fsa_cause2string(cause)); } if(g_hash_table_size(welcomed_nodes) > 0) { char *msg = crm_strdup( " Welcome reply not received from"); crm_warn("%u cluster nodes failed to respond" "to the join offer.", g_hash_table_size(welcomed_nodes)); g_hash_table_foreach( welcomed_nodes, ghash_print_node, msg); } else { crm_info("All %d cluster nodes " "responded to the join offer.", g_hash_table_size(integrated_nodes)); } break; case S_POLICY_ENGINE: if(cause == C_TIMER_POPPED) { crm_warn("Progressed to state %s after %s", fsa_state2string(next_state), fsa_cause2string(cause)); } if(g_hash_table_size(finalized_nodes) > 0) { char *msg = crm_strdup( " Confirm not received from"); crm_err("%u cluster nodes failed to confirm" " their join.", g_hash_table_size(finalized_nodes)); g_hash_table_foreach( finalized_nodes, ghash_print_node, msg); } else if(g_hash_table_size(confirmed_nodes) == fsa_membership_copy->members_size) { crm_info("All %u cluster nodes are" " eligable to run resources.", fsa_membership_copy->members_size); } else { crm_warn("Only %u of %u cluster " "nodes are eligable to run resources", g_hash_table_size(confirmed_nodes), fsa_membership_copy->members_size); } /* initialize_join(FALSE); */ break; case S_STOPPING: case S_TERMINATE: /* possibly redundant */ crm_timer_stop(shutdown_timer); set_bit_inplace(fsa_input_register, R_SHUTDOWN); break; case S_IDLE: dump_rsc_info(); break; default: break; } if(clear_recovery_bit && next_state != S_PENDING) { tmp = clear_bit(tmp, A_RECOVER); } else if(clear_recovery_bit == FALSE) { tmp = set_bit(tmp, A_RECOVER); } if(tmp != actions) { fsa_dump_actions(actions ^ tmp, "New actions"); actions = tmp; } return actions; } long long clear_flags(long long actions, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input cur_input) { long long saved_actions = actions; long long startup_actions = A_STARTUP|A_CIB_START|A_LRM_CONNECT|A_CCM_CONNECT|A_HA_CONNECT|A_READCONFIG|A_STARTED|A_CL_JOIN_QUERY; if(cur_state == S_STOPPING || is_set(fsa_input_register, R_SHUTDOWN)) { clear_bit_inplace(actions, startup_actions); } fsa_dump_actions(actions ^ saved_actions, "Cleared Actions"); return actions; } void dump_rsc_info(void) { } void ghash_print_node(gpointer key, gpointer value, gpointer user_data) { const char *text = user_data; const char *uname = key; crm_info("%s: %s", text, uname); } diff --git a/crm/crmd/messages.c b/crm/crmd/messages.c index a58115ab60..d29dac87b3 100644 --- a/crm/crmd/messages.c +++ b/crm/crmd/messages.c @@ -1,1205 +1,1208 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include GListPtr fsa_message_queue = NULL; extern void crm_shutdown(int nsig); enum crmd_fsa_input handle_request(ha_msg_input_t *stored_msg); enum crmd_fsa_input handle_response(ha_msg_input_t *stored_msg); enum crmd_fsa_input handle_shutdown_request(HA_Message *stored_msg); ha_msg_input_t *copy_ha_msg_input(ha_msg_input_t *orig); gboolean ipc_queue_helper(gpointer key, gpointer value, gpointer user_data); #ifdef MSG_LOG # define ROUTER_RESULT(x) crm_devel("Router result: %s", x); \ crm_log_message_adv(LOG_MSG, "router.log", relay_message); #else # define ROUTER_RESULT(x) crm_devel("Router result: %s", x) #endif /* debug only, can wrap all it likes */ int last_data_id = 0; void register_fsa_error_adv( enum crmd_fsa_cause cause, enum crmd_fsa_input input, fsa_data_t *cur_data, void *new_data, const char *raised_from) { /* save the current actions */ register_fsa_input_adv(cur_data?cur_data->fsa_cause:C_FSA_INTERNAL, I_NULL, cur_data?cur_data->data:NULL, fsa_actions, TRUE, __FUNCTION__); /* reset the action list */ fsa_actions = A_NOTHING; /* register the error */ register_fsa_input_adv( cause, input, new_data, A_NOTHING, TRUE, raised_from); } static gboolean last_was_vote = FALSE; void register_fsa_input_adv( enum crmd_fsa_cause cause, enum crmd_fsa_input input, void *data, long long with_actions, gboolean prepend, const char *raised_from) { unsigned old_len = g_list_length(fsa_message_queue); fsa_data_t *fsa_data = NULL; crm_debug("%s raised FSA input %s (cause=%s) %s data", raised_from,fsa_input2string(input), fsa_cause2string(cause), data?"with":"without"); if(input == I_WAIT_FOR_EVENT) { do_fsa_stall = TRUE; set_bit_inplace(fsa_actions, with_actions); with_actions = A_NOTHING; crm_debug("Stalling the FSA pending further input"); if(old_len > 0) { crm_err("Stalling the FSA with pending inputs"); } fsa_dump_queue(LOG_DEBUG); return; } if(old_len == 0) { last_was_vote = FALSE; } if(input == I_NULL && with_actions == A_NOTHING /* && data == NULL */){ /* no point doing anything */ + crm_err("Cannot add entry to queue: no input and no action"); return; } else if(data == NULL) { last_was_vote = FALSE; } else if(last_was_vote && cause == C_HA_MESSAGE && input == I_ROUTER) { const char *op = cl_get_string( ((ha_msg_input_t*)data)->msg, F_CRM_TASK); if(safe_str_eq(op, CRM_OP_VOTE)) { /* It is always safe to treat N successive votes as * a single one * * If all the discarded votes are more "loosing" than * the first then the result is accurate * (win or loose). * * If any of the discarded votes are less "loosing" * than the first then we will cast our vote and the * eventual winner will vote us down again (which * even in the case that N=2, is no worse than if we * had not disarded the vote). */ crm_verbose("Vote compression: %d", old_len); return; } } else if (cause == C_HA_MESSAGE && input == I_ROUTER) { const char *op = cl_get_string( ((ha_msg_input_t*)data)->msg, F_CRM_TASK); if(safe_str_eq(op, CRM_OP_VOTE)) { last_was_vote = TRUE; crm_devel("Added vote: %d", old_len); } } else { last_was_vote = FALSE; } crm_malloc0(fsa_data, sizeof(fsa_data_t)); fsa_data->id = ++last_data_id; fsa_data->fsa_input = input; fsa_data->fsa_cause = cause; fsa_data->origin = raised_from; fsa_data->data = NULL; fsa_data->data_type = fsa_dt_none; fsa_data->actions = with_actions; if(with_actions != A_NOTHING) { crm_devel("Adding actions %.16llx to input", with_actions); } if(data != NULL) { switch(cause) { case C_FSA_INTERNAL: case C_CRMD_STATUS_CALLBACK: case C_IPC_MESSAGE: case C_HA_MESSAGE: crm_devel("Copying %s data from %s as a HA msg", fsa_cause2string(cause), raised_from); fsa_data->data = copy_ha_msg_input(data); fsa_data->data_type = fsa_dt_ha_msg; break; case C_LRM_OP_CALLBACK: crm_devel("Copying %s data from %s as lrm_op_t", fsa_cause2string(cause), raised_from); fsa_data->data = copy_lrm_op((lrm_op_t*)data); fsa_data->data_type = fsa_dt_lrm; break; case C_CCM_CALLBACK: crm_devel("Copying %s data from %s as CCM data", fsa_cause2string(cause), raised_from); fsa_data->data = copy_ccm_data(data); fsa_data->data_type = fsa_dt_ccm; break; case C_SUBSYSTEM_CONNECT: case C_LRM_MONITOR_CALLBACK: case C_TIMER_POPPED: case C_SHUTDOWN: case C_HEARTBEAT_FAILED: case C_HA_DISCONNECT: case C_ILLEGAL: case C_UNKNOWN: case C_STARTUP: crm_err("Copying %s data (from %s)" " not yet implemented", fsa_cause2string(cause), raised_from); exit(1); break; } crm_trace("%s data copied", fsa_cause2string(fsa_data->fsa_cause)); } /* make sure to free it properly later */ if(prepend) { crm_trace("Prepending input"); fsa_message_queue = g_list_prepend(fsa_message_queue, fsa_data); } else { crm_trace("Appending input"); fsa_message_queue = g_list_append(fsa_message_queue, fsa_data); } - crm_verbose("Queue len: %d -> %d", old_len, + crm_debug("Queue len: %d -> %d", old_len, g_list_length(fsa_message_queue)); - fsa_dump_queue(LOG_DEV); + fsa_dump_queue(LOG_DEBUG); if(old_len == g_list_length(fsa_message_queue)){ crm_err("Couldnt add message to the queue"); } } void fsa_dump_queue(int log_level) { if(log_level < (int)crm_log_level) { return; } slist_iter( data, fsa_data_t, fsa_message_queue, lpc, do_crm_log(log_level, __FILE__, __FUNCTION__, "queue[%d(%d)]: input %s raised by %s()\t(cause=%s)", lpc, data->id, fsa_input2string(data->fsa_input), data->origin, fsa_cause2string(data->fsa_cause)); ); } ha_msg_input_t * copy_ha_msg_input(ha_msg_input_t *orig) { ha_msg_input_t *input_copy = NULL; crm_malloc0(input_copy, sizeof(ha_msg_input_t)); if(orig != NULL) { crm_trace("Copy msg"); input_copy->msg = ha_msg_copy(orig->msg); if(orig->xml != NULL) { crm_trace("Copy xml"); input_copy->xml = copy_xml_node_recursive(orig->xml); } } else { crm_devel("No message to copy"); } return input_copy; } void delete_fsa_input(fsa_data_t *fsa_data) { lrm_op_t *op = NULL; crm_data_t *foo = NULL; struct crmd_ccm_data_s *ccm_input = NULL; if(fsa_data == NULL) { return; } crm_trace("About to free %s data", fsa_cause2string(fsa_data->fsa_cause)); if(fsa_data->data != NULL) { switch(fsa_data->data_type) { case fsa_dt_ha_msg: delete_ha_msg_input(fsa_data->data); break; case fsa_dt_xml: foo = fsa_data->data; free_xml(foo); break; case fsa_dt_lrm: op = (lrm_op_t*)fsa_data->data; crm_free(op->user_data); crm_free(op->output); crm_free(op->rsc_id); crm_free(op->app_name); crm_free(op); break; case fsa_dt_ccm: ccm_input = (struct crmd_ccm_data_s *) fsa_data->data; crm_free(ccm_input->oc); crm_free(ccm_input); break; case fsa_dt_none: if(fsa_data->data != NULL) { crm_err("Dont know how to free %s data from %s", fsa_cause2string(fsa_data->fsa_cause), fsa_data->origin); exit(1); } break; } crm_trace("%s data freed", fsa_cause2string(fsa_data->fsa_cause)); } crm_free(fsa_data); } /* returns the next message */ fsa_data_t * get_message(void) { fsa_data_t* message = g_list_nth_data(fsa_message_queue, 0); fsa_message_queue = g_list_remove(fsa_message_queue, message); return message; } /* returns the current head of the FIFO queue */ gboolean is_message(void) { return (g_list_length(fsa_message_queue) > 0); } void * fsa_typed_data_adv( fsa_data_t *fsa_data, enum fsa_data_type a_type, const char *caller) { void *ret_val = NULL; if(fsa_data == NULL) { do_crm_log(LOG_ERR, caller, NULL, "No FSA data available"); } else if(fsa_data->data == NULL) { do_crm_log(LOG_ERR, caller, NULL, "No message data available"); } else if(fsa_data->data_type != a_type) { do_crm_log(LOG_CRIT, caller, NULL, "Message data was the wrong type! %d vs. requested=%d." " Origin: %s", fsa_data->data_type, a_type, fsa_data->origin); CRM_ASSERT(fsa_data->data_type == a_type); } else { ret_val = fsa_data->data; } return ret_val; } /* A_MSG_ROUTE */ enum crmd_fsa_input do_msg_route(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { enum crmd_fsa_input result = I_NULL; ha_msg_input_t *input = fsa_typed_data(fsa_dt_ha_msg); gboolean routed = FALSE; if(msg_data->fsa_cause != C_IPC_MESSAGE && msg_data->fsa_cause != C_HA_MESSAGE) { /* dont try and route these */ crm_warn("Can only process HA and IPC messages"); return I_NULL; } /* try passing the buck first */ crm_trace("Attempting to route message"); routed = relay_message(input->msg, cause==C_IPC_MESSAGE); if(routed == FALSE) { crm_trace("Message wasn't routed... try handling locally"); /* calculate defer */ result = handle_message(input); switch(result) { case I_NULL: break; case I_DC_HEARTBEAT: break; case I_CIB_OP: break; /* what else should go here? */ default: crm_trace("Defering local processing of message"); register_fsa_input_later( cause, result, msg_data->data); result = I_NULL; break; } if(result == I_NULL) { crm_trace("Message processed"); } else { register_fsa_input(cause, result, msg_data->data); } } else { crm_trace("Message routed..."); input->msg = NULL; } return I_NULL; } /* * This method frees msg */ gboolean send_request(HA_Message *msg, char **msg_reference) { gboolean was_sent = FALSE; /* crm_xml_devel(request, "Final request..."); */ if(msg_reference != NULL) { *msg_reference = crm_strdup( cl_get_string(msg, XML_ATTR_REFERENCE)); } was_sent = relay_message(msg, TRUE); if(was_sent == FALSE) { ha_msg_input_t *fsa_input = new_ha_msg_input(msg); register_fsa_input(C_IPC_MESSAGE, I_ROUTER, fsa_input); delete_ha_msg_input(fsa_input); crm_msg_del(msg); } return was_sent; } /* unless more processing is required, relay_message is freed */ gboolean relay_message(HA_Message *relay_message, gboolean originated_locally) { int is_for_dc = 0; int is_for_dcib = 0; int is_for_crm = 0; int is_for_cib = 0; int is_local = 0; gboolean processing_complete = FALSE; const char *host_to = cl_get_string(relay_message, F_CRM_HOST_TO); const char *sys_to = cl_get_string(relay_message, F_CRM_SYS_TO); const char *sys_from= cl_get_string(relay_message, F_CRM_SYS_FROM); const char *type = cl_get_string(relay_message, F_TYPE); const char *msg_error = NULL; crm_devel("Routing message %s", cl_get_string(relay_message, XML_ATTR_REFERENCE)); if(relay_message == NULL) { msg_error = "Cannot route empty message"; } else if(safe_str_eq(CRM_OP_HELLO, cl_get_string(relay_message, F_CRM_TASK))){ /* quietly ignore */ processing_complete = TRUE; } else if(safe_str_neq(type, T_CRM)) { msg_error = "Bad message type"; } else if(sys_to == NULL) { msg_error = "Bad message destination: no subsystem"; } if(msg_error != NULL) { processing_complete = TRUE; crm_err("%s", msg_error); crm_log_message(LOG_WARNING, relay_message); } if(processing_complete) { crm_msg_del(relay_message); return TRUE; } processing_complete = TRUE; is_for_dc = (strcmp(CRM_SYSTEM_DC, sys_to) == 0); is_for_dcib = (strcmp(CRM_SYSTEM_DCIB, sys_to) == 0); is_for_cib = (strcmp(CRM_SYSTEM_CIB, sys_to) == 0); is_for_crm = (strcmp(CRM_SYSTEM_CRMD, sys_to) == 0); is_local = 0; if(host_to == NULL || strlen(host_to) == 0) { if(is_for_dc) { is_local = 0; } else if(is_for_crm && originated_locally) { is_local = 0; } else { is_local = 1; } } else if(strcmp(fsa_our_uname, host_to) == 0) { is_local=1; } if(is_for_dc || is_for_dcib) { if(AM_I_DC) { ROUTER_RESULT("Message result: DC/CRMd process"); processing_complete = FALSE; /* more to be done by caller */ } else if(originated_locally && safe_str_neq(sys_from, CRM_SYSTEM_PENGINE) && safe_str_neq(sys_from, CRM_SYSTEM_TENGINE)) { /* Neither the TE or PE should be sending messages * to DC's on other nodes * * By definition, if we are no longer the DC, then * the PE or TE's data should be discarded */ ROUTER_RESULT("Message result: External relay to DC"); send_msg_via_ha(fsa_cluster_conn, relay_message); } else { /* discard */ ROUTER_RESULT("Message result: Discard, not DC"); crm_msg_del(relay_message); } } else if(is_local && (is_for_crm || is_for_cib)) { ROUTER_RESULT("Message result: CRMd process"); processing_complete = FALSE; /* more to be done by caller */ } else if(is_local) { ROUTER_RESULT("Message result: Local relay"); send_msg_via_ipc(relay_message, sys_to); } else { ROUTER_RESULT("Message result: External relay"); send_msg_via_ha(fsa_cluster_conn, relay_message); } return processing_complete; } gboolean crmd_authorize_message(ha_msg_input_t *client_msg, crmd_client_t *curr_client) { /* check the best case first */ const char *sys_from = cl_get_string(client_msg->msg, F_CRM_SYS_FROM); char *uuid = NULL; char *client_name = NULL; char *major_version = NULL; char *minor_version = NULL; const char *filtered_from; gpointer table_key = NULL; gboolean auth_result = FALSE; struct crm_subsystem_s *the_subsystem = NULL; gboolean can_reply = FALSE; /* no-one has registered with this id */ const char *op = cl_get_string(client_msg->msg, F_CRM_TASK); if (safe_str_neq(CRM_OP_HELLO, op)) { if(sys_from == NULL) { crm_warn("Message [%s] was had no value for %s... discarding", cl_get_string(client_msg->msg, XML_ATTR_REFERENCE), F_CRM_SYS_FROM); return FALSE; } filtered_from = sys_from; /* The CIB can have two names on the DC */ if(strcmp(sys_from, CRM_SYSTEM_DCIB) == 0) filtered_from = CRM_SYSTEM_CIB; if (g_hash_table_lookup (ipc_clients, filtered_from) != NULL) { can_reply = TRUE; /* reply can be routed */ } crm_verbose("Message reply can%s be routed from %s.", can_reply?"":" not", sys_from); if(can_reply == FALSE) { crm_warn("Message [%s] not authorized", cl_get_string(client_msg->msg, XML_ATTR_REFERENCE)); } #if 0 if(ha_msg_value(msg, XML_ATTR_REFERENCE) == NULL) { ha_msg_add(new_input->msg, XML_ATTR_REFERENCE, seq); } #endif register_fsa_input(C_IPC_MESSAGE, I_ROUTER, client_msg); return can_reply; } crm_devel("received client join msg"); crm_log_message(LOG_MSG, client_msg->msg); auth_result = process_hello_message( client_msg->xml, &uuid, &client_name, &major_version, &minor_version); if (auth_result == TRUE) { if(client_name == NULL || uuid == NULL) { crm_err("Bad client details (client_name=%s, uuid=%s)", crm_str(client_name), crm_str(uuid)); auth_result = FALSE; } } if (auth_result == TRUE) { /* check version */ int mav = atoi(major_version); int miv = atoi(minor_version); crm_devel("Checking client version number"); if (mav < 0 || miv < 0) { crm_err("Client version (%d:%d) is not acceptable", mav, miv); auth_result = FALSE; } crm_free(major_version); crm_free(minor_version); } if (auth_result == TRUE) { /* if we already have one of those clients * only applies to te, pe etc. not admin clients */ if (strcmp(CRM_SYSTEM_PENGINE, client_name) == 0) { the_subsystem = pe_subsystem; } else if (strcmp(CRM_SYSTEM_TENGINE, client_name) == 0) { the_subsystem = te_subsystem; } if (the_subsystem != NULL) { /* do we already have one? */ crm_devel("Checking if %s is required/already connected", client_name); if(is_set(fsa_input_register, the_subsystem->flag_connected)) { auth_result = FALSE; crm_warn("Bit\t%.16llx set in %.16llx", the_subsystem->flag_connected, fsa_input_register); crm_err("Client %s is already connected", client_name); } else if(FALSE == is_set(fsa_input_register, the_subsystem->flag_required)) { auth_result = FALSE; crm_warn("Bit\t%.16llx not set in %.16llx", the_subsystem->flag_connected, fsa_input_register); crm_warn("Client %s joined but we dont need it", client_name); } else { the_subsystem->ipc = curr_client->client_channel; } } else { table_key = (gpointer) generate_hash_key(client_name, uuid); } } if (auth_result == TRUE) { if(table_key == NULL) { table_key = (gpointer)crm_strdup(client_name); } crm_verbose("Accepted client %s", crm_str(table_key)); curr_client->table_key = table_key; curr_client->sub_sys = crm_strdup(client_name); curr_client->uuid = crm_strdup(uuid); g_hash_table_insert (ipc_clients, table_key, curr_client->client_channel); send_hello_message(curr_client->client_channel, "n/a", CRM_SYSTEM_CRMD, "0", "1"); crm_devel("Updated client list with %s", crm_str(table_key)); if(the_subsystem != NULL) { set_bit_inplace( fsa_input_register, the_subsystem->flag_connected); } G_main_set_trigger(fsa_source); } else { crm_warn("Rejected client logon request"); curr_client->client_channel->ch_status = IPC_DISC_PENDING; } if(uuid != NULL) crm_free(uuid); if(minor_version != NULL) crm_free(minor_version); if(major_version != NULL) crm_free(major_version); if(client_name != NULL) crm_free(client_name); /* hello messages should never be processed further */ return FALSE; } enum crmd_fsa_input handle_message(ha_msg_input_t *stored_msg) { enum crmd_fsa_input next_input = I_NULL; const char *type = NULL; if(stored_msg == NULL || stored_msg->msg == NULL) { crm_err("No message to handle"); return I_NULL; } type = cl_get_string(stored_msg->msg, F_CRM_MSG_TYPE); if(safe_str_eq(type, XML_ATTR_REQUEST)) { next_input = handle_request(stored_msg); } else if(safe_str_eq(type, XML_ATTR_RESPONSE)) { next_input = handle_response(stored_msg); } else { crm_err("Unknown message type: %s", type); } /* crm_verbose("%s: Next input is %s", __FUNCTION__, */ /* fsa_input2string(next_input)); */ return next_input; } enum crmd_fsa_input handle_request(ha_msg_input_t *stored_msg) { HA_Message *msg = NULL; enum crmd_fsa_input next_input = I_NULL; const char *op = cl_get_string(stored_msg->msg, F_CRM_TASK); const char *sys_to = cl_get_string(stored_msg->msg, F_CRM_SYS_TO); const char *host_from = cl_get_string(stored_msg->msg, F_CRM_HOST_FROM); crm_verbose("Received %s in state %s", op, fsa_state2string(fsa_state)); if(op == NULL) { crm_err("Bad message"); crm_log_message(LOG_ERR, stored_msg->msg); /*========== common actions ==========*/ } else if(strcmp(op, CRM_OP_NOOP) == 0) { crm_debug("no-op from %s", crm_str(host_from)); } else if(strcmp(op, CRM_OP_VOTE) == 0) { /* count the vote and decide what to do after that */ register_fsa_input_adv(C_HA_MESSAGE, I_NULL, stored_msg, A_ELECTION_COUNT, FALSE, __FUNCTION__); /* Sometimes we _must_ go into S_ELECTION */ if(fsa_state == S_HALT) { crm_debug("Forcing an election from S_HALT"); next_input = I_ELECTION; #if 0 } else if(AM_I_DC) { /* This is the old way of doing things but what is gained? */ next_input = I_ELECTION; #endif } } else if(strcmp(op, CRM_OP_LOCAL_SHUTDOWN) == 0) { crm_shutdown(SIGTERM); /*next_input = I_SHUTDOWN; */ next_input = I_NULL; } else if(strcmp(op, CRM_OP_PING) == 0) { /* eventually do some stuff to figure out * if we /are/ ok */ crm_data_t *ping = createPingAnswerFragment(sys_to, "ok"); set_xml_property_copy(ping, "crmd_state", fsa_state2string(fsa_state)); crm_info("Current state: %s", fsa_state2string(fsa_state)); msg = create_reply(stored_msg->msg, ping); if(relay_message(msg, TRUE) == FALSE) { crm_msg_del(msg); } /* probably better to do this via signals on the * local node */ } else if(strcmp(op, CRM_OP_DEBUG_UP) == 0) { int level = get_crm_log_level(); set_crm_log_level(level+1); crm_info("Debug set to %d (was %d)", get_crm_log_level(), level); } else if(strcmp(op, CRM_OP_DEBUG_DOWN) == 0) { int level = get_crm_log_level(); set_crm_log_level(level-1); crm_info("Debug set to %d (was %d)", get_crm_log_level(), level); } else if(strcmp(op, CRM_OP_JOIN_OFFER) == 0) { next_input = I_JOIN_OFFER; } else if(strcmp(op, CRM_OP_JOIN_ACKNAK) == 0) { next_input = I_JOIN_RESULT; /* this functionality should only be enabled * if this is a development build */ } else if(CRM_DEV_BUILD && strcmp(op, CRM_OP_DIE) == 0/*constant condition*/) { crm_warn("Test-only code: Killing the CRM without mercy"); crm_warn("Inhibiting respawns"); exit(100); /*========== (NOT_DC)-Only Actions ==========*/ } else if(AM_I_DC == FALSE){ gboolean dc_match = safe_str_eq(host_from, fsa_our_dc); if(dc_match || fsa_our_dc == NULL) { if(strcmp(op, CRM_OP_HBEAT) == 0) { crm_devel("Received DC heartbeat from %s", host_from); next_input = I_DC_HEARTBEAT; } else if(fsa_our_dc == NULL) { crm_warn("CRMd discarding request: %s" " (DC: %s, from: %s)", op, crm_str(fsa_our_dc), host_from); crm_warn("Ignored Request"); crm_log_message(LOG_WARNING, stored_msg->msg); } else if(strcmp(op, CRM_OP_SHUTDOWN) == 0) { next_input = I_STOP; } else { crm_err("CRMd didnt expect request: %s", op); crm_log_message(LOG_ERR, stored_msg->msg); } } else { crm_warn("Discarding %s op from %s", op, host_from); } /*========== DC-Only Actions ==========*/ } else if(AM_I_DC){ if(safe_str_eq(op, CRM_OP_TEABORT)) { if(fsa_state == S_POLICY_ENGINE || fsa_state == S_TRANSITION_ENGINE || fsa_state == S_IDLE) { next_input = I_PE_CALC; } else { crm_debug("Filtering %s op in state %s", op, fsa_state2string(fsa_state)); } } else if(safe_str_eq(op, CRM_OP_TETIMEOUT)) { if(fsa_state == S_TRANSITION_ENGINE || fsa_state == S_POLICY_ENGINE) { next_input = I_PE_CALC; } else if(fsa_state == S_IDLE) { crm_err("Transition timed out in S_IDLE"); next_input = I_PE_CALC; } else { crm_err("Filtering %s op in state %s", op, fsa_state2string(fsa_state)); } } else if(strcmp(op, CRM_OP_TECOMPLETE) == 0) { if(fsa_state == S_TRANSITION_ENGINE) { next_input = I_TE_SUCCESS; } else { crm_debug("Filtering %s op in state %s", op, fsa_state2string(fsa_state)); } } else if(strcmp(op, CRM_OP_JOIN_ANNOUNCE) == 0) { next_input = I_NODE_JOIN; } else if(strcmp(op, CRM_OP_JOIN_REQUEST) == 0) { next_input = I_JOIN_REQUEST; } else if(strcmp(op, CRM_OP_JOIN_CONFIRM) == 0) { next_input = I_JOIN_RESULT; } else if(strcmp(op, CRM_OP_SHUTDOWN) == 0) { gboolean dc_match = safe_str_eq(host_from, fsa_our_dc); if(dc_match) { crm_err("We didnt ask to be shut down yet our" " TE is telling us too." " Better get out now!"); next_input = I_TERMINATE; } else if(is_set(fsa_input_register, R_SHUTDOWN)) { crm_err("We asked to be shut down, " " are still the DC, yet another node" " (DC) is askin us to shutdown!"); next_input = I_STOP; } else if(fsa_state != S_STOPPING) { crm_err("Another node is asking us to shutdown" " but we think we're ok."); next_input = I_ELECTION; } } else if(strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0) { /* a slave wants to shut down */ /* create cib fragment and add to message */ next_input = handle_shutdown_request(stored_msg->msg); } else { crm_err("Unexpected request (%s) sent to the DC", op); crm_log_message(LOG_ERR, stored_msg->msg); } } return next_input; } enum crmd_fsa_input handle_response(ha_msg_input_t *stored_msg) { enum crmd_fsa_input next_input = I_NULL; const char *op = cl_get_string(stored_msg->msg, F_CRM_TASK); const char *sys_from = cl_get_string(stored_msg->msg, F_CRM_SYS_FROM); const char *msg_ref = cl_get_string(stored_msg->msg, XML_ATTR_REFERENCE); crm_verbose("Received %s %s in state %s", op, XML_ATTR_RESPONSE, fsa_state2string(fsa_state)); if(op == NULL) { crm_err("Bad message"); crm_log_message(LOG_ERR, stored_msg->msg); } else if(AM_I_DC && strcmp(op, CRM_OP_PECALC) == 0) { if(safe_str_eq(msg_ref, fsa_pe_ref)) { next_input = I_PE_SUCCESS; } else { crm_verbose("Skipping superceeded reply from %s", sys_from); } } else if(strcmp(op, CRM_OP_VOTE) == 0 || strcmp(op, CRM_OP_HBEAT) == 0 || strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0 || strcmp(op, CRM_OP_SHUTDOWN) == 0) { next_input = I_NULL; } else if(strcmp(op, CRM_OP_CIB_CREATE) == 0 || strcmp(op, CRM_OP_CIB_UPDATE) == 0 || strcmp(op, CRM_OP_CIB_DELETE) == 0 || strcmp(op, CRM_OP_CIB_REPLACE) == 0 || strcmp(op, CRM_OP_CIB_ERASE) == 0) { /* perhaps we should do somethign with these replies, * especially check that the actions passed */ } else { crm_err("Unexpected response (op=%s) sent to the %s", op, AM_I_DC?"DC":"CRMd"); next_input = I_NULL; } return next_input; } enum crmd_fsa_input handle_shutdown_request(HA_Message *stored_msg) { /* handle here to avoid potential version issues * where the shutdown message/proceedure may have * been changed in later versions. * * This way the DC is always in control of the shutdown */ crm_data_t *frag = NULL; time_t now = time(NULL); char *now_s = crm_itoa((int)now); crm_data_t *node_state = create_xml_node(NULL, XML_CIB_TAG_STATE); const char *host_from= cl_get_string(stored_msg, F_CRM_HOST_FROM); crm_info("Creating shutdown request for %s",host_from); crm_log_message(LOG_MSG, stored_msg); set_uuid(fsa_cluster_conn, node_state, XML_ATTR_UUID, host_from); set_xml_property_copy(node_state, XML_ATTR_UNAME, host_from); set_xml_property_copy(node_state, XML_CIB_ATTR_SHUTDOWN, now_s); set_xml_property_copy( node_state, XML_CIB_ATTR_EXPSTATE, CRMD_STATE_INACTIVE); frag = create_cib_fragment(node_state, NULL); /* cleanup intermediate steps */ free_xml(node_state); crm_free(now_s); fsa_cib_conn->cmds->modify( fsa_cib_conn, XML_CIB_TAG_STATUS, frag, NULL, cib_quorum_override); free_xml(frag); /* will be picked up by the TE as long as its running */ if(need_transition(fsa_state) && is_set(fsa_input_register, R_TE_CONNECTED) == FALSE) { - register_fsa_action(A_TE_CANCEL, TRUE); + register_fsa_action(A_TE_CANCEL); } return I_NULL; } /* frees msg upon completion */ gboolean send_msg_via_ha(ll_cluster_t *hb_fd, HA_Message *msg) { int log_level = LOG_DEV; gboolean broadcast = FALSE; gboolean all_is_good = TRUE; const char *op = cl_get_string(msg, F_CRM_TASK); const char *sys_to = cl_get_string(msg, F_CRM_SYS_TO); const char *host_to = cl_get_string(msg, F_CRM_HOST_TO); if (msg == NULL) { crm_err("Attempt to send NULL Message via HA failed."); all_is_good = FALSE; } else { crm_trace("Relaying message to (%s) via HA", host_to); } if (all_is_good) { if (sys_to == NULL || strlen(sys_to) == 0) { crm_err("You did not specify a destination sub-system" " for this message."); all_is_good = FALSE; } } /* There are a number of messages may not need to be ordered. * At a later point perhaps we should detect them and send them * as unordered messages. */ if (all_is_good) { if (host_to == NULL || strlen(host_to) == 0 || safe_str_eq(sys_to, CRM_SYSTEM_DC)) { broadcast = TRUE; all_is_good = send_ha_message(hb_fd, msg, NULL); } else { all_is_good = send_ha_message(hb_fd, msg, host_to); } } if(all_is_good == FALSE) { log_level = LOG_ERR; } if(log_level == LOG_ERR || (safe_str_neq(op, CRM_OP_HBEAT))) { do_crm_log(log_level, __FILE__, __FUNCTION__, "Sending %sHA message (ref=%s) to %s@%s %s.", broadcast?"broadcast ":"directed ", cl_get_string(msg, XML_ATTR_REFERENCE), crm_str(sys_to), host_to==NULL?"":host_to, all_is_good?"succeeded":"failed"); } crm_msg_del(msg); return all_is_good; } /* msg is deleted by the time this returns */ gboolean send_msg_via_ipc(HA_Message *msg, const char *sys) { gboolean send_ok = TRUE; IPC_Channel *client_channel; enum crmd_fsa_input next_input; crm_trace("relaying msg to sub_sys=%s via IPC", sys); client_channel = (IPC_Channel*)g_hash_table_lookup(ipc_clients, sys); if(cl_get_string(msg, F_CRM_HOST_FROM) == NULL) { ha_msg_add(msg, F_CRM_HOST_FROM, fsa_our_uname); } if (client_channel != NULL) { crm_devel("Sending message via channel %s.", sys); send_ok = send_ipc_message(client_channel, msg); msg = NULL; /* so the crm_msg_del() below doesnt fail */ } else if(sys != NULL && strcmp(sys, CRM_SYSTEM_CIB) == 0) { crm_err("Sub-system (%s) has been incorporated into the CRMd.", sys); crm_err("Change the way we handle this CIB message"); crm_log_message(LOG_ERR, msg); send_ok = FALSE; } else if(sys != NULL && strcmp(sys, CRM_SYSTEM_LRMD) == 0) { fsa_data_t *fsa_data = NULL; ha_msg_input_t *msg_copy = new_ha_msg_input(msg); crm_malloc0(fsa_data, sizeof(fsa_data_t)); fsa_data->fsa_input = I_MESSAGE; fsa_data->fsa_cause = C_IPC_MESSAGE; fsa_data->data = msg_copy; fsa_data->origin = __FUNCTION__; fsa_data->data_type = fsa_dt_ha_msg; #ifdef FSA_TRACE crm_verbose("Invoking action %s (%.16llx)", fsa_action2string(A_LRM_INVOKE), A_LRM_INVOKE); #endif next_input = do_lrm_invoke(A_LRM_INVOKE, C_IPC_MESSAGE, fsa_state, I_MESSAGE, fsa_data); delete_ha_msg_input(msg_copy); crm_free(fsa_data); /* todo: feed this back in for anything != I_NULL */ #ifdef FSA_TRACE crm_verbose("Result of action %s was %s", fsa_action2string(A_LRM_INVOKE), fsa_input2string(next_input)); #endif } else { crm_err("Unknown Sub-system (%s)... discarding message.", sys); send_ok = FALSE; } crm_msg_del(msg); return send_ok; } void msg_queue_helper(void) { IPC_Channel *ipc = NULL; if(fsa_cluster_conn != NULL) { ipc = fsa_cluster_conn->llc_ops->ipcchan( fsa_cluster_conn); } + crm_debug("Checking cluster connection"); if(ipc != NULL) { - ipc->ops->is_message_pending(ipc); + ipc->ops->resume_io(ipc); } + crm_debug("Checking cluster connection complete"); /* g_hash_table_foreach_remove(ipc_clients, ipc_queue_helper, NULL); */ } gboolean ipc_queue_helper(gpointer key, gpointer value, gpointer user_data) { crmd_client_t *ipc_client = value; if(ipc_client->client_channel != NULL) { ipc_client->client_channel->ops->is_message_pending(ipc_client->client_channel); } return FALSE; } diff --git a/crm/crmd/pengine.c b/crm/crmd/pengine.c index 0349db86df..aa07a53464 100644 --- a/crm/crmd/pengine.c +++ b/crm/crmd/pengine.c @@ -1,213 +1,214 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include /* for access */ #include #include #include /* for calls to open */ #include /* for calls to open */ #include /* for calls to open */ #include /* for getpwuid */ #include /* for initgroups */ #include /* for getrlimit */ #include /* for getrlimit */ #include #include #include #include #include #include #include #include #define CLIENT_EXIT_WAIT 30 struct crm_subsystem_s *pe_subsystem = NULL; void do_pe_invoke_callback(const HA_Message *msg, int call_id, int rc, crm_data_t *output, void *user_data); /* A_PE_START, A_PE_STOP, A_TE_RESTART */ enum crmd_fsa_input do_pe_control(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { enum crmd_fsa_input result = I_NULL; struct crm_subsystem_s *this_subsys = pe_subsystem; long long stop_actions = A_PE_STOP; long long start_actions = A_PE_START; if(action & stop_actions) { if(stop_subsystem(this_subsys) == FALSE) { register_fsa_error(C_FSA_INTERNAL, I_FAIL, NULL); } } if(action & start_actions) { if(cur_state != S_STOPPING) { if(start_subsystem(this_subsys) == FALSE) { register_fsa_error(C_FSA_INTERNAL, I_FAIL, NULL); cleanup_subsystem(this_subsys); } } else { crm_info("Ignoring request to start %s while shutting down", this_subsys->name); } } return result; } char *fsa_pe_ref = NULL; /* A_PE_INVOKE */ enum crmd_fsa_input do_pe_invoke(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { int call_id = 0; /* * FIXME: The CIB might have a different version of membership than the CRM * We need to allow for that possibility. * We could set a flag saying we're waiting for the membership versions (and quorum!) * to synchronize before going on. I don't know if anything bad happens * if the CIB is ahead of us. But I know for sure that bad things * happen when the CIB is behind us (the CRM). * * This probably has effects beyond that of running things without quorum * or failing to run things when we have quorum. * * We might try and run things on nodes that aren't running, and we * might fail to schedule something on a node which is really available * for use. I'm pretty sure I've seen the latter occur * * A crude method would be to poll every 100ms and detect when the CRM * and CIB membership versions are the same. I suspect if I knew * the code better, there probably is a callback which occurs when the * CIB is updated which we could use to trigger the delayed PE invocation. * There _might_ also need to be a mechanism for cancelling this delayed * pengine invocation - depending on what else happens after we * get this far (this doesn't seem that likely) * --AlanR. * */ if(is_set(fsa_input_register, R_PE_CONNECTED) == FALSE){ if(pe_subsystem->pid > 0) { int pid_status = -1; int rc = waitpid( pe_subsystem->pid, &pid_status, WNOHANG); if(rc > 0 && WIFEXITED(pid_status)) { clear_bit_inplace(fsa_input_register, pe_subsystem->flag_connected); if(is_set(fsa_input_register, pe_subsystem->flag_required)) { /* this wasnt supposed to happen */ crm_err("%s[%d] terminated during start", pe_subsystem->name, pe_subsystem->pid); register_fsa_error( C_FSA_INTERNAL, I_ERROR, NULL); } pe_subsystem->pid = -1; return I_NULL; } } crm_info("Waiting for the PE to connect"); crmd_fsa_stall(); return I_NULL; } + crm_debug("Requesting the current CIB"); call_id = fsa_cib_conn->cmds->query( fsa_cib_conn, NULL, NULL, cib_scope_local); if(FALSE == add_cib_op_callback( call_id, TRUE, NULL, do_pe_invoke_callback)) { crm_err("Cant retrieve the CIB to invoke the %s subsystem with", pe_subsystem->name); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); } return I_NULL; } void do_pe_invoke_callback(const HA_Message *msg, int call_id, int rc, crm_data_t *output, void *user_data) { HA_Message *cmd = NULL; int ccm_transition_id = -1; gboolean cib_has_quorum = FALSE; crm_data_t *local_cib = find_xml_node(output, XML_TAG_CIB, TRUE); if(AM_I_DC == FALSE || is_set(fsa_input_register, R_PE_CONNECTED) == FALSE || fsa_state != S_POLICY_ENGINE) { crm_debug("No need to invoke the PE anymore"); return; } crm_verbose("Invoking %s with %p", CRM_SYSTEM_PENGINE, local_cib); CRM_DEV_ASSERT(local_cib != NULL); CRM_DEV_ASSERT(crm_element_value(local_cib, XML_ATTR_DC_UUID) != NULL); cib_has_quorum = crm_is_true( crm_element_value(local_cib, XML_ATTR_HAVE_QUORUM)); ccm_transition_id = crm_atoi( crm_element_value(local_cib, XML_ATTR_CCM_TRANSITION), "-1"); if(ccm_transition_id != fsa_membership_copy->id) { - crm_info("Re-asking for the CIB until membership/quorum" + crm_err("Re-asking for the CIB until membership/quorum" " matches: CIB=%d, CRM=%d", ccm_transition_id, fsa_membership_copy->id); - register_fsa_action(A_PE_INVOKE, TRUE); + register_fsa_action(A_PE_INVOKE); return; } if(fsa_pe_ref) { crm_free(fsa_pe_ref); fsa_pe_ref = NULL; } cmd = create_request( CRM_OP_PECALC, local_cib, NULL, CRM_SYSTEM_PENGINE, CRM_SYSTEM_DC, NULL); send_request(cmd, &fsa_pe_ref); } diff --git a/lib/crm/common/ipc.c b/lib/crm/common/ipc.c index 4fa5581d40..eec1d8d669 100644 --- a/lib/crm/common/ipc.c +++ b/lib/crm/common/ipc.c @@ -1,388 +1,388 @@ -/* $Id: ipc.c,v 1.4 2005/04/28 08:37:06 andrew Exp $ */ +/* $Id: ipc.c,v 1.5 2005/05/11 17:40:00 andrew Exp $ */ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifdef USE_LIBXML # include # include # include #endif #include #include #include #include gboolean send_ha_message(ll_cluster_t *hb_conn, HA_Message *msg, const char *node) { gboolean all_is_good = TRUE; if (msg == NULL) { crm_err("cant send NULL message"); all_is_good = FALSE; } else if(hb_conn == NULL) { crm_err("No heartbeat connection specified"); all_is_good = FALSE; } else if(hb_conn->llc_ops->chan_is_connected(hb_conn) != HA_OK) { crm_err("Not connected to Heartbeat"); all_is_good = FALSE; } else if(node != NULL) { if(hb_conn->llc_ops->send_ordered_nodemsg( hb_conn, msg, node) != HA_OK) { IPC_Channel *ipc = hb_conn->llc_ops->ipcchan(hb_conn); all_is_good = FALSE; crm_err("Send failed"); CRM_DEV_ASSERT(ipc->send_queue->current_qlen < ipc->send_queue->max_qlen); } else { crm_verbose("Message sent..."); } } else { if(hb_conn->llc_ops->sendclustermsg(hb_conn, msg) != HA_OK) { IPC_Channel *ipc = hb_conn->llc_ops->ipcchan(hb_conn); all_is_good = FALSE; crm_err("Broadcast Send failed"); CRM_DEV_ASSERT(ipc->send_queue->current_qlen < ipc->send_queue->max_qlen); } else { crm_verbose("Broadcast message sent..."); } } crm_log_message_adv(all_is_good?LOG_MSG:LOG_WARNING,"HA[outbound]",msg); return all_is_good; } #define ipc_log(fmt...) do_crm_log(server?LOG_WARNING:LOG_ERR, __FILE__, __FUNCTION__, fmt) /* frees msg */ gboolean crm_send_ipc_message(IPC_Channel *ipc_client, HA_Message *msg, gboolean server) { gboolean all_is_good = TRUE; if (msg == NULL) { crm_err("cant send NULL message"); all_is_good = FALSE; } else if (ipc_client == NULL) { crm_err("cant send message without an IPC Channel"); all_is_good = FALSE; } else if(ipc_client->ops->get_chan_status(ipc_client) != IPC_CONNECT) { ipc_log("IPC Channel is not connected"); all_is_good = FALSE; } if(all_is_good && msg2ipcchan(msg, ipc_client) != HA_OK) { ipc_log("Could not send IPC, message"); all_is_good = FALSE; if(ipc_client->ops->get_chan_status(ipc_client) != IPC_CONNECT) { ipc_log("IPC Channel is no longer connected"); } else if(server == FALSE) { CRM_DEV_ASSERT(ipc_client->send_queue->current_qlen < ipc_client->send_queue->max_qlen); } } crm_log_message_adv(all_is_good?LOG_MSG:LOG_WARNING,"IPC[outbound]",msg); crm_msg_del(msg); return all_is_good; } void default_ipc_connection_destroy(gpointer user_data) { return; } int init_server_ipc_comms( char *channel_name, gboolean (*channel_client_connect)(IPC_Channel *newclient,gpointer user_data), void (*channel_connection_destroy)(gpointer user_data)) { /* the clients wait channel is the other source of events. * This source delivers the clients connection events. * listen to this source at a relatively lower priority. */ char commpath[SOCKET_LEN]; IPC_WaitConnection *wait_ch; sprintf(commpath, WORKING_DIR "/%s", channel_name); wait_ch = wait_channel_init(commpath); if (wait_ch == NULL) { return 1; } G_main_add_IPC_WaitConnection( G_PRIORITY_LOW, wait_ch, NULL, FALSE, channel_client_connect, channel_name, channel_connection_destroy); crm_devel("Listening on: %s", commpath); return 0; } GCHSource* init_client_ipc_comms(const char *channel_name, gboolean (*dispatch)( IPC_Channel* source_data, gpointer user_data), void *client_data, IPC_Channel **ch) { IPC_Channel *a_ch = NULL; GCHSource *the_source = NULL; void *callback_data = client_data; a_ch = init_client_ipc_comms_nodispatch(channel_name); if(ch != NULL) { *ch = a_ch; if(callback_data == NULL) { callback_data = a_ch; } } if(a_ch == NULL) { crm_err("Setup of client connection failed," " not adding channel to mainloop"); return NULL; } if(dispatch == NULL) { crm_warn("No dispatch method specified..." "maybe you meant init_client_ipc_comms_nodispatch()?"); } else { crm_devel("Adding dispatch method to channel"); the_source = G_main_add_IPC_Channel( G_PRIORITY_HIGH, a_ch, FALSE, dispatch, callback_data, default_ipc_connection_destroy); } return the_source; } IPC_Channel * init_client_ipc_comms_nodispatch(const char *channel_name) { IPC_Channel *ch; GHashTable *attrs; static char path[] = IPC_PATH_ATTR; char *commpath = NULL; int local_socket_len = 2; /* 2 = '/' + '\0' */ local_socket_len += strlen(channel_name); local_socket_len += strlen(WORKING_DIR); crm_malloc0(commpath, sizeof(char)*local_socket_len); if(commpath != NULL) { sprintf(commpath, WORKING_DIR "/%s", channel_name); commpath[local_socket_len - 1] = '\0'; crm_devel("Attempting to talk on: %s", commpath); } attrs = g_hash_table_new(g_str_hash,g_str_equal); g_hash_table_insert(attrs, path, commpath); ch = ipc_channel_constructor(IPC_ANYTYPE, attrs); g_hash_table_destroy(attrs); if (ch == NULL) { crm_err("Could not access channel on: %s", commpath); return NULL; } else if (ch->ops->initiate_connection(ch) != IPC_OK) { crm_debug("Could not init comms on: %s", commpath); return NULL; } ch->ops->set_recv_qlen(ch, 100); ch->ops->set_send_qlen(ch, 100); /* ch->should_send_block = TRUE; */ crm_devel("Processing of %s complete", commpath); return ch; } IPC_WaitConnection * wait_channel_init(char daemonsocket[]) { IPC_WaitConnection *wait_ch; mode_t mask; char path[] = IPC_PATH_ATTR; GHashTable * attrs; attrs = g_hash_table_new(g_str_hash,g_str_equal); g_hash_table_insert(attrs, path, daemonsocket); mask = umask(0); wait_ch = ipc_wait_conn_constructor(IPC_ANYTYPE, attrs); if (wait_ch == NULL) { cl_perror("Can't create wait channel of type %s", IPC_ANYTYPE); exit(1); } mask = umask(mask); g_hash_table_destroy(attrs); return wait_ch; } longclock_t ipc_call_start = 0; longclock_t ipc_call_stop = 0; longclock_t ipc_call_diff = 0; -int ipc_call_diff_ms = 0; gboolean subsystem_msg_dispatch(IPC_Channel *sender, void *user_data) { int lpc = 0; IPC_Message *msg = NULL; ha_msg_input_t *new_input = NULL; gboolean all_is_well = TRUE; const char *sys_to; const char *task; while(sender->ops->is_message_pending(sender)) { gboolean process = FALSE; if (sender->ch_status == IPC_DISCONNECT) { /* The message which was pending for us is that * the IPC status is now IPC_DISCONNECT */ break; } if (sender->ops->recv(sender, &msg) != IPC_OK) { perror("Receive failure:"); return !all_is_well; } if (msg == NULL) { crm_err("No message this time"); continue; } lpc++; new_input = new_ipc_msg_input(msg); msg->msg_done(msg); crm_log_message(LOG_MSG, new_input->msg); sys_to = cl_get_string(new_input->msg, F_CRM_SYS_TO); task = cl_get_string(new_input->msg, F_CRM_TASK); if(safe_str_eq(task, CRM_OP_HELLO)) { process = TRUE; } else if(sys_to == NULL) { crm_err("Value of %s was NULL!!", F_CRM_SYS_TO); } else if(task == NULL) { crm_err("Value of %s was NULL!!", F_CRM_TASK); } else { process = TRUE; } if(process){ gboolean (*process_function) (HA_Message *msg, crm_data_t *data, IPC_Channel *sender) = NULL; process_function = user_data; #ifdef MSG_LOG crm_log_message_adv( LOG_MSG, __FUNCTION__, new_input->msg); #endif if(ipc_call_diff_max_ms > 0) { ipc_call_start = time_longclock(); } if(FALSE == process_function( new_input->msg, new_input->xml, sender)) { crm_warn("Received a message destined for %s" " by mistake", sys_to); } if(ipc_call_diff_max_ms > 0) { + unsigned int ipc_call_diff_ms = 0; ipc_call_stop = time_longclock(); ipc_call_diff = sub_longclock( - ipc_call_start, ipc_call_stop); + ipc_call_stop, ipc_call_start); ipc_call_diff_ms = longclockto_ms( ipc_call_diff); if(ipc_call_diff_ms > ipc_call_diff_max_ms) { crm_err("%s took %dms to complete", sys_to, ipc_call_diff_ms); } } } else { #ifdef MSG_LOG crm_log_message_adv( LOG_ERR, NULL, new_input->msg); #endif } delete_ha_msg_input(new_input); msg = NULL; } /* clean up after a break */ if(msg != NULL) { msg->msg_done(msg); } crm_verbose("Processed %d messages", lpc); if (sender->ch_status != IPC_CONNECT) { crm_err("The server has left us: Shutting down...NOW"); exit(1); /* shutdown properly later */ return !all_is_well; } return all_is_well; }