diff --git a/crm/cib/main.c b/crm/cib/main.c index 744a285ea6..a932d07470 100644 --- a/crm/cib/main.c +++ b/crm/cib/main.c @@ -1,538 +1,538 @@ -/* $Id: main.c,v 1.54 2006/07/07 21:26:29 andrew Exp $ */ +/* $Id: main.c,v 1.55 2006/07/09 09:54:09 andrew Exp $ */ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include /* #include */ #include /* #include */ #include #include #include #include #include #include #include #include #include #include gboolean cib_shutdown_flag = FALSE; gboolean stand_alone = FALSE; gboolean per_action_cib = FALSE; enum cib_errors cib_status = cib_ok; extern char *ccm_transition_id; extern void oc_ev_special(const oc_ev_t *, oc_ev_class_t , int ); GMainLoop* mainloop = NULL; const char* crm_system_name = CRM_SYSTEM_CIB; char *cib_our_uname = NULL; oc_ev_t *cib_ev_token; gboolean cib_writes_enabled = TRUE; void usage(const char* cmd, int exit_status); int init_start(void); gboolean cib_register_ha(ll_cluster_t *hb_cluster, const char *client_name); gboolean cib_shutdown(int nsig, gpointer unused); void cib_ha_connection_destroy(gpointer user_data); gboolean startCib(const char *filename); extern gboolean cib_msg_timeout(gpointer data); extern int write_cib_contents(gpointer p); ll_cluster_t *hb_conn = NULL; GTRIGSource *cib_writer = NULL; #define OPTARGS "hVsf" static void cib_diskwrite_complete(gpointer userdata, int status, int signo, int exitcode) { if(exitcode != LSB_EXIT_OK || signo != 0 || status != 0) { crm_err("Disk write failed: status=%d, signo=%d, exitcode=%d", status, signo, exitcode); if(cib_writes_enabled) { crm_err("Disabling disk writes after write failure"); cib_writes_enabled = FALSE; } } else { crm_debug_2("Disk write passed"); } } int main(int argc, char ** argv) { int flag; int argerr = 0; crm_log_init(crm_system_name); G_main_add_SignalHandler( G_PRIORITY_HIGH, SIGTERM, cib_shutdown, NULL, NULL); cib_writer = G_main_add_tempproc_trigger( G_PRIORITY_LOW, write_cib_contents, "write_cib_contents", NULL, NULL, NULL, cib_diskwrite_complete); EnableProcLogging(); set_sigchld_proctrack(G_PRIORITY_HIGH); client_list = g_hash_table_new(g_str_hash, g_str_equal); peer_hash = g_hash_table_new(g_str_hash, g_str_equal); while ((flag = getopt(argc, argv, OPTARGS)) != EOF) { switch(flag) { case 'V': alter_debug(DEBUG_INC); break; case 's': stand_alone = TRUE; cl_log_enable_stderr(1); break; case 'h': /* Help message */ usage(crm_system_name, LSB_EXIT_OK); break; case 'f': per_action_cib = TRUE; break; default: ++argerr; break; } } crm_info("Retrieval of a per-action CIB: %s", per_action_cib?"enabled":"disabled"); if (optind > argc) { ++argerr; } if (argerr) { usage(crm_system_name,LSB_EXIT_GENERIC); } /* read local config file */ return init_start(); } unsigned long cib_num_ops = 0; -const char *cib_stat_interval = "10s"; +const char *cib_stat_interval = "10min"; unsigned long cib_num_local = 0, cib_num_updates = 0, cib_num_fail = 0; unsigned long cib_bad_connects = 0, cib_num_timeouts = 0; longclock_t cib_call_time = 0; gboolean cib_stats(gpointer data); gboolean cib_stats(gpointer data) { int local_log_level = LOG_DEBUG; static unsigned long last_stat = 0; unsigned int cib_calls_ms = 0; static unsigned long cib_stat_interval_ms = 0; cl_mem_stats_t saved_stats; if(crm_running_stats == NULL) { START_stat_free_op(); crm_malloc0(crm_running_stats, sizeof(cl_mem_stats_t)); END_stat_free_op(); crm_zero_mem_stats(crm_running_stats); } crm_save_mem_stats(__PRETTY_FUNCTION__, &saved_stats); crm_diff_mem_stats(LOG_ERR, LOG_ERR, __PRETTY_FUNCTION__, NULL, crm_running_stats); *crm_running_stats = saved_stats; crm_info("Total alloc's %ld for %ld bytes", crm_running_stats->numalloc, crm_running_stats->nbytes_alloc); if(cib_stat_interval_ms == 0) { cib_stat_interval_ms = crm_get_msec(cib_stat_interval); } cib_calls_ms = longclockto_ms(cib_call_time); if((cib_num_ops - last_stat) > 0) { unsigned long calls_diff = cib_num_ops - last_stat; double stat_1 = (1000*cib_calls_ms)/calls_diff; local_log_level = LOG_INFO; crm_log_maybe(local_log_level, "Processed %lu operations" " (%.2fus average, %lu%% utilization) in the last %s", calls_diff, stat_1, (100*cib_calls_ms)/cib_stat_interval_ms, cib_stat_interval); } crm_log_maybe(local_log_level+1, "\tDetail: %lu operations (%ums total)" " (%lu local, %lu updates, %lu failures," " %lu timeouts, %lu bad connects)", cib_num_ops, cib_calls_ms, cib_num_local, cib_num_updates, cib_num_fail, cib_bad_connects, cib_num_timeouts); #ifdef HA_MALLOC_TRACK cl_malloc_dump_allocated(LOG_ERR, TRUE); #endif last_stat = cib_num_ops; cib_call_time = 0; return TRUE; } int init_start(void) { gboolean was_error = FALSE; if(stand_alone == FALSE) { hb_conn = ll_cluster_new("heartbeat"); if(cib_register_ha(hb_conn, CRM_SYSTEM_CIB) == FALSE) { crm_crit("Cannot sign in to heartbeat... terminating"); exit(1); } } if(startCib(CIB_FILENAME) == FALSE){ crm_crit("Cannot start CIB... terminating"); exit(1); } was_error = init_server_ipc_comms( crm_strdup(cib_channel_callback), cib_client_connect_null, default_ipc_connection_destroy); was_error = was_error || init_server_ipc_comms( crm_strdup(cib_channel_ro), cib_client_connect_rw_ro, default_ipc_connection_destroy); was_error = was_error || init_server_ipc_comms( crm_strdup(cib_channel_rw), cib_client_connect_rw_ro, default_ipc_connection_destroy); was_error = was_error || init_server_ipc_comms( crm_strdup(cib_channel_rw_synchronous), cib_client_connect_rw_synch, default_ipc_connection_destroy); was_error = was_error || init_server_ipc_comms( crm_strdup(cib_channel_ro_synchronous), cib_client_connect_ro_synch, default_ipc_connection_destroy); if(stand_alone) { if(was_error) { crm_err("Couldnt start"); return 1; } cib_is_master = TRUE; /* Create the mainloop and run it... */ mainloop = g_main_new(FALSE); crm_info("Starting %s mainloop", crm_system_name); /* Gmain_timeout_add(crm_get_msec("10s"), cib_msg_timeout, NULL); */ /* Gmain_timeout_add( */ /* crm_get_msec(cib_stat_interval), cib_stats, NULL); */ g_main_run(mainloop); return_to_orig_privs(); return 0; } if(was_error == FALSE) { crm_debug_3("Be informed of CRM Client Status changes"); if (HA_OK != hb_conn->llc_ops->set_cstatus_callback( hb_conn, cib_client_status_callback, hb_conn)) { crm_err("Cannot set cstatus callback: %s", hb_conn->llc_ops->errmsg(hb_conn)); was_error = TRUE; } else { crm_debug_3("Client Status callback set"); } } if(was_error == FALSE) { gboolean did_fail = TRUE; int num_ccm_fails = 0; int max_ccm_fails = 30; int ret; int cib_ev_fd; while(did_fail && was_error == FALSE) { did_fail = FALSE; crm_debug_3("Registering with CCM"); ret = oc_ev_register(&cib_ev_token); if (ret != 0) { crm_warn("CCM registration failed"); did_fail = TRUE; } if(did_fail == FALSE) { crm_debug_3("Setting up CCM callbacks"); ret = oc_ev_set_callback( cib_ev_token, OC_EV_MEMB_CLASS, cib_ccm_msg_callback, NULL); if (ret != 0) { crm_warn("CCM callback not set"); did_fail = TRUE; } } if(did_fail == FALSE) { oc_ev_special(cib_ev_token, OC_EV_MEMB_CLASS, 0); crm_debug_3("Activating CCM token"); ret = oc_ev_activate(cib_ev_token, &cib_ev_fd); if (ret != 0){ crm_warn("CCM Activation failed"); did_fail = TRUE; } } if(did_fail) { num_ccm_fails++; oc_ev_unregister(cib_ev_token); if(num_ccm_fails < max_ccm_fails){ crm_warn("CCM Connection failed" " %d times (%d max)", num_ccm_fails, max_ccm_fails); sleep(1); } else { crm_err("CCM Activation failed" " %d (max) times", num_ccm_fails); was_error = TRUE; } } } if(was_error == FALSE) { crm_debug_3("CCM Activation passed... all set to go!"); G_main_add_fd(G_PRIORITY_HIGH, cib_ev_fd, FALSE, cib_ccm_dispatch, cib_ev_token, default_ipc_connection_destroy); } } if(was_error == FALSE) { /* Async get client status information in the cluster */ crm_debug_3("Requesting an initial dump of CIB client_status"); hb_conn->llc_ops->client_status( hb_conn, NULL, CRM_SYSTEM_CIB, -1); /* Create the mainloop and run it... */ mainloop = g_main_new(FALSE); crm_info("Starting %s mainloop", crm_system_name); Gmain_timeout_add(crm_get_msec("10s"), cib_msg_timeout, NULL); Gmain_timeout_add( crm_get_msec(cib_stat_interval), cib_stats, NULL); g_main_run(mainloop); return_to_orig_privs(); } else { crm_err("Couldnt start all communication channels, exiting."); } return 0; } void usage(const char* cmd, int exit_status) { FILE* stream; stream = exit_status ? stderr : stdout; fprintf(stream, "usage: %s [-srkh]" "[-c configure file]\n", cmd); /* fprintf(stream, "\t-d\tsets debug level\n"); */ /* fprintf(stream, "\t-s\tgets daemon status\n"); */ /* fprintf(stream, "\t-r\trestarts daemon\n"); */ /* fprintf(stream, "\t-k\tstops daemon\n"); */ /* fprintf(stream, "\t-h\thelp message\n"); */ fflush(stream); exit(exit_status); } gboolean cib_register_ha(ll_cluster_t *hb_cluster, const char *client_name) { const char *uname = NULL; crm_info("Signing in with Heartbeat"); if (hb_cluster->llc_ops->signon(hb_cluster, client_name)!= HA_OK) { crm_err("Cannot sign on with heartbeat: %s", hb_cluster->llc_ops->errmsg(hb_cluster)); return FALSE; } crm_debug_3("Be informed of CIB messages"); if (HA_OK != hb_cluster->llc_ops->set_msg_callback( hb_cluster, T_CIB, cib_peer_callback, hb_cluster)){ crm_err("Cannot set msg callback: %s", hb_cluster->llc_ops->errmsg(hb_cluster)); return FALSE; } crm_debug_3("Finding our node name"); if ((uname = hb_cluster->llc_ops->get_mynodeid(hb_cluster)) == NULL) { crm_err("get_mynodeid() failed"); return FALSE; } cib_our_uname = crm_strdup(uname); crm_info("FSA Hostname: %s", cib_our_uname); crm_debug_3("Adding channel to mainloop"); G_main_add_IPC_Channel( G_PRIORITY_DEFAULT, hb_cluster->llc_ops->ipcchan(hb_cluster), FALSE, cib_ha_dispatch, hb_cluster /* userdata */, cib_ha_connection_destroy); return TRUE; } void cib_ha_connection_destroy(gpointer user_data) { if(cib_shutdown_flag) { crm_info("Heartbeat disconnection complete... exiting"); } else { crm_err("Heartbeat connection lost! Exiting."); } uninitializeCib(); crm_free(ccm_transition_id); #ifdef HA_MALLOC_TRACK cl_malloc_dump_allocated(LOG_ERR, FALSE); #endif if (mainloop != NULL && g_main_is_running(mainloop)) { g_main_quit(mainloop); } else { exit(LSB_EXIT_OK); } } static void disconnect_cib_client(gpointer key, gpointer value, gpointer user_data) { cib_client_t *a_client = value; crm_debug_2("Processing client %s/%s... send=%d, recv=%d", a_client->name, a_client->channel_name, (int)a_client->channel->send_queue->current_qlen, (int)a_client->channel->recv_queue->current_qlen); if(a_client->channel->ch_status == IPC_CONNECT) { a_client->channel->ops->resume_io(a_client->channel); if(a_client->channel->send_queue->current_qlen != 0 || a_client->channel->recv_queue->current_qlen != 0) { crm_info("Flushed messages to/from %s/%s... send=%d, recv=%d", a_client->name, a_client->channel_name, (int)a_client->channel->send_queue->current_qlen, (int)a_client->channel->recv_queue->current_qlen); } } if(a_client->channel->ch_status == IPC_CONNECT) { crm_warn("Disconnecting %s/%s...", a_client->name, a_client->channel_name); a_client->channel->ops->disconnect(a_client->channel); } } extern gboolean cib_process_disconnect( IPC_Channel *channel, cib_client_t *cib_client); gboolean cib_shutdown(int nsig, gpointer unused) { if(cib_shutdown_flag == FALSE) { cib_shutdown_flag = TRUE; crm_debug("Disconnecting %d clients", g_hash_table_size(client_list)); g_hash_table_foreach(client_list, disconnect_cib_client, NULL); crm_info("Disconnected %d clients", g_hash_table_size(client_list)); cib_process_disconnect(NULL, NULL); } else { crm_info("Waiting for %d clients to disconnect...", g_hash_table_size(client_list)); } return TRUE; } gboolean startCib(const char *filename) { gboolean active = FALSE; crm_data_t *cib = readCibXmlFile(filename, TRUE); if(cib == NULL) { crm_warn("Cluster configuration not found: %s." " Creating an empty one.", filename); cib = createEmptyCib(); crm_xml_add(cib, XML_ATTR_GENERATION_ADMIN, "0"); crm_xml_add(cib, XML_ATTR_GENERATION, "0"); crm_xml_add(cib, XML_ATTR_NUMUPDATES, "0"); } if(activateCibXml(cib, filename) == 0) { active = TRUE; crm_info("CIB Initialization completed successfully"); if(per_action_cib) { uninitializeCib(); } } return active; } diff --git a/lib/crm/common/ipc.c b/lib/crm/common/ipc.c index 35861993a1..e01ee66e61 100644 --- a/lib/crm/common/ipc.c +++ b/lib/crm/common/ipc.c @@ -1,432 +1,432 @@ -/* $Id: ipc.c,v 1.25 2006/07/06 13:30:23 andrew Exp $ */ +/* $Id: ipc.c,v 1.26 2006/07/09 09:54:09 andrew Exp $ */ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include gboolean send_ha_message(ll_cluster_t *hb_conn, HA_Message *msg, const char *node, gboolean force_ordered) { gboolean all_is_good = TRUE; cl_mem_stats_t saved_stats; crm_save_mem_stats(__PRETTY_FUNCTION__, &saved_stats); if (msg == NULL) { crm_err("cant send NULL message"); all_is_good = FALSE; } else if(hb_conn == NULL) { crm_err("No heartbeat connection specified"); all_is_good = FALSE; } else if(hb_conn->llc_ops->chan_is_connected(hb_conn) == FALSE) { crm_err("Not connected to Heartbeat"); all_is_good = FALSE; } else if(node != NULL) { if(hb_conn->llc_ops->send_ordered_nodemsg( hb_conn, msg, node) != HA_OK) { all_is_good = FALSE; crm_err("Send failed"); } else { crm_debug_2("Message sent..."); } } else if(force_ordered) { if(hb_conn->llc_ops->send_ordered_clustermsg(hb_conn, msg) != HA_OK) { all_is_good = FALSE; crm_err("Broadcast Send failed"); } else { crm_debug_2("Broadcast message sent..."); } } else { if(hb_conn->llc_ops->sendclustermsg(hb_conn, msg) != HA_OK) { all_is_good = FALSE; crm_err("Broadcast Send failed"); } else { crm_debug_2("Broadcast message sent..."); } } if(all_is_good == FALSE && hb_conn != NULL) { IPC_Channel *ipc = NULL; IPC_Queue *send_q = NULL; if(hb_conn->llc_ops->chan_is_connected(hb_conn) != HA_OK) { ipc = hb_conn->llc_ops->ipcchan(hb_conn); } if(ipc != NULL) { - ipc->ops->resume_io(ipc); +/* ipc->ops->resume_io(ipc); */ send_q = ipc->send_queue; } if(send_q != NULL) { CRM_CHECK(send_q->current_qlen < send_q->max_qlen, ;); } } crm_log_message_adv(all_is_good?LOG_MSG:LOG_WARNING,"HA[outbound]",msg); crm_diff_mem_stats(LOG_DEBUG, LOG_DEBUG, __PRETTY_FUNCTION__, NULL, &saved_stats); return all_is_good; } /* frees msg */ gboolean send_ipc_message(IPC_Channel *ipc_client, HA_Message *msg) { gboolean all_is_good = TRUE; int fail_level = LOG_WARNING; cl_mem_stats_t saved_stats; crm_save_mem_stats(__PRETTY_FUNCTION__, &saved_stats); if(ipc_client != NULL && ipc_client->conntype == IPC_CLIENT) { fail_level = LOG_ERR; } if (msg == NULL) { crm_err("cant send NULL message"); all_is_good = FALSE; } else if (ipc_client == NULL) { crm_err("cant send message without an IPC Channel"); all_is_good = FALSE; } else if(ipc_client->ops->get_chan_status(ipc_client) != IPC_CONNECT) { crm_log_maybe(fail_level, "IPC Channel to %d is not connected", (int)ipc_client->farside_pid); all_is_good = FALSE; } if(all_is_good && msg2ipcchan(msg, ipc_client) != HA_OK) { crm_log_maybe(fail_level, "Could not send IPC message to %d", (int)ipc_client->farside_pid); all_is_good = FALSE; if(ipc_client->ops->get_chan_status(ipc_client) != IPC_CONNECT) { crm_log_maybe(fail_level, "IPC Channel to %d is no longer connected", (int)ipc_client->farside_pid); } else if(ipc_client->conntype == IPC_CLIENT) { CRM_CHECK(ipc_client->send_queue->current_qlen < ipc_client->send_queue->max_qlen, ;); } } - ipc_client->ops->resume_io(ipc_client); +/* ipc_client->ops->resume_io(ipc_client); */ crm_log_message_adv(all_is_good?LOG_MSG:LOG_WARNING,"IPC[outbound]",msg); crm_diff_mem_stats(LOG_DEBUG, LOG_DEBUG, __PRETTY_FUNCTION__, NULL, &saved_stats); return all_is_good; } void default_ipc_connection_destroy(gpointer user_data) { return; } int init_server_ipc_comms( char *channel_name, gboolean (*channel_client_connect)(IPC_Channel *newclient,gpointer user_data), void (*channel_connection_destroy)(gpointer user_data)) { /* the clients wait channel is the other source of events. * This source delivers the clients connection events. * listen to this source at a relatively lower priority. */ char commpath[SOCKET_LEN]; IPC_WaitConnection *wait_ch; sprintf(commpath, CRM_SOCK_DIR "/%s", channel_name); wait_ch = wait_channel_init(commpath); if (wait_ch == NULL) { return 1; } G_main_add_IPC_WaitConnection( G_PRIORITY_LOW, wait_ch, NULL, FALSE, channel_client_connect, channel_name, channel_connection_destroy); crm_debug_3("Listening on: %s", commpath); return 0; } GCHSource* init_client_ipc_comms(const char *channel_name, gboolean (*dispatch)( IPC_Channel* source_data, gpointer user_data), void *client_data, IPC_Channel **ch) { IPC_Channel *a_ch = NULL; GCHSource *the_source = NULL; void *callback_data = client_data; a_ch = init_client_ipc_comms_nodispatch(channel_name); if(ch != NULL) { *ch = a_ch; if(callback_data == NULL) { callback_data = a_ch; } } if(a_ch == NULL) { crm_warn("Setup of client connection failed," " not adding channel to mainloop"); return NULL; } if(dispatch == NULL) { crm_warn("No dispatch method specified..." "maybe you meant init_client_ipc_comms_nodispatch()?"); } else { crm_debug_3("Adding dispatch method to channel"); the_source = G_main_add_IPC_Channel( G_PRIORITY_HIGH, a_ch, FALSE, dispatch, callback_data, default_ipc_connection_destroy); } return the_source; } IPC_Channel * init_client_ipc_comms_nodispatch(const char *channel_name) { IPC_Channel *ch; GHashTable *attrs; static char path[] = IPC_PATH_ATTR; char *commpath = NULL; int local_socket_len = 2; /* 2 = '/' + '\0' */ local_socket_len += strlen(channel_name); local_socket_len += strlen(CRM_SOCK_DIR); crm_malloc0(commpath, local_socket_len); if(commpath != NULL) { sprintf(commpath, CRM_SOCK_DIR "/%s", channel_name); commpath[local_socket_len - 1] = '\0'; crm_debug_3("Attempting to talk on: %s", commpath); } attrs = g_hash_table_new(g_str_hash,g_str_equal); g_hash_table_insert(attrs, path, commpath); ch = ipc_channel_constructor(IPC_ANYTYPE, attrs); g_hash_table_destroy(attrs); if (ch == NULL) { crm_err("Could not access channel on: %s", commpath); return NULL; } else if (ch->ops->initiate_connection(ch) != IPC_OK) { crm_debug("Could not init comms on: %s", commpath); ch->ops->destroy(ch); return NULL; } ch->ops->set_recv_qlen(ch, 100); ch->ops->set_send_qlen(ch, 100); /* ch->should_send_block = TRUE; */ crm_debug_3("Processing of %s complete", commpath); return ch; } IPC_WaitConnection * wait_channel_init(char daemonsocket[]) { IPC_WaitConnection *wait_ch; mode_t mask; char path[] = IPC_PATH_ATTR; GHashTable * attrs; attrs = g_hash_table_new(g_str_hash,g_str_equal); g_hash_table_insert(attrs, path, daemonsocket); mask = umask(0); wait_ch = ipc_wait_conn_constructor(IPC_ANYTYPE, attrs); if (wait_ch == NULL) { cl_perror("Can't create wait channel of type %s", IPC_ANYTYPE); exit(1); } mask = umask(mask); g_hash_table_destroy(attrs); return wait_ch; } longclock_t ipc_call_start = 0; longclock_t ipc_call_stop = 0; longclock_t ipc_call_diff = 0; gboolean subsystem_msg_dispatch(IPC_Channel *sender, void *user_data) { int lpc = 0; HA_Message *msg = NULL; ha_msg_input_t *new_input = NULL; gboolean all_is_well = TRUE; const char *sys_to; const char *task; while(IPC_ISRCONN(sender)) { gboolean process = FALSE; if(sender->ops->is_message_pending(sender) == 0) { break; } msg = msgfromIPC_noauth(sender); if (msg == NULL) { crm_err("No message from %d this time", sender->farside_pid); continue; } lpc++; new_input = new_ha_msg_input(msg); crm_msg_del(msg); msg = NULL; crm_log_message(LOG_MSG, new_input->msg); sys_to = cl_get_string(new_input->msg, F_CRM_SYS_TO); task = cl_get_string(new_input->msg, F_CRM_TASK); if(safe_str_eq(task, CRM_OP_HELLO)) { process = TRUE; } else if(sys_to == NULL) { crm_err("Value of %s was NULL!!", F_CRM_SYS_TO); } else if(task == NULL) { crm_err("Value of %s was NULL!!", F_CRM_TASK); } else { process = TRUE; } if(process){ gboolean (*process_function) (HA_Message *msg, crm_data_t *data, IPC_Channel *sender) = NULL; process_function = user_data; #ifdef MSG_LOG crm_log_message_adv( LOG_MSG, __FUNCTION__, new_input->msg); #endif if(ipc_call_diff_max_ms > 0) { ipc_call_start = time_longclock(); } if(FALSE == process_function( new_input->msg, new_input->xml, sender)) { crm_warn("Received a message destined for %s" " by mistake", sys_to); } if(ipc_call_diff_max_ms > 0) { unsigned int ipc_call_diff_ms = 0; ipc_call_stop = time_longclock(); ipc_call_diff = sub_longclock( ipc_call_stop, ipc_call_start); ipc_call_diff_ms = longclockto_ms( ipc_call_diff); if(ipc_call_diff_ms > ipc_call_diff_max_ms) { crm_err("%s took %dms to complete", sys_to, ipc_call_diff_ms); } } } else { #ifdef MSG_LOG crm_log_message_adv( LOG_ERR, NULL, new_input->msg); #endif } delete_ha_msg_input(new_input); new_input = NULL; if(sender->ch_status == IPC_CONNECT) { break; } } crm_debug_2("Processed %d messages", lpc); if (sender->ch_status != IPC_CONNECT) { crm_err("The server %d has left us: Shutting down...NOW", sender->farside_pid); exit(1); /* shutdown properly later */ return !all_is_well; } return all_is_well; } gboolean is_ipc_empty(IPC_Channel *ch) { if(ch == NULL) { return TRUE; } else if(ch->send_queue->current_qlen == 0 && ch->recv_queue->current_qlen == 0) { return TRUE; } return FALSE; }