diff --git a/cib/main.c b/cib/main.c index 5ba19fd290..684af26719 100644 --- a/cib/main.c +++ b/cib/main.c @@ -1,652 +1,652 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if HAVE_LIBXML2 # include #endif #ifdef HAVE_GETOPT_H # include #endif #if HAVE_BZLIB_H # include #endif extern int init_remote_listener(int port, gboolean encrypted); extern gboolean stand_alone; gboolean cib_shutdown_flag = FALSE; enum cib_errors cib_status = cib_ok; #if SUPPORT_HEARTBEAT oc_ev_t *cib_ev_token; ll_cluster_t *hb_conn = NULL; extern void oc_ev_special(const oc_ev_t *, oc_ev_class_t, int); gboolean cib_register_ha(ll_cluster_t * hb_cluster, const char *client_name); #endif extern void terminate_cib(const char *caller, gboolean fast); GMainLoop *mainloop = NULL; const char *cib_root = CRM_CONFIG_DIR; char *cib_our_uname = NULL; gboolean preserve_status = FALSE; gboolean cib_writes_enabled = TRUE; int remote_fd = 0; int remote_tls_fd = 0; void usage(const char *cmd, int exit_status); int cib_init(void); void cib_shutdown(int nsig); gboolean startCib(const char *filename); extern int write_cib_contents(gpointer p); GTRIGSource *cib_writer = NULL; GHashTable *client_list = NULL; GHashTable *config_hash = NULL; char *channel1 = NULL; char *channel2 = NULL; char *channel3 = NULL; char *channel4 = NULL; char *channel5 = NULL; #define OPTARGS "maswr:V?" void cib_cleanup(void); static void cib_enable_writes(int nsig) { crm_info("(Re)enabling disk writes"); cib_writes_enabled = TRUE; } static void cib_diskwrite_complete(gpointer userdata, int status, int signo, int exitcode) { if (exitcode != LSB_EXIT_OK || signo != 0 || status != 0) { crm_err("Disk write failed: status=%d, signo=%d, exitcode=%d", status, signo, exitcode); if (cib_writes_enabled) { crm_err("Disabling disk writes after write failure"); cib_writes_enabled = FALSE; } } else { crm_trace("Disk write passed"); } } static void log_cib_client(gpointer key, gpointer value, gpointer user_data) { cib_client_t *a_client = value; crm_info("Client %s", crm_str(a_client->name)); } int main(int argc, char **argv) { int flag; int rc = 0; int argerr = 0; #ifdef HAVE_GETOPT_H int option_index = 0; /* *INDENT-OFF* */ static struct option long_options[] = { {"per-action-cib", 0, 0, 'a'}, {"stand-alone", 0, 0, 's'}, {"disk-writes", 0, 0, 'w'}, {"cib-root", 1, 0, 'r'}, {"verbose", 0, 0, 'V'}, {"help", 0, 0, '?'}, {"metadata", 0, 0, 'm'}, {0, 0, 0, 0} }; /* *INDENT-ON* */ #endif struct passwd *pwentry = NULL; crm_log_init("cib", LOG_INFO, TRUE, FALSE, 0, NULL); mainloop_add_signal(SIGTERM, cib_shutdown); mainloop_add_signal(SIGPIPE, cib_enable_writes); cib_writer = G_main_add_tempproc_trigger(G_PRIORITY_LOW, write_cib_contents, "write_cib_contents", NULL, NULL, NULL, cib_diskwrite_complete); /* EnableProcLogging(); */ set_sigchld_proctrack(G_PRIORITY_HIGH, DEFAULT_MAXDISPATCHTIME); crm_peer_init(); client_list = g_hash_table_new(crm_str_hash, g_str_equal); while (1) { #ifdef HAVE_GETOPT_H flag = getopt_long(argc, argv, OPTARGS, long_options, &option_index); #else flag = getopt(argc, argv, OPTARGS); #endif if (flag == -1) break; switch (flag) { case 'V': crm_bump_log_level(); break; case 's': stand_alone = TRUE; preserve_status = TRUE; cib_writes_enabled = FALSE; pwentry = getpwnam(CRM_DAEMON_USER); CRM_CHECK(pwentry != NULL, crm_perror(LOG_ERR, "Invalid uid (%s) specified", CRM_DAEMON_USER); return 100); rc = setgid(pwentry->pw_gid); if (rc < 0) { crm_perror(LOG_ERR, "Could not set group to %d", pwentry->pw_gid); return 100; } rc = setuid(pwentry->pw_uid); if (rc < 0) { crm_perror(LOG_ERR, "Could not set user to %d", pwentry->pw_uid); return 100; } break; case '?': /* Help message */ usage(crm_system_name, LSB_EXIT_OK); break; case 'w': cib_writes_enabled = TRUE; break; case 'r': cib_root = optarg; break; case 'm': cib_metadata(); return 0; default: ++argerr; break; } } if (argc - optind == 1 && safe_str_eq("metadata", argv[optind])) { cib_metadata(); return 0; } if (optind > argc) { ++argerr; } if (argerr) { usage(crm_system_name, LSB_EXIT_GENERIC); } if (crm_is_writable(cib_root, NULL, CRM_DAEMON_USER, CRM_DAEMON_GROUP, FALSE) == FALSE) { crm_err("Bad permissions on %s. Terminating", cib_root); fprintf(stderr, "ERROR: Bad permissions on %s. See logs for details\n", cib_root); fflush(stderr); return 100; } /* read local config file */ rc = cib_init(); CRM_CHECK(g_hash_table_size(client_list) == 0, crm_warn("Not all clients gone at exit")); g_hash_table_foreach(client_list, log_cib_client, NULL); cib_cleanup(); #if SUPPORT_HEARTBEAT if (hb_conn) { hb_conn->llc_ops->delete(hb_conn); } #endif crm_info("Done"); return rc; } void cib_cleanup(void) { crm_peer_destroy(); g_hash_table_destroy(config_hash); g_hash_table_destroy(client_list); crm_free(cib_our_uname); #if HAVE_LIBXML2 crm_xml_cleanup(); #endif crm_free(channel1); crm_free(channel2); crm_free(channel3); crm_free(channel4); crm_free(channel5); } unsigned long cib_num_ops = 0; const char *cib_stat_interval = "10min"; unsigned long cib_num_local = 0, cib_num_updates = 0, cib_num_fail = 0; unsigned long cib_bad_connects = 0, cib_num_timeouts = 0; longclock_t cib_call_time = 0; gboolean cib_stats(gpointer data); gboolean cib_stats(gpointer data) { int local_log_level = LOG_DEBUG; static unsigned long last_stat = 0; unsigned int cib_calls_ms = 0; static unsigned long cib_stat_interval_ms = 0; if (cib_stat_interval_ms == 0) { cib_stat_interval_ms = crm_get_msec(cib_stat_interval); } cib_calls_ms = longclockto_ms(cib_call_time); if ((cib_num_ops - last_stat) > 0) { unsigned long calls_diff = cib_num_ops - last_stat; double stat_1 = (1000 * cib_calls_ms) / calls_diff; local_log_level = LOG_INFO; do_crm_log(local_log_level, "Processed %lu operations" " (%.2fus average, %lu%% utilization) in the last %s", calls_diff, stat_1, (100 * cib_calls_ms) / cib_stat_interval_ms, cib_stat_interval); } crm_trace( "\tDetail: %lu operations (%ums total)" " (%lu local, %lu updates, %lu failures," " %lu timeouts, %lu bad connects)", cib_num_ops, cib_calls_ms, cib_num_local, cib_num_updates, cib_num_fail, cib_bad_connects, cib_num_timeouts); last_stat = cib_num_ops; cib_call_time = 0; return TRUE; } #if SUPPORT_HEARTBEAT gboolean ccm_connect(void); static void ccm_connection_destroy(gpointer user_data) { crm_err("CCM connection failed... blocking while we reconnect"); CRM_ASSERT(ccm_connect()); return; } static void *ccm_library = NULL; gboolean ccm_connect(void) { gboolean did_fail = TRUE; int num_ccm_fails = 0; int max_ccm_fails = 30; int ret; int cib_ev_fd; int (*ccm_api_register) (oc_ev_t ** token) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_register"); int (*ccm_api_set_callback) (const oc_ev_t * token, oc_ev_class_t class, oc_ev_callback_t * fn, oc_ev_callback_t ** prev_fn) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_set_callback"); void (*ccm_api_special) (const oc_ev_t *, oc_ev_class_t, int) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_special"); int (*ccm_api_activate) (const oc_ev_t * token, int *fd) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_activate"); int (*ccm_api_unregister) (oc_ev_t * token) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_unregister"); while (did_fail) { did_fail = FALSE; crm_info("Registering with CCM..."); ret = (*ccm_api_register) (&cib_ev_token); if (ret != 0) { did_fail = TRUE; } if (did_fail == FALSE) { crm_trace("Setting up CCM callbacks"); ret = (*ccm_api_set_callback) (cib_ev_token, OC_EV_MEMB_CLASS, cib_ccm_msg_callback, NULL); if (ret != 0) { crm_warn("CCM callback not set"); did_fail = TRUE; } } if (did_fail == FALSE) { (*ccm_api_special) (cib_ev_token, OC_EV_MEMB_CLASS, 0); crm_trace("Activating CCM token"); ret = (*ccm_api_activate) (cib_ev_token, &cib_ev_fd); if (ret != 0) { crm_warn("CCM Activation failed"); did_fail = TRUE; } } if (did_fail) { num_ccm_fails++; (*ccm_api_unregister) (cib_ev_token); if (num_ccm_fails < max_ccm_fails) { crm_warn("CCM Connection failed %d times (%d max)", num_ccm_fails, max_ccm_fails); sleep(3); } else { crm_err("CCM Activation failed %d (max) times", num_ccm_fails); return FALSE; } } } crm_debug("CCM Activation passed... all set to go!"); G_main_add_fd(G_PRIORITY_HIGH, cib_ev_fd, FALSE, cib_ccm_dispatch, cib_ev_token, ccm_connection_destroy); return TRUE; } #endif #if SUPPORT_COROSYNC static gboolean cib_ais_dispatch(AIS_Message * wrapper, char *data, int sender) { xmlNode *xml = NULL; if (wrapper->header.id == crm_class_cluster) { xml = string2xml(data); if (xml == NULL) { goto bail; } crm_xml_add(xml, F_ORIG, wrapper->sender.uname); crm_xml_add_int(xml, F_SEQ, wrapper->id); cib_peer_callback(xml, NULL); } free_xml(xml); return TRUE; bail: crm_err("Invalid XML: '%.120s'", data); return TRUE; } static void cib_ais_destroy(gpointer user_data) { if (cib_shutdown_flag) { crm_info("Corosync disconnection complete"); } else { crm_err("Corosync connection lost! Exiting."); terminate_cib(__FUNCTION__, TRUE); } } #endif static void cib_peer_update_callback(enum crm_status_type type, crm_node_t * node, const void *data) { #if 0 /* crm_active_peers(crm_proc_cib) appears to give the wrong answer * sometimes, this might help figure out why */ if(type == crm_status_nstate) { crm_info("status: %s is now %s (was %s)", node->uname, node->state, (const char *)data); if (safe_str_eq(CRMD_STATE_ACTIVE, node->state)) { return; } } else if(type == crm_status_processes) { uint32_t old = 0; if (data) { old = *(const uint32_t *)data; } if ((node->processes ^ old) & crm_proc_cib) { crm_info("status: cib process on %s is now %sactive", node->uname, is_set(node->processes, crm_proc_cib)?"":"in"); } else { return; } } else { return; } #endif if(cib_shutdown_flag && crm_active_peers() < 2 && g_hash_table_size(client_list) == 0) { crm_info("No more peers"); terminate_cib(__FUNCTION__, FALSE); } } static void cib_ha_connection_destroy(gpointer user_data) { if (cib_shutdown_flag) { crm_info("Heartbeat disconnection complete... exiting"); terminate_cib(__FUNCTION__, FALSE); } else { crm_err("Heartbeat connection lost! Exiting."); terminate_cib(__FUNCTION__, TRUE); } } int cib_init(void) { gboolean was_error = FALSE; config_hash = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); if (startCib("cib.xml") == FALSE) { crm_crit("Cannot start CIB... terminating"); exit(1); } if (stand_alone == FALSE) { void *dispatch = cib_ha_peer_callback; void *destroy = cib_ha_connection_destroy; if (is_openais_cluster()) { #if SUPPORT_COROSYNC destroy = cib_ais_destroy; dispatch = cib_ais_dispatch; #endif } if (crm_cluster_connect(&cib_our_uname, NULL, dispatch, destroy, #if SUPPORT_HEARTBEAT &hb_conn #else NULL #endif ) == FALSE) { crm_crit("Cannot sign in to the cluster... terminating"); exit(100); } if (is_openais_cluster()) { crm_set_status_callback(&cib_peer_update_callback); } #if SUPPORT_HEARTBEAT if (is_heartbeat_cluster()) { if (was_error == FALSE) { if (HA_OK != hb_conn->llc_ops->set_cstatus_callback(hb_conn, cib_client_status_callback, hb_conn)) { crm_err("Cannot set cstatus callback: %s", hb_conn->llc_ops->errmsg(hb_conn)); was_error = TRUE; } } if (was_error == FALSE) { was_error = (ccm_connect() == FALSE); } if (was_error == FALSE) { /* Async get client status information in the cluster */ crm_info("Requesting the list of configured nodes"); hb_conn->llc_ops->client_status(hb_conn, NULL, CRM_SYSTEM_CIB, -1); } } #endif } else { cib_our_uname = crm_strdup("localhost"); } - ipcs_ro = mainloop_add_ipc_server(cib_channel_ro, QB_IPC_SOCKET, &ipc_ro_callbacks); - ipcs_rw = mainloop_add_ipc_server(cib_channel_rw, QB_IPC_SOCKET, &ipc_rw_callbacks); + ipcs_ro = mainloop_add_ipc_server(cib_channel_ro, QB_IPC_NATIVE, &ipc_ro_callbacks); + ipcs_rw = mainloop_add_ipc_server(cib_channel_rw, QB_IPC_NATIVE, &ipc_rw_callbacks); if (stand_alone) { if (was_error) { crm_err("Couldnt start"); return 1; } cib_is_master = TRUE; /* Create the mainloop and run it... */ mainloop = g_main_new(FALSE); crm_info("Starting %s mainloop", crm_system_name); g_main_run(mainloop); return 0; } if (was_error == FALSE) { /* Create the mainloop and run it... */ mainloop = g_main_new(FALSE); crm_info("Starting %s mainloop", crm_system_name); g_timeout_add(crm_get_msec(cib_stat_interval), cib_stats, NULL); g_main_run(mainloop); } else { crm_err("Couldnt start all communication channels, exiting."); } return 0; } void usage(const char *cmd, int exit_status) { FILE *stream; stream = exit_status ? stderr : stdout; fprintf(stream, "usage: %s [-%s]\n", cmd, OPTARGS); fprintf(stream, "\t--%s (-%c)\t\tTurn on debug info." " Additional instances increase verbosity\n", "verbose", 'V'); fprintf(stream, "\t--%s (-%c)\t\tThis help message\n", "help", '?'); fprintf(stream, "\t--%s (-%c)\t\tShow configurable cib options\n", "metadata", 'm'); fprintf(stream, "\t--%s (-%c)\tAdvanced use only\n", "per-action-cib", 'a'); fprintf(stream, "\t--%s (-%c)\tAdvanced use only\n", "stand-alone", 's'); fprintf(stream, "\t--%s (-%c)\tAdvanced use only\n", "disk-writes", 'w'); fprintf(stream, "\t--%s (-%c)\t\tAdvanced use only\n", "cib-root", 'r'); fflush(stream); exit(exit_status); } gboolean startCib(const char *filename) { gboolean active = FALSE; xmlNode *cib = readCibXmlFile(cib_root, filename, !preserve_status); CRM_ASSERT(cib != NULL); if (activateCibXml(cib, TRUE, "start") == 0) { int port = 0; const char *port_s = NULL; active = TRUE; cib_read_config(config_hash, cib); port_s = crm_element_value(cib, "remote-tls-port"); if (port_s) { port = crm_parse_int(port_s, "0"); remote_tls_fd = init_remote_listener(port, TRUE); } port_s = crm_element_value(cib, "remote-clear-port"); if (port_s) { port = crm_parse_int(port_s, "0"); remote_fd = init_remote_listener(port, FALSE); } crm_info("CIB Initialization completed successfully"); } return active; } diff --git a/crmd/control.c b/crmd/control.c index c8536f96ff..1fb46cc1bb 100644 --- a/crmd/control.c +++ b/crmd/control.c @@ -1,993 +1,993 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include "../lib/cluster/stack.h" #include #include #include #include #include #include #include #include #include qb_ipcs_service_t *ipcs = NULL; extern gboolean crm_connect_corosync(void); extern void crmd_ha_connection_destroy(gpointer user_data); void crm_shutdown(int nsig); gboolean crm_read_options(gpointer user_data); gboolean fsa_has_quorum = FALSE; GHashTable *ipc_clients = NULL; crm_trigger_t *fsa_source = NULL; crm_trigger_t *config_read = NULL; /* A_HA_CONNECT */ void do_ha_control(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { gboolean registered = FALSE; if (action & A_HA_DISCONNECT) { if (is_openais_cluster()) { crm_peer_destroy(); #if SUPPORT_COROSYNC terminate_ais_connection(); #endif crm_info("Disconnected from OpenAIS"); #if SUPPORT_HEARTBEAT } else if (fsa_cluster_conn != NULL) { set_bit_inplace(fsa_input_register, R_HA_DISCONNECTED); fsa_cluster_conn->llc_ops->signoff(fsa_cluster_conn, FALSE); crm_info("Disconnected from Heartbeat"); #endif } } if (action & A_HA_CONNECT) { crm_set_status_callback(&peer_update_callback); if (is_openais_cluster()) { #if SUPPORT_COROSYNC registered = crm_connect_corosync(); #endif } else if (is_heartbeat_cluster()) { #if SUPPORT_HEARTBEAT registered = crm_cluster_connect(&fsa_our_uname, &fsa_our_uuid, crmd_ha_msg_callback, crmd_ha_connection_destroy, &fsa_cluster_conn); #endif } #if SUPPORT_HEARTBEAT if (is_heartbeat_cluster()) { crm_trace("Be informed of Node Status changes"); if (registered && fsa_cluster_conn->llc_ops->set_nstatus_callback(fsa_cluster_conn, crmd_ha_status_callback, fsa_cluster_conn) != HA_OK) { crm_err("Cannot set nstatus callback: %s", fsa_cluster_conn->llc_ops->errmsg(fsa_cluster_conn)); registered = FALSE; } crm_trace("Be informed of CRM Client Status changes"); if (registered && fsa_cluster_conn->llc_ops->set_cstatus_callback(fsa_cluster_conn, crmd_client_status_callback, fsa_cluster_conn) != HA_OK) { crm_err("Cannot set cstatus callback: %s", fsa_cluster_conn->llc_ops->errmsg(fsa_cluster_conn)); registered = FALSE; } if (registered) { crm_trace("Requesting an initial dump of CRMD client_status"); fsa_cluster_conn->llc_ops->client_status(fsa_cluster_conn, NULL, CRM_SYSTEM_CRMD, -1); } } #endif if (registered == FALSE) { set_bit_inplace(fsa_input_register, R_HA_DISCONNECTED); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); return; } clear_bit_inplace(fsa_input_register, R_HA_DISCONNECTED); crm_info("Connected to the cluster"); } if (action & ~(A_HA_CONNECT | A_HA_DISCONNECT)) { crm_err("Unexpected action %s in %s", fsa_action2string(action), __FUNCTION__); } } /* A_SHUTDOWN */ void do_shutdown(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { /* just in case */ set_bit_inplace(fsa_input_register, R_SHUTDOWN); if (is_heartbeat_cluster()) { if (is_set(fsa_input_register, pe_subsystem->flag_connected)) { crm_info("Terminating the %s", pe_subsystem->name); if (stop_subsystem(pe_subsystem, TRUE) == FALSE) { /* its gone... */ crm_err("Faking %s exit", pe_subsystem->name); clear_bit_inplace(fsa_input_register, pe_subsystem->flag_connected); } else { crm_info("Waiting for subsystems to exit"); crmd_fsa_stall(NULL); } } crm_info("All subsystems stopped, continuing"); } if (stonith_api) { /* Prevent it from comming up again */ clear_bit_inplace(fsa_input_register, R_ST_REQUIRED); crm_info("Disconnecting STONITH..."); stonith_api->cmds->disconnect(stonith_api); } } /* A_SHUTDOWN_REQ */ void do_shutdown_req(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { xmlNode *msg = NULL; crm_info("Sending shutdown request to %s", crm_str(fsa_our_dc)); msg = create_request(CRM_OP_SHUTDOWN_REQ, NULL, NULL, CRM_SYSTEM_DC, CRM_SYSTEM_CRMD, NULL); /* set_bit_inplace(fsa_input_register, R_STAYDOWN); */ if (send_cluster_message(NULL, crm_msg_crmd, msg, TRUE) == FALSE) { register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); } free_xml(msg); } extern char *max_generation_from; extern xmlNode *max_generation_xml; extern GHashTable *resource_history; extern GHashTable *voted; extern GHashTable *reload_hash; void log_connected_client(gpointer key, gpointer value, gpointer user_data); void log_connected_client(gpointer key, gpointer value, gpointer user_data) { crmd_client_t *client = value; crm_err("%s is still connected at exit", client->table_key); } static void free_mem(fsa_data_t * msg_data) { g_main_loop_quit(crmd_mainloop); g_main_loop_unref(crmd_mainloop); #if SUPPORT_HEARTBEAT if (fsa_cluster_conn) { fsa_cluster_conn->llc_ops->delete(fsa_cluster_conn); fsa_cluster_conn = NULL; } #endif slist_destroy(fsa_data_t, fsa_data, fsa_message_queue, crm_info("Dropping %s: [ state=%s cause=%s origin=%s ]", fsa_input2string(fsa_data->fsa_input), fsa_state2string(fsa_state), fsa_cause2string(fsa_data->fsa_cause), fsa_data->origin); delete_fsa_input(fsa_data); ); delete_fsa_input(msg_data); if (ipc_clients) { crm_debug("Number of connected clients: %d", g_hash_table_size(ipc_clients)); /* g_hash_table_foreach(ipc_clients, log_connected_client, NULL); */ g_hash_table_destroy(ipc_clients); } empty_uuid_cache(); crm_peer_destroy(); clear_bit_inplace(fsa_input_register, R_MEMBERSHIP); if (te_subsystem->client && te_subsystem->client->ipc) { crm_debug("Full destroy: TE"); qb_ipcs_disconnect(te_subsystem->client->ipc); } crm_free(te_subsystem); if (pe_subsystem->client && pe_subsystem->client->ipc) { crm_debug("Full destroy: PE"); qb_ipcs_disconnect(pe_subsystem->client->ipc); } crm_free(pe_subsystem); crm_free(cib_subsystem); if (integrated_nodes) { g_hash_table_destroy(integrated_nodes); } if (finalized_nodes) { g_hash_table_destroy(finalized_nodes); } if (confirmed_nodes) { g_hash_table_destroy(confirmed_nodes); } if (reload_hash) { g_hash_table_destroy(reload_hash); } if (resource_history) { g_hash_table_destroy(resource_history); } if (voted) { g_hash_table_destroy(voted); } cib_delete(fsa_cib_conn); fsa_cib_conn = NULL; if (fsa_lrm_conn) { fsa_lrm_conn->lrm_ops->delete(fsa_lrm_conn); } crm_free(transition_timer); crm_free(integration_timer); crm_free(finalization_timer); crm_free(election_trigger); crm_free(election_timeout); crm_free(shutdown_escalation_timer); crm_free(wait_timer); crm_free(recheck_timer); crm_free(fsa_our_dc_version); crm_free(fsa_our_uname); crm_free(fsa_our_uuid); crm_free(fsa_our_dc); crm_free(max_generation_from); free_xml(max_generation_xml); crm_xml_cleanup(); } /* A_EXIT_0, A_EXIT_1 */ void do_exit(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { int exit_code = 0; int log_level = LOG_INFO; const char *exit_type = "gracefully"; if (action & A_EXIT_1) { exit_code = 1; log_level = LOG_ERR; exit_type = "forcefully"; } verify_stopped(cur_state, LOG_ERR); do_crm_log(log_level, "Performing %s - %s exiting the CRMd", fsa_action2string(action), exit_type); if (is_set(fsa_input_register, R_IN_RECOVERY)) { crm_err("Could not recover from internal error"); exit_code = 2; } if (is_set(fsa_input_register, R_STAYDOWN)) { crm_warn("Inhibiting respawn by Heartbeat"); exit_code = 100; } free_mem(msg_data); crm_info("[%s] stopped (%d)", crm_system_name, exit_code); cl_flush_logs(); exit(exit_code); } /* A_STARTUP */ void do_startup(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { int was_error = 0; int interval = 1; /* seconds between DC heartbeats */ crm_debug("Registering Signal Handlers"); mainloop_add_signal(SIGTERM, crm_shutdown); fsa_source = mainloop_add_trigger(G_PRIORITY_HIGH, crm_fsa_trigger, NULL); config_read = mainloop_add_trigger(G_PRIORITY_HIGH, crm_read_options, NULL); ipc_clients = g_hash_table_new(crm_str_hash, g_str_equal); crm_debug("Creating CIB and LRM objects"); fsa_cib_conn = cib_new(); fsa_lrm_conn = ll_lrm_new(XML_CIB_TAG_LRM); /* set up the timers */ crm_malloc0(transition_timer, sizeof(fsa_timer_t)); crm_malloc0(integration_timer, sizeof(fsa_timer_t)); crm_malloc0(finalization_timer, sizeof(fsa_timer_t)); crm_malloc0(election_trigger, sizeof(fsa_timer_t)); crm_malloc0(election_timeout, sizeof(fsa_timer_t)); crm_malloc0(shutdown_escalation_timer, sizeof(fsa_timer_t)); crm_malloc0(wait_timer, sizeof(fsa_timer_t)); crm_malloc0(recheck_timer, sizeof(fsa_timer_t)); interval = interval * 1000; if (election_trigger != NULL) { election_trigger->source_id = 0; election_trigger->period_ms = -1; election_trigger->fsa_input = I_DC_TIMEOUT; election_trigger->callback = crm_timer_popped; election_trigger->repeat = FALSE; } else { was_error = TRUE; } if (election_timeout != NULL) { election_timeout->source_id = 0; election_timeout->period_ms = -1; election_timeout->fsa_input = I_ELECTION_DC; election_timeout->callback = crm_timer_popped; election_timeout->repeat = FALSE; } else { was_error = TRUE; } if (transition_timer != NULL) { transition_timer->source_id = 0; transition_timer->period_ms = -1; transition_timer->fsa_input = I_PE_CALC; transition_timer->callback = crm_timer_popped; transition_timer->repeat = FALSE; } else { was_error = TRUE; } if (integration_timer != NULL) { integration_timer->source_id = 0; integration_timer->period_ms = -1; integration_timer->fsa_input = I_INTEGRATED; integration_timer->callback = crm_timer_popped; integration_timer->repeat = FALSE; } else { was_error = TRUE; } if (finalization_timer != NULL) { finalization_timer->source_id = 0; finalization_timer->period_ms = -1; finalization_timer->fsa_input = I_FINALIZED; finalization_timer->callback = crm_timer_popped; finalization_timer->repeat = FALSE; /* for possible enabling... a bug in the join protocol left * a slave in S_PENDING while we think its in S_NOT_DC * * raising I_FINALIZED put us into a transition loop which is * never resolved. * in this loop we continually send probes which the node * NACK's because its in S_PENDING * * if we have nodes where heartbeat is active but the * CRM is not... then this will be handled in the * integration phase */ finalization_timer->fsa_input = I_ELECTION; } else { was_error = TRUE; } if (shutdown_escalation_timer != NULL) { shutdown_escalation_timer->source_id = 0; shutdown_escalation_timer->period_ms = -1; shutdown_escalation_timer->fsa_input = I_STOP; shutdown_escalation_timer->callback = crm_timer_popped; shutdown_escalation_timer->repeat = FALSE; } else { was_error = TRUE; } if (wait_timer != NULL) { wait_timer->source_id = 0; wait_timer->period_ms = 2000; wait_timer->fsa_input = I_NULL; wait_timer->callback = crm_timer_popped; wait_timer->repeat = FALSE; } else { was_error = TRUE; } if (recheck_timer != NULL) { recheck_timer->source_id = 0; recheck_timer->period_ms = -1; recheck_timer->fsa_input = I_PE_CALC; recheck_timer->callback = crm_timer_popped; recheck_timer->repeat = FALSE; } else { was_error = TRUE; } /* set up the sub systems */ crm_malloc0(cib_subsystem, sizeof(struct crm_subsystem_s)); crm_malloc0(te_subsystem, sizeof(struct crm_subsystem_s)); crm_malloc0(pe_subsystem, sizeof(struct crm_subsystem_s)); if (cib_subsystem != NULL) { cib_subsystem->pid = -1; cib_subsystem->name = CRM_SYSTEM_CIB; cib_subsystem->flag_connected = R_CIB_CONNECTED; cib_subsystem->flag_required = R_CIB_REQUIRED; } else { was_error = TRUE; } if (te_subsystem != NULL) { te_subsystem->pid = -1; te_subsystem->name = CRM_SYSTEM_TENGINE; te_subsystem->flag_connected = R_TE_CONNECTED; te_subsystem->flag_required = R_TE_REQUIRED; } else { was_error = TRUE; } if (pe_subsystem != NULL) { pe_subsystem->pid = -1; pe_subsystem->path = CRM_DAEMON_DIR; pe_subsystem->name = CRM_SYSTEM_PENGINE; pe_subsystem->command = CRM_DAEMON_DIR "/" CRM_SYSTEM_PENGINE; pe_subsystem->args = NULL; pe_subsystem->flag_connected = R_PE_CONNECTED; pe_subsystem->flag_required = R_PE_REQUIRED; } else { was_error = TRUE; } if (was_error == FALSE && is_heartbeat_cluster()) { if(start_subsystem(pe_subsystem) == FALSE) { was_error = TRUE; } } if (was_error) { register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); } welcomed_nodes = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); integrated_nodes = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); finalized_nodes = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); confirmed_nodes = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); set_sigchld_proctrack(G_PRIORITY_HIGH, DEFAULT_MAXDISPATCHTIME); } static int32_t crmd_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) { crm_trace("Connecting %p for uid=%d gid=%d", c, uid, gid); return 0; } static void crmd_ipc_created(qb_ipcs_connection_t *c) { crmd_client_t *blank_client = NULL; crm_malloc0(blank_client, sizeof(crmd_client_t)); CRM_ASSERT(blank_client != NULL); crm_trace("Created client: %p", blank_client); blank_client->ipc = c; blank_client->sub_sys = NULL; blank_client->uuid = NULL; blank_client->table_key = NULL; qb_ipcs_context_set(c, blank_client); } static int32_t crmd_ipc_dispatch(qb_ipcs_connection_t *c, void *data, size_t size) { crmd_client_t *client = qb_ipcs_context_get(c); xmlNode *msg = crm_ipcs_recv(c, data, size); xmlNode *ack = create_xml_node(NULL, "ack"); crm_trace("Invoked: %s", client->table_key); crm_ipcs_send(c, ack, FALSE); /* TODO: Do not unconditionally send this */ free_xml(ack); if (msg == NULL) { return 0; } #if ENABLE_ACL determine_request_user(&client->user, client, msg, F_CRM_USER); #endif crm_trace("Processing msg from %s", client->table_key); crm_log_xml_trace(msg, "CRMd[inbound]"); if (crmd_authorize_message(msg, client)) { route_message(C_IPC_MESSAGE, msg); } trigger_fsa(fsa_source); free_xml(msg); return 0; } static int32_t crmd_ipc_closed(qb_ipcs_connection_t *c) { return 0; } static void crmd_ipc_destroy(qb_ipcs_connection_t *c) { crmd_client_t *client = qb_ipcs_context_get(c); if (client == NULL) { crm_trace("No client to delete"); return; } process_client_disconnect(client); crm_trace("Disconnecting client %s (%p)", client->table_key, client); crm_free(client->table_key); crm_free(client->sub_sys); crm_free(client->uuid); crm_free(client->user); crm_free(client); trigger_fsa(fsa_source); } /* A_STOP */ void do_stop(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { if (is_heartbeat_cluster()) { stop_subsystem(pe_subsystem, FALSE); } mainloop_del_ipc_server(ipcs); register_fsa_input(C_FSA_INTERNAL, I_TERMINATE, NULL); } /* A_STARTED */ void do_started(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { static struct qb_ipcs_service_handlers crmd_callbacks = { .connection_accept = crmd_ipc_accept, .connection_created = crmd_ipc_created, .msg_process = crmd_ipc_dispatch, .connection_closed = crmd_ipc_closed, .connection_destroyed = crmd_ipc_destroy }; if (cur_state != S_STARTING) { crm_err("Start cancelled... %s", fsa_state2string(cur_state)); return; } else if (is_set(fsa_input_register, R_MEMBERSHIP) == FALSE) { crm_info("Delaying start, no membership data (%.16llx)", R_MEMBERSHIP); crmd_fsa_stall(NULL); return; } else if (is_set(fsa_input_register, R_LRM_CONNECTED) == FALSE) { crm_info("Delaying start, LRM not connected (%.16llx)", R_LRM_CONNECTED); crmd_fsa_stall(NULL); return; } else if (is_set(fsa_input_register, R_CIB_CONNECTED) == FALSE) { crm_info("Delaying start, CIB not connected (%.16llx)", R_CIB_CONNECTED); crmd_fsa_stall(NULL); return; } else if (is_set(fsa_input_register, R_READ_CONFIG) == FALSE) { crm_info("Delaying start, Config not read (%.16llx)", R_READ_CONFIG); crmd_fsa_stall(NULL); return; } else if (is_set(fsa_input_register, R_PEER_DATA) == FALSE) { HA_Message *msg = NULL; /* try reading from HA */ crm_info("Delaying start, No peer data (%.16llx)", R_PEER_DATA); crm_trace("Looking for a HA message"); #if SUPPORT_HEARTBEAT if (is_heartbeat_cluster()) { msg = fsa_cluster_conn->llc_ops->readmsg(fsa_cluster_conn, 0); } #endif if (msg != NULL) { crm_trace("There was a HA message"); crm_msg_del(msg); } crmd_fsa_stall(NULL); return; } crm_debug("Init server comms"); - ipcs = mainloop_add_ipc_server(CRM_SYSTEM_CRMD, QB_IPC_SOCKET, &crmd_callbacks); + ipcs = mainloop_add_ipc_server(CRM_SYSTEM_CRMD, QB_IPC_NATIVE, &crmd_callbacks); if (ipcs == NULL) { crm_err("Couldn't start IPC server"); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); } if (stonith_reconnect == NULL) { int dummy; stonith_reconnect = mainloop_add_trigger(G_PRIORITY_LOW, te_connect_stonith, &dummy); } set_bit_inplace(fsa_input_register, R_ST_REQUIRED); mainloop_set_trigger(stonith_reconnect); crm_notice("The local CRM is operational"); clear_bit_inplace(fsa_input_register, R_STARTING); register_fsa_input(msg_data->fsa_cause, I_PENDING, NULL); } /* A_RECOVER */ void do_recover(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { set_bit_inplace(fsa_input_register, R_IN_RECOVERY); crm_err("Action %s (%.16llx) not supported", fsa_action2string(action), action); register_fsa_input(C_FSA_INTERNAL, I_TERMINATE, NULL); } /* *INDENT-OFF* */ pe_cluster_option crmd_opts[] = { /* name, old-name, validate, default, description */ { "dc-version", NULL, "string", NULL, "none", NULL, "Version of Pacemaker on the cluster's DC.", "Includes the hash which identifies the exact Mercurial changeset it was built from. Used for diagnostic purposes." }, { "cluster-infrastructure", NULL, "string", NULL, "heartbeat", NULL, "The messaging stack on which Pacemaker is currently running.", "Used for informational and diagnostic purposes." }, { XML_CONFIG_ATTR_DC_DEADTIME, "dc_deadtime", "time", NULL, "20s", &check_time, "How long to wait for a response from other nodes during startup.", "The \"correct\" value will depend on the speed/load of your network and the type of switches used." }, { XML_CONFIG_ATTR_RECHECK, "cluster_recheck_interval", "time", "Zero disables polling. Positive values are an interval in seconds (unless other SI units are specified. eg. 5min)", "15min", &check_timer, "Polling interval for time based changes to options, resource parameters and constraints.", "The Cluster is primarily event driven, however the configuration can have elements that change based on time." " To ensure these changes take effect, we can optionally poll the cluster's status for changes." }, { XML_CONFIG_ATTR_ELECTION_FAIL, "election_timeout", "time", NULL, "2min", &check_timer, "*** Advanced Use Only ***.", "If need to adjust this value, it probably indicates the presence of a bug." }, { XML_CONFIG_ATTR_FORCE_QUIT, "shutdown_escalation", "time", NULL, "20min", &check_timer, "*** Advanced Use Only ***.", "If need to adjust this value, it probably indicates the presence of a bug." }, { "crmd-integration-timeout", NULL, "time", NULL, "3min", &check_timer, "*** Advanced Use Only ***.", "If need to adjust this value, it probably indicates the presence of a bug." }, { "crmd-finalization-timeout", NULL, "time", NULL, "30min", &check_timer, "*** Advanced Use Only ***.", "If you need to adjust this value, it probably indicates the presence of a bug." }, { "crmd-transition-delay", NULL, "time", NULL, "0s", &check_timer, "*** Advanced Use Only ***\nEnabling this option will slow down cluster recovery under all conditions", "Delay cluster recovery for the configured interval to allow for additional/related events to occur.\nUseful if your configuration is sensitive to the order in which ping updates arrive." }, { XML_ATTR_EXPECTED_VOTES, NULL, "integer", NULL, "2", &check_number, "The number of nodes expected to be in the cluster", "Used to calculate quorum in openais based clusters." }, }; /* *INDENT-ON* */ void crmd_metadata(void) { config_metadata("CRM Daemon", "1.0", "CRM Daemon Options", "This is a fake resource that details the options that can be configured for the CRM Daemon.", crmd_opts, DIMOF(crmd_opts)); } static void verify_crmd_options(GHashTable * options) { verify_all_options(options, crmd_opts, DIMOF(crmd_opts)); } static const char * crmd_pref(GHashTable * options, const char *name) { return get_cluster_pref(options, crmd_opts, DIMOF(crmd_opts), name); } static void config_query_callback(xmlNode * msg, int call_id, int rc, xmlNode * output, void *user_data) { const char *value = NULL; GHashTable *config_hash = NULL; ha_time_t *now = new_ha_date(TRUE); if (rc != cib_ok) { fsa_data_t *msg_data = NULL; crm_err("Local CIB query resulted in an error: %s", cib_error2string(rc)); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); if (rc == cib_bad_permissions || rc == cib_dtd_validation || rc == cib_bad_digest || rc == cib_bad_config) { crm_err("The cluster is mis-configured - shutting down and staying down"); set_bit_inplace(fsa_input_register, R_STAYDOWN); } goto bail; } crm_debug("Call %d : Parsing CIB options", call_id); config_hash = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); unpack_instance_attributes(output, output, XML_CIB_TAG_PROPSET, NULL, config_hash, CIB_OPTIONS_FIRST, FALSE, now); verify_crmd_options(config_hash); value = crmd_pref(config_hash, XML_CONFIG_ATTR_DC_DEADTIME); election_trigger->period_ms = crm_get_msec(value); value = crmd_pref(config_hash, XML_CONFIG_ATTR_FORCE_QUIT); shutdown_escalation_timer->period_ms = crm_get_msec(value); crm_debug("Shutdown escalation occurs after: %dms", shutdown_escalation_timer->period_ms); value = crmd_pref(config_hash, XML_CONFIG_ATTR_ELECTION_FAIL); election_timeout->period_ms = crm_get_msec(value); value = crmd_pref(config_hash, XML_CONFIG_ATTR_RECHECK); recheck_timer->period_ms = crm_get_msec(value); crm_debug("Checking for expired actions every %dms", recheck_timer->period_ms); value = crmd_pref(config_hash, "crmd-transition-delay"); transition_timer->period_ms = crm_get_msec(value); value = crmd_pref(config_hash, "crmd-integration-timeout"); integration_timer->period_ms = crm_get_msec(value); value = crmd_pref(config_hash, "crmd-finalization-timeout"); finalization_timer->period_ms = crm_get_msec(value); #if SUPPORT_COROSYNC if (is_classic_ais_cluster()) { value = crmd_pref(config_hash, XML_ATTR_EXPECTED_VOTES); crm_debug("Sending expected-votes=%s to corosync", value); send_ais_text(crm_class_quorum, value, TRUE, NULL, crm_msg_ais); } #endif set_bit_inplace(fsa_input_register, R_READ_CONFIG); crm_trace("Triggering FSA: %s", __FUNCTION__); mainloop_set_trigger(fsa_source); g_hash_table_destroy(config_hash); bail: free_ha_date(now); } gboolean crm_read_options(gpointer user_data) { int call_id = fsa_cib_conn->cmds->query(fsa_cib_conn, XML_CIB_TAG_CRMCONFIG, NULL, cib_scope_local); add_cib_op_callback(fsa_cib_conn, call_id, FALSE, NULL, config_query_callback); crm_trace("Querying the CIB... call %d", call_id); return TRUE; } /* A_READCONFIG */ void do_read_config(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { mainloop_set_trigger(config_read); } void crm_shutdown(int nsig) { if (crmd_mainloop != NULL && g_main_is_running(crmd_mainloop)) { if (is_set(fsa_input_register, R_SHUTDOWN)) { crm_err("Escalating the shutdown"); register_fsa_input_before(C_SHUTDOWN, I_ERROR, NULL); } else { set_bit_inplace(fsa_input_register, R_SHUTDOWN); register_fsa_input(C_SHUTDOWN, I_SHUTDOWN, NULL); if (shutdown_escalation_timer->period_ms < 1) { const char *value = crmd_pref(NULL, XML_CONFIG_ATTR_FORCE_QUIT); int msec = crm_get_msec(value); crm_debug("Using default shutdown escalation: %dms", msec); shutdown_escalation_timer->period_ms = msec; } /* cant rely on this... */ crm_notice("Requesting shutdown, upper limit is %dms", shutdown_escalation_timer->period_ms); crm_timer_start(shutdown_escalation_timer); } } else { crm_info("exit from shutdown"); exit(LSB_EXIT_OK); } } static void default_cib_update_callback(xmlNode * msg, int call_id, int rc, xmlNode * output, void *user_data) { if (rc != cib_ok) { fsa_data_t *msg_data = NULL; crm_err("CIB Update failed: %s", cib_error2string(rc)); crm_log_xml_warn(output, "update:failed"); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); } } #if SUPPORT_HEARTBEAT static void populate_cib_nodes_ha(gboolean with_client_status) { int call_id = 0; const char *ha_node = NULL; xmlNode *cib_node_list = NULL; if (fsa_cluster_conn == NULL) { crm_debug("Not connected"); return; } /* Async get client status information in the cluster */ crm_info("Requesting the list of configured nodes"); fsa_cluster_conn->llc_ops->init_nodewalk(fsa_cluster_conn); cib_node_list = create_xml_node(NULL, XML_CIB_TAG_NODES); do { const char *ha_node_type = NULL; const char *ha_node_uuid = NULL; xmlNode *cib_new_node = NULL; ha_node = fsa_cluster_conn->llc_ops->nextnode(fsa_cluster_conn); if (ha_node == NULL) { continue; } ha_node_type = fsa_cluster_conn->llc_ops->node_type(fsa_cluster_conn, ha_node); if (safe_str_neq(NORMALNODE, ha_node_type)) { crm_debug("Node %s: skipping '%s'", ha_node, ha_node_type); continue; } ha_node_uuid = get_uuid(ha_node); if (ha_node_uuid == NULL) { crm_warn("Node %s: no uuid found", ha_node); continue; } crm_debug("Node: %s (uuid: %s)", ha_node, ha_node_uuid); cib_new_node = create_xml_node(cib_node_list, XML_CIB_TAG_NODE); crm_xml_add(cib_new_node, XML_ATTR_ID, ha_node_uuid); crm_xml_add(cib_new_node, XML_ATTR_UNAME, ha_node); crm_xml_add(cib_new_node, XML_ATTR_TYPE, ha_node_type); } while (ha_node != NULL); fsa_cluster_conn->llc_ops->end_nodewalk(fsa_cluster_conn); /* Now update the CIB with the list of nodes */ fsa_cib_update(XML_CIB_TAG_NODES, cib_node_list, cib_scope_local | cib_quorum_override, call_id, NULL); add_cib_op_callback(fsa_cib_conn, call_id, FALSE, NULL, default_cib_update_callback); free_xml(cib_node_list); crm_trace("Complete"); } #endif static void create_cib_node_definition(gpointer key, gpointer value, gpointer user_data) { crm_node_t *node = value; xmlNode *cib_nodes = user_data; xmlNode *cib_new_node = NULL; crm_trace("Creating node entry for %s/%s", node->uname, node->uuid); cib_new_node = create_xml_node(cib_nodes, XML_CIB_TAG_NODE); crm_xml_add(cib_new_node, XML_ATTR_ID, node->uuid); crm_xml_add(cib_new_node, XML_ATTR_UNAME, node->uname); crm_xml_add(cib_new_node, XML_ATTR_TYPE, NORMALNODE); } void populate_cib_nodes(gboolean with_client_status) { int call_id = 0; xmlNode *cib_node_list = NULL; #if SUPPORT_HEARTBEAT if (is_heartbeat_cluster()) { populate_cib_nodes_ha(with_client_status); return; } #endif cib_node_list = create_xml_node(NULL, XML_CIB_TAG_NODES); g_hash_table_foreach(crm_peer_cache, create_cib_node_definition, cib_node_list); fsa_cib_update(XML_CIB_TAG_NODES, cib_node_list, cib_scope_local | cib_quorum_override, call_id, NULL); add_cib_op_callback(fsa_cib_conn, call_id, FALSE, NULL, default_cib_update_callback); free_xml(cib_node_list); crm_trace("Complete"); } diff --git a/fencing/main.c b/fencing/main.c index 7e178b12dc..70d1b2097f 100644 --- a/fencing/main.c +++ b/fencing/main.c @@ -1,832 +1,832 @@ /* * Copyright (C) 2009 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include char *channel1 = NULL; char *channel2 = NULL; char *stonith_our_uname = NULL; GMainLoop *mainloop = NULL; GHashTable *client_list = NULL; gboolean stand_alone = FALSE; gboolean stonith_shutdown_flag = FALSE; #if SUPPORT_HEARTBEAT ll_cluster_t *hb_conn = NULL; #endif static int32_t st_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) { crm_trace("Connecting %p for uid=%d gid=%d", c, uid, gid); if(stonith_shutdown_flag) { crm_info("Ignoring new client [%d] during shutdown", crm_ipcs_client_pid(c)); return -EPERM; } return 0; } static void st_ipc_created(qb_ipcs_connection_t *c) { cl_uuid_t client_id; stonith_client_t *new_client = NULL; char uuid_str[UU_UNPARSE_SIZEOF]; #if 0 struct qb_ipcs_stats srv_stats; qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE); qb_log(LOG_INFO, "Connection created (active:%d, closed:%d)", srv_stats.active_connections, srv_stats.closed_connections); #endif crm_malloc0(new_client, sizeof(stonith_client_t)); new_client->channel = c; new_client->channel_name = crm_strdup("ipc"); cl_uuid_generate(&client_id); cl_uuid_unparse(&client_id, uuid_str); CRM_CHECK(new_client->id == NULL, crm_free(new_client->id)); new_client->id = crm_strdup(uuid_str); crm_trace("Created channel %p for client %s", c, new_client->id); /* make sure we can find ourselves later for sync calls * redirected to the master instance */ g_hash_table_insert(client_list, new_client->id, new_client); qb_ipcs_context_set(c, new_client); } /* Exit code means? */ static int32_t st_ipc_dispatch(qb_ipcs_connection_t *c, void *data, size_t size) { xmlNode *request = NULL; stonith_client_t *client = (stonith_client_t*)qb_ipcs_context_get(c); CRM_CHECK(client != NULL, crm_err("Invalid client"); return FALSE); CRM_CHECK(client->id != NULL, crm_err("Invalid client: %p", client); return FALSE); request = crm_ipcs_recv(c, data, size); if (request == NULL) { return 0; } if(client->name == NULL) { const char *value = crm_element_value(request, F_STONITH_CLIENTNAME); if(value == NULL) { client->name = crm_itoa(crm_ipcs_client_pid(c)); } else { client->name = crm_strdup(value); } } crm_xml_add(request, F_STONITH_CLIENTID, client->id); crm_xml_add(request, F_STONITH_CLIENTNAME, client->name); crm_log_xml_trace(request, "Client[inbound]"); stonith_command(client, request, NULL); free_xml(request); return 0; } /* Error code means? */ static int32_t st_ipc_closed(qb_ipcs_connection_t *c) { stonith_client_t *client = (stonith_client_t*)qb_ipcs_context_get(c); #if 0 qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE); qb_ipcs_connection_stats_get(c, &stats, QB_FALSE); qb_log(LOG_INFO, "Connection to pid:%d destroyed (active:%d, closed:%d)", stats.client_pid, srv_stats.active_connections, srv_stats.closed_connections); qb_log(LOG_DEBUG, " Requests %"PRIu64"", stats.requests); qb_log(LOG_DEBUG, " Responses %"PRIu64"", stats.responses); qb_log(LOG_DEBUG, " Events %"PRIu64"", stats.events); qb_log(LOG_DEBUG, " Send retries %"PRIu64"", stats.send_retries); qb_log(LOG_DEBUG, " Recv retries %"PRIu64"", stats.recv_retries); qb_log(LOG_DEBUG, " FC state %d", stats.flow_control_state); qb_log(LOG_DEBUG, " FC count %"PRIu64"", stats.flow_control_count); #endif if (client == NULL) { crm_err("No client"); return 0; } crm_trace("Cleaning up after client disconnect: %p/%s/%s", client, crm_str(client->name), client->id); if(client->id != NULL) { g_hash_table_remove(client_list, client->id); } /* 0 means: yes, go ahead and destroy the connection */ return 0; } static void st_ipc_destroy(qb_ipcs_connection_t *c) { stonith_client_t *client = (stonith_client_t*)qb_ipcs_context_get(c); /* Make sure the connection is fully cleaned up */ st_ipc_closed(c); if(client == NULL) { crm_trace("Nothing to destroy"); return; } crm_trace("Destroying %s (%p)", client->name, client); crm_free(client->name); crm_free(client->id); crm_free(client); crm_trace("Done"); return; } static void stonith_peer_callback(xmlNode * msg, void* private_data) { const char *remote = crm_element_value(msg, F_ORIG); crm_log_xml_trace(msg, "Peer[inbound]"); stonith_command(NULL, msg, remote); } static void stonith_peer_hb_callback(HA_Message * msg, void* private_data) { xmlNode *xml = convert_ha_message(NULL, msg, __FUNCTION__); stonith_peer_callback(xml, private_data); free_xml(xml); } #if SUPPORT_COROSYNC static gboolean stonith_peer_ais_callback( AIS_Message *wrapper, char *data, int sender) { xmlNode *xml = NULL; if(wrapper->header.id == crm_class_cluster) { xml = string2xml(data); if(xml == NULL) { goto bail; } crm_xml_add(xml, F_ORIG, wrapper->sender.uname); crm_xml_add_int(xml, F_SEQ, wrapper->id); stonith_peer_callback(xml, NULL); } free_xml(xml); return TRUE; bail: crm_err("Invalid XML: '%.120s'", data); return TRUE; } static void stonith_peer_ais_destroy(gpointer user_data) { crm_err("AIS connection terminated"); exit(1); } #endif static void stonith_peer_hb_destroy(gpointer user_data) { if(stonith_shutdown_flag) { crm_info("Heartbeat disconnection complete... exiting"); } else { crm_err("Heartbeat connection lost! Exiting."); } crm_info("Exiting..."); if (mainloop != NULL && g_main_is_running(mainloop)) { g_main_quit(mainloop); } else { exit(LSB_EXIT_OK); } } void do_local_reply(xmlNode *notify_src, const char *client_id, gboolean sync_reply, gboolean from_peer) { /* send callback to originating child */ stonith_client_t *client_obj = NULL; enum stonith_errors local_rc = stonith_ok; crm_trace("Sending response"); if(client_id != NULL) { client_obj = g_hash_table_lookup(client_list, client_id); } else { crm_trace("No client to sent the response to." " F_STONITH_CLIENTID not set."); } crm_trace("Sending callback to request originator"); if(client_obj == NULL) { local_rc = -1; } else { crm_trace("Sending %ssync response to %s %s", sync_reply?"":"an a-", client_obj->name, from_peer?"(originator of delegated request)":""); local_rc = crm_ipcs_send(client_obj->channel, notify_src, !sync_reply); } if(local_rc < stonith_ok && client_obj != NULL) { crm_warn("%sSync reply to %s failed: %s", sync_reply?"":"A-", client_obj?client_obj->name:"", stonith_error2string(local_rc)); } } long long get_stonith_flag(const char *name) { if(safe_str_eq(name, STONITH_OP_FENCE)) { return 0x01; } else if(safe_str_eq(name, STONITH_OP_DEVICE_ADD)) { return 0x04; } else if(safe_str_eq(name, STONITH_OP_DEVICE_DEL)) { return 0x10; } return 0; } static void stonith_notify_client(gpointer key, gpointer value, gpointer user_data) { xmlNode *update_msg = user_data; stonith_client_t *client = value; const char *type = NULL; CRM_CHECK(client != NULL, return); CRM_CHECK(update_msg != NULL, return); type = crm_element_value(update_msg, F_SUBTYPE); CRM_CHECK(type != NULL, crm_log_xml_err(update_msg, "notify"); return); if(client == NULL) { crm_trace("Skipping NULL client"); return; } else if(client->channel == NULL) { crm_trace("Skipping client with NULL channel"); return; } else if(client->name == NULL) { crm_trace("Skipping unnammed client / comamnd channel"); return; } if(client->flags & get_stonith_flag(type)) { crm_trace("Sending %s-notification to client %s/%s", type, client->name, client->id); if(crm_ipcs_send(client->channel, update_msg, ipcs_send_event|ipcs_send_error) <= 0) { crm_warn("%s-Notification of client %s/%s failed", type, client->name, client->id); } } } void do_stonith_notify( int options, const char *type, enum stonith_errors result, xmlNode *data, const char *remote) { /* TODO: Standardize the contents of data */ xmlNode *update_msg = create_xml_node(NULL, "notify"); CRM_CHECK(type != NULL, ;); crm_xml_add(update_msg, F_TYPE, T_STONITH_NOTIFY); crm_xml_add(update_msg, F_SUBTYPE, type); crm_xml_add(update_msg, F_STONITH_OPERATION, type); crm_xml_add_int(update_msg, F_STONITH_RC, result); if(data != NULL) { add_message_xml(update_msg, F_STONITH_CALLDATA, data); } crm_trace("Notifying clients"); g_hash_table_foreach(client_list, stonith_notify_client, update_msg); free_xml(update_msg); crm_trace("Notify complete"); } static stonith_key_value_t *parse_device_list(const char *devices) { int lpc = 0; int max = 0; int last = 0; stonith_key_value_t *output = NULL; if(devices == NULL) { return output; } max = strlen(devices); for(lpc = 0; lpc <= max; lpc++) { if(devices[lpc] == ',' || devices[lpc] == 0) { char *line = NULL; crm_malloc0(line, 2 + lpc - last); snprintf(line, 1 + lpc - last, "%s", devices+last); output = stonith_key_value_add(output, NULL, line); crm_free(line); last = lpc + 1; } } return output; } static void topology_remove_helper(const char *node, int level) { xmlNode *data = create_xml_node(NULL, F_STONITH_LEVEL); crm_xml_add(data, "origin", __FUNCTION__); crm_xml_add_int(data, XML_ATTR_ID, level); crm_xml_add(data, F_STONITH_TARGET, node); stonith_level_remove(data); free_xml(data); } static void topology_register_helper(const char *node, int level, stonith_key_value_t *device_list) { xmlNode *data = create_level_registration_xml(node, level, device_list); stonith_level_register(data); free_xml(data); } static void remove_fencing_topology(xmlXPathObjectPtr xpathObj) { int max = 0, lpc = 0; if(xpathObj && xpathObj->nodesetval) { max = xpathObj->nodesetval->nodeNr; } for(lpc = 0; lpc < max; lpc++) { xmlNode *match = getXpathResult(xpathObj, lpc); CRM_CHECK(match != NULL, continue); if(crm_element_value(match, XML_DIFF_MARKER)) { /* Deletion */ int index = 0; const char *target = crm_element_value(match, XML_ATTR_STONITH_TARGET); crm_element_value_int(match, XML_ATTR_STONITH_INDEX, &index); if(target == NULL) { crm_err("Invalid fencing target in element %s", ID(match)); } else if(index <= 0) { crm_err("Invalid level for %s in element %s", target, ID(match)); } else { topology_remove_helper(target, index); } /* } else { Deal with modifications during the 'addition' stage */ } } } static void register_fencing_topology(xmlXPathObjectPtr xpathObj, gboolean force) { int max = 0, lpc = 0; if(xpathObj && xpathObj->nodesetval) { max = xpathObj->nodesetval->nodeNr; } for(lpc = 0; lpc < max; lpc++) { int index = 0; const char *target; const char *dev_list; stonith_key_value_t *devices = NULL; xmlNode *match = getXpathResult(xpathObj, lpc); CRM_CHECK(match != NULL, continue); crm_element_value_int(match, XML_ATTR_STONITH_INDEX, &index); target = crm_element_value(match, XML_ATTR_STONITH_TARGET); dev_list = crm_element_value(match, XML_ATTR_STONITH_DEVICES); devices = parse_device_list(dev_list); crm_trace("Updating %s[%d] (%s) to %s", target, index, ID(match), dev_list); if(target == NULL) { crm_err("Invalid fencing target in element %s", ID(match)); } else if(index <= 0) { crm_err("Invalid level for %s in element %s", target, ID(match)); } else if(force == FALSE && crm_element_value(match, XML_DIFF_MARKER)) { /* Addition */ topology_register_helper(target, index, devices); } else { /* Modification */ /* Remove then re-add */ topology_remove_helper(target, index); topology_register_helper(target, index, devices); } stonith_key_value_freeall(devices, 1, 1); } } /* Fencing */ static void fencing_topology_callback(xmlNode * msg, int call_id, int rc, xmlNode * output, void *user_data) { xmlXPathObjectPtr xpathObj = NULL; const char *xpath = "//" XML_TAG_FENCING_LEVEL; crm_trace("Pushing in stonith topology"); /* Grab everything */ xpathObj = xpath_search(msg, xpath); register_fencing_topology(xpathObj, TRUE); if(xpathObj) { xmlXPathFreeObject(xpathObj); } } static void update_fencing_topology(const char *event, xmlNode * msg) { const char *xpath; xmlXPathObjectPtr xpathObj = NULL; /* Process deletions (only) */ xpath = "//" F_CIB_UPDATE_RESULT "//" XML_TAG_DIFF_REMOVED "//" XML_TAG_FENCING_LEVEL; xpathObj = xpath_search(msg, xpath); remove_fencing_topology(xpathObj); if(xpathObj) { xmlXPathFreeObject(xpathObj); } /* Process additions and changes */ xpath = "//" F_CIB_UPDATE_RESULT "//" XML_TAG_DIFF_ADDED "//" XML_TAG_FENCING_LEVEL; xpathObj = xpath_search(msg, xpath); register_fencing_topology(xpathObj, FALSE); if(xpathObj) { xmlXPathFreeObject(xpathObj); } } static void stonith_shutdown(int nsig) { stonith_shutdown_flag = TRUE; crm_info("Terminating with %d clients", g_hash_table_size(client_list)); exit(0); } static void stonith_cleanup(void) { crm_peer_destroy(); g_hash_table_destroy(client_list); crm_free(stonith_our_uname); #if HAVE_LIBXML2 crm_xml_cleanup(); #endif crm_free(channel1); } /* *INDENT-OFF* */ static struct crm_option long_options[] = { {"stand-alone", 0, 0, 's'}, {"verbose", 0, 0, 'V'}, {"version", 0, 0, '$'}, {"help", 0, 0, '?'}, {0, 0, 0, 0} }; /* *INDENT-ON* */ static void setup_cib(void) { static void *cib_library = NULL; static cib_t *(*cib_new_fn)(void) = NULL; static const char *(*cib_err_fn)(enum cib_errors) = NULL; int rc, retries = 0; cib_t *cib = NULL; if(cib_library == NULL) { cib_library = dlopen(CIB_LIBRARY, RTLD_LAZY); } if(cib_library && cib_new_fn == NULL) { cib_new_fn = dlsym(cib_library, "cib_new"); } if(cib_library && cib_err_fn == NULL) { cib_err_fn = dlsym(cib_library, "cib_error2string"); } if(cib_new_fn != NULL) { cib = (*cib_new_fn)(); } if(cib == NULL) { crm_err("No connection to the CIB"); return; } do { sleep(retries); rc = cib->cmds->signon(cib, CRM_SYSTEM_CRMD, cib_command); } while(rc == cib_connection && ++retries < 5); if (rc != cib_ok) { crm_err("Could not connect to the CIB service: %s", (*cib_err_fn)(rc)); } else if (cib_ok != cib->cmds->add_notify_callback( cib, T_CIB_DIFF_NOTIFY, update_fencing_topology)) { crm_err("Could not set CIB notification callback"); } else { rc = cib->cmds->query(cib, NULL, NULL, cib_scope_local); add_cib_op_callback(cib, rc, FALSE, NULL, fencing_topology_callback); crm_notice("Watching for stonith topology changes"); } } struct qb_ipcs_service_handlers ipc_callbacks = { .connection_accept = st_ipc_accept, .connection_created = st_ipc_created, .msg_process = st_ipc_dispatch, .connection_closed = st_ipc_closed, .connection_destroyed = st_ipc_destroy }; int main(int argc, char ** argv) { int flag; int rc = 0; int lpc = 0; int argerr = 0; int option_index = 0; qb_ipcs_service_t *ipcs = NULL; const char *actions[] = { "reboot", "poweroff", "list", "monitor", "status" }; crm_log_init("stonith-ng", LOG_INFO, TRUE, FALSE, argc, argv); crm_set_options(NULL, "mode [options]", long_options, "Provides a summary of cluster's current state." "\n\nOutputs varying levels of detail in a number of different formats.\n"); while (1) { flag = crm_get_option(argc, argv, &option_index); if (flag == -1) break; switch(flag) { case 'V': crm_bump_log_level(); break; case 's': stand_alone = TRUE; break; case '$': case '?': crm_help(flag, LSB_EXIT_OK); break; default: ++argerr; break; } } if(argc - optind == 1 && safe_str_eq("metadata", argv[optind])) { printf("\n"); printf("\n"); printf(" 1.0\n"); printf(" This is a fake resource that details the instance attributes handled by stonithd.\n"); printf(" Options available for all stonith resources\n"); printf(" \n"); printf(" \n"); printf(" How long to wait for the STONITH action to complete.\n"); printf(" Overrides the stonith-timeout cluster property\n"); printf(" \n"); printf(" \n"); printf(" \n"); printf(" The priority of the stonith resource. The lower the number, the higher the priority.\n"); printf(" \n"); printf(" \n"); printf(" \n", STONITH_ATTR_HOSTARG); printf(" Advanced use only: An alternate parameter to supply instead of 'port'\n"); printf(" Some devices do not support the standard 'port' parameter or may provide additional ones.\n" "Use this to specify an alternate, device-specific, parameter that should indicate the machine to be fenced.\n" "A value of 'none' can be used to tell the cluster not to supply any additional parameters.\n" " \n"); printf(" \n"); printf(" \n"); printf(" \n", STONITH_ATTR_HOSTMAP); printf(" A mapping of host names to ports numbers for devices that do not support host names.\n"); printf(" Eg. node1:1;node2:2,3 would tell the cluster to use port 1 for node1 and ports 2 and 3 for node2\n"); printf(" \n"); printf(" \n"); printf(" \n", STONITH_ATTR_HOSTLIST); printf(" A list of machines controlled by this device (Optional unless %s=static-list).\n", STONITH_ATTR_HOSTCHECK); printf(" \n"); printf(" \n"); printf(" \n", STONITH_ATTR_HOSTCHECK); printf(" How to determin which machines are controlled by the device.\n"); printf(" Allowed values: dynamic-list (query the device), static-list (check the %s attribute), none (assume every device can fence every machine)\n", STONITH_ATTR_HOSTLIST); printf(" \n"); printf(" \n"); for(lpc = 0; lpc < DIMOF(actions); lpc++) { printf(" \n", actions[lpc]); printf(" Advanced use only: An alternate command to run instead of '%s'\n", actions[lpc]); printf(" Some devices do not support the standard commands or may provide additional ones.\n" "Use this to specify an alternate, device-specific, command that implements the '%s' action.\n", actions[lpc]); printf(" \n", actions[lpc]); printf(" \n"); } printf(" \n"); printf("\n"); return 0; } if (optind != argc) { ++argerr; } if (argerr) { crm_help('?', LSB_EXIT_GENERIC); } mainloop_add_signal(SIGTERM, stonith_shutdown); /* EnableProcLogging(); */ set_sigchld_proctrack(G_PRIORITY_HIGH,DEFAULT_MAXDISPATCHTIME); crm_peer_init(); client_list = g_hash_table_new(crm_str_hash, g_str_equal); if(stand_alone == FALSE) { void *dispatch = stonith_peer_hb_callback; void *destroy = stonith_peer_hb_destroy; if(is_openais_cluster()) { #if SUPPORT_COROSYNC destroy = stonith_peer_ais_destroy; dispatch = stonith_peer_ais_callback; #endif } if(crm_cluster_connect(&stonith_our_uname, NULL, dispatch, destroy, #if SUPPORT_HEARTBEAT &hb_conn #else NULL #endif ) == FALSE){ crm_crit("Cannot sign in to the cluster... terminating"); exit(100); } setup_cib(); } else { stonith_our_uname = crm_strdup("localhost"); } device_list = g_hash_table_new_full( crm_str_hash, g_str_equal, NULL, free_device); topology = g_hash_table_new_full( crm_str_hash, g_str_equal, NULL, free_topology_entry); - ipcs = mainloop_add_ipc_server("stonith-ng", QB_IPC_SOCKET, &ipc_callbacks); + ipcs = mainloop_add_ipc_server("stonith-ng", QB_IPC_NATIVE, &ipc_callbacks); #if SUPPORT_STONITH_CONFIG if (((stand_alone == TRUE)) && !(standalone_cfg_read_file(STONITH_NG_CONF_FILE))) { standalone_cfg_commit(); } #endif if(ipcs != NULL) { /* Create the mainloop and run it... */ mainloop = g_main_new(FALSE); crm_info("Starting %s mainloop", crm_system_name); g_main_run(mainloop); } else { crm_err("Couldnt start all communication channels, exiting."); } stonith_cleanup(); #if SUPPORT_HEARTBEAT if(hb_conn) { hb_conn->llc_ops->delete(hb_conn); } #endif crm_info("Done"); return rc; } diff --git a/include/crm/common/ipc.h b/include/crm/common/ipc.h index 46fe618472..38a3bec98f 100644 --- a/include/crm/common/ipc.h +++ b/include/crm/common/ipc.h @@ -1,107 +1,80 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #ifndef CRM_COMMON_IPC__H # define CRM_COMMON_IPC__H # include # include # include # include /* clplumbing based IPC */ -extern gboolean send_ipc_message(IPC_Channel * ipc_client, xmlNode * msg); - -extern void default_ipc_connection_destroy(gpointer user_data); - -extern int init_server_ipc_comms(char *channel_name, - gboolean(*channel_client_connect) (IPC_Channel * newclient, - gpointer user_data), - void (*channel_connection_destroy) (gpointer user_data)); - -extern GCHSource *init_client_ipc_comms(const char *channel_name, - gboolean(*dispatch) (IPC_Channel * source_data, - gpointer user_data), void *client_data, - IPC_Channel ** ch); - -extern IPC_Channel *init_client_ipc_comms_nodispatch(const char *channel_name); - -extern gboolean subsystem_msg_dispatch(IPC_Channel * sender, void *user_data); - -extern IPC_WaitConnection *wait_channel_init(char daemonsocket[]); - -extern gboolean is_ipc_empty(IPC_Channel * ch); - -extern xmlNode *createPingRequest(const char *crm_msg_reference, const char *to); - -extern xmlNode *validate_crm_message(xmlNode * msg, - const char *sys, const char *uuid, const char *msg_type); - # define create_reply(request, xml_response_data) create_reply_adv(request, xml_response_data, __FUNCTION__); extern xmlNode *create_reply_adv(xmlNode * request, xmlNode * xml_response_data, const char *origin); # define create_request(task, xml_data, host_to, sys_to, sys_from, uuid_from) create_request_adv(task, xml_data, host_to, sys_to, sys_from, uuid_from, __FUNCTION__) extern xmlNode *create_request_adv(const char *task, xmlNode * xml_data, const char *host_to, const char *sys_to, const char *sys_from, const char *uuid_from, const char *origin); typedef struct ha_msg_input_s { xmlNode *msg; xmlNode *xml; } ha_msg_input_t; extern ha_msg_input_t *new_ha_msg_input(xmlNode * orig); extern void delete_ha_msg_input(ha_msg_input_t * orig); extern xmlNode *xmlfromIPC(IPC_Channel * ch, int timeout); /* Libqb based IPC */ #include ssize_t crm_ipcs_send(qb_ipcs_connection_t *c, xmlNode *msg, gboolean event); xmlNode *crm_ipcs_recv(qb_ipcs_connection_t *c, void *data, size_t size); int crm_ipcs_client_pid(qb_ipcs_connection_t *c); void crm_ipcs_send_ack(qb_ipcs_connection_t *c, const char *tag, const char *function, int line); #include typedef struct crm_ipc_s crm_ipc_t; crm_ipc_t *crm_ipc_new(const char *name, size_t max_size); bool crm_ipc_connect(crm_ipc_t *client); void crm_ipc_close(crm_ipc_t *client); void crm_ipc_destroy(crm_ipc_t *client); int crm_ipc_send(crm_ipc_t *client, xmlNode *message, xmlNode **reply, int32_t ms_timeout); int crm_ipc_get_fd(crm_ipc_t *client); bool crm_ipc_connected(crm_ipc_t *client); int crm_ipc_ready(crm_ipc_t *client); long crm_ipc_read(crm_ipc_t *client); const char *crm_ipc_buffer(crm_ipc_t *client); const char *crm_ipc_name(crm_ipc_t *client); /* Utils */ xmlNode *create_hello_message(const char *uuid, const char *client_name, const char *major_version, const char *minor_version); #endif diff --git a/lib/common/ipc.c b/lib/common/ipc.c index b55f800536..b51ae0b95c 100644 --- a/lib/common/ipc.c +++ b/lib/common/ipc.c @@ -1,931 +1,516 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include -xmlNode * -xmlfromIPC(IPC_Channel * ch, int timeout) -{ - xmlNode *xml = NULL; - HA_Message *msg = NULL; - -#if HAVE_MSGFROMIPC_TIMEOUT - int ipc_rc = IPC_OK; - - msg = msgfromIPC_timeout(ch, MSG_ALLOWINTR, timeout, &ipc_rc); - - if (ipc_rc == IPC_TIMEOUT) { - crm_warn("No message received in the required interval (%ds)", timeout); - return NULL; - - } else if (ipc_rc == IPC_BROKEN) { - crm_debug("Peer disconnected"); - return NULL; - - } else if (ipc_rc != IPC_OK) { - crm_err("msgfromIPC_timeout failed: rc=%d", ipc_rc); - return NULL; - - } else if (msg == NULL) { - crm_err("Empty reply from msgfromIPC_timeout"); - return NULL; - } -#else - static gboolean do_show_error = TRUE; - - if (timeout && do_show_error) { - crm_err("Timeouts are not supported by the current heartbeat libraries"); - do_show_error = FALSE; - } - - msg = msgfromIPC_noauth(ch); - if (msg == NULL) { - crm_debug("Empty reply from msgfromIPC_noauth"); - return NULL; - } -#endif - - xml = convert_ha_message(NULL, msg, __FUNCTION__); - CRM_CHECK(xml != NULL, crm_err("Invalid ipc message")); - crm_msg_del(msg); - return xml; -} - -static int -xml2ipcchan(xmlNode * m, IPC_Channel * ch) -{ - HA_Message *msg = NULL; - IPC_Message *imsg = NULL; - - if (m == NULL || ch == NULL) { - crm_err("Invalid msg2ipcchan argument"); - errno = EINVAL; - return HA_FAIL; - } - - msg = convert_xml_message(m); - if ((imsg = hamsg2ipcmsg(msg, ch)) == NULL) { - crm_err("hamsg2ipcmsg() failure"); - crm_msg_del(msg); - return HA_FAIL; - } - crm_msg_del(msg); - - if (ch->ops->send(ch, imsg) != IPC_OK) { - if (ch->ch_status == IPC_CONNECT) { - snprintf(ch->failreason, MAXFAILREASON, - "send failed,farside_pid=%d, sendq length=%ld(max is %ld)", - ch->farside_pid, (long)ch->send_queue->current_qlen, - (long)ch->send_queue->max_qlen); - } - imsg->msg_done(imsg); - return HA_FAIL; - } - return HA_OK; -} - -/* frees msg */ -gboolean -send_ipc_message(IPC_Channel * ipc_client, xmlNode * msg) -{ - gboolean all_is_good = TRUE; - int fail_level = LOG_WARNING; - - if (ipc_client != NULL && ipc_client->conntype == IPC_CLIENT) { - fail_level = LOG_ERR; - } - - if (msg == NULL) { - crm_err("cant send NULL message"); - all_is_good = FALSE; - - } else if (ipc_client == NULL) { - crm_err("cant send message without an IPC Channel"); - all_is_good = FALSE; - - } else if (ipc_client->ops->get_chan_status(ipc_client) != IPC_CONNECT) { - do_crm_log(fail_level, "IPC Channel to %d is not connected", (int)ipc_client->farside_pid); - all_is_good = FALSE; - } - - if (all_is_good && xml2ipcchan(msg, ipc_client) != HA_OK) { - do_crm_log(fail_level, "Could not send IPC message to %d", (int)ipc_client->farside_pid); - all_is_good = FALSE; - - if (ipc_client->ops->get_chan_status(ipc_client) != IPC_CONNECT) { - do_crm_log(fail_level, - "IPC Channel to %d is no longer connected", (int)ipc_client->farside_pid); - - } else if (ipc_client->conntype == IPC_CLIENT) { - if (ipc_client->send_queue->current_qlen >= ipc_client->send_queue->max_qlen) { - crm_err("Send queue to %d (size=%d) full.", - ipc_client->farside_pid, (int)ipc_client->send_queue->max_qlen); - } - } - } - return all_is_good; -} - -void -default_ipc_connection_destroy(gpointer user_data) -{ - return; -} - -int -init_server_ipc_comms(char *channel_name, - gboolean(*channel_client_connect) (IPC_Channel * newclient, - gpointer user_data), - void (*channel_connection_destroy) (gpointer user_data)) -{ - /* the clients wait channel is the other source of events. - * This source delivers the clients connection events. - * listen to this source at a relatively lower priority. - */ - - char commpath[SOCKET_LEN]; - IPC_WaitConnection *wait_ch; - - sprintf(commpath, CRM_STATE_DIR "/%s", channel_name); - - wait_ch = wait_channel_init(commpath); - - if (wait_ch == NULL) { - return 1; - } - - G_main_add_IPC_WaitConnection(G_PRIORITY_LOW, wait_ch, NULL, FALSE, - channel_client_connect, channel_name, channel_connection_destroy); - - crm_trace("Listening on: %s", commpath); - - return 0; -} - -GCHSource * -init_client_ipc_comms(const char *channel_name, - gboolean(*dispatch) (IPC_Channel * source_data, gpointer user_data), - void *client_data, IPC_Channel ** ch) -{ - IPC_Channel *a_ch = NULL; - GCHSource *the_source = NULL; - void *callback_data = client_data; - - a_ch = init_client_ipc_comms_nodispatch(channel_name); - if (ch != NULL) { - *ch = a_ch; - if (callback_data == NULL) { - callback_data = a_ch; - } - } - - if (a_ch == NULL) { - crm_warn("Setup of client connection failed," " not adding channel to mainloop"); - - return NULL; - } - - if (dispatch == NULL) { - crm_warn("No dispatch method specified..." - "maybe you meant init_client_ipc_comms_nodispatch()?"); - } else { - crm_trace("Adding dispatch method to channel"); - - the_source = G_main_add_IPC_Channel(G_PRIORITY_HIGH, a_ch, FALSE, dispatch, callback_data, - default_ipc_connection_destroy); - } - - return the_source; -} - -IPC_Channel * -init_client_ipc_comms_nodispatch(const char *channel_name) -{ - IPC_Channel *ch; - GHashTable *attrs; - static char path[] = IPC_PATH_ATTR; - - char *commpath = NULL; - int local_socket_len = 2; /* 2 = '/' + '\0' */ - - local_socket_len += strlen(channel_name); - local_socket_len += strlen(CRM_STATE_DIR); - - crm_malloc0(commpath, local_socket_len); - - sprintf(commpath, CRM_STATE_DIR "/%s", channel_name); - commpath[local_socket_len - 1] = '\0'; - crm_debug("Attempting to talk on: %s", commpath); - - attrs = g_hash_table_new(crm_str_hash, g_str_equal); - g_hash_table_insert(attrs, path, commpath); - - ch = ipc_channel_constructor(IPC_ANYTYPE, attrs); - g_hash_table_destroy(attrs); - - if (ch == NULL) { - crm_err("Could not access channel on: %s", commpath); - crm_free(commpath); - return NULL; - - } else if (ch->ops->initiate_connection(ch) != IPC_OK) { - crm_debug("Could not init comms on: %s", commpath); - ch->ops->destroy(ch); - crm_free(commpath); - return NULL; - } - - ch->ops->set_recv_qlen(ch, 512); - ch->ops->set_send_qlen(ch, 512); - ch->should_send_block = TRUE; - - crm_trace("Processing of %s complete", commpath); - - crm_free(commpath); - return ch; -} - -IPC_WaitConnection * -wait_channel_init(char daemonsocket[]) -{ - IPC_WaitConnection *wait_ch; - mode_t mask; - char path[] = IPC_PATH_ATTR; - GHashTable *attrs; - - attrs = g_hash_table_new(crm_str_hash, g_str_equal); - g_hash_table_insert(attrs, path, daemonsocket); - - mask = umask(0); - wait_ch = ipc_wait_conn_constructor(IPC_ANYTYPE, attrs); - if (wait_ch == NULL) { - crm_perror(LOG_ERR, "Can't create wait channel of type %s", IPC_ANYTYPE); - exit(1); - } - mask = umask(mask); - - g_hash_table_destroy(attrs); - - return wait_ch; -} - -gboolean -subsystem_msg_dispatch(IPC_Channel * sender, void *user_data) -{ - int lpc = 0; - xmlNode *msg = NULL; - xmlNode *data = NULL; - gboolean all_is_well = TRUE; - const char *sys_to; - const char *task; - - gboolean(*process_function) - (xmlNode * msg, xmlNode * data, IPC_Channel * sender) = NULL; - - while (IPC_ISRCONN(sender)) { - gboolean process = FALSE; - - if (sender->ops->is_message_pending(sender) == 0) { - break; - } - - msg = xmlfromIPC(sender, MAX_IPC_DELAY); - if (msg == NULL) { - break; - } - - lpc++; - crm_log_xml_trace(msg, __FUNCTION__); - - sys_to = crm_element_value(msg, F_CRM_SYS_TO); - task = crm_element_value(msg, F_CRM_TASK); - - if (safe_str_eq(task, CRM_OP_HELLO)) { - process = TRUE; - - } else if (sys_to == NULL) { - crm_err("Value of %s was NULL!!", F_CRM_SYS_TO); - - } else if (task == NULL) { - crm_err("Value of %s was NULL!!", F_CRM_TASK); - - } else { - process = TRUE; - } - - if (process == FALSE) { - free_xml(msg); - msg = NULL; - continue; - } - - data = get_message_xml(msg, F_CRM_DATA); - process_function = user_data; - if (FALSE == process_function(msg, data, sender)) { - crm_warn("Received a message destined for %s" " by mistake", sys_to); - } - - free_xml(msg); - msg = NULL; - - if (sender->ch_status == IPC_CONNECT) { - break; - } - } - - crm_trace("Processed %d messages", lpc); - if (sender->ch_status != IPC_CONNECT) { - crm_err("The server %d has left us: Shutting down...NOW", sender->farside_pid); - - exit(1); /* shutdown properly later */ - - return !all_is_well; - } - return all_is_well; -} - -gboolean -is_ipc_empty(IPC_Channel * ch) -{ - if (ch == NULL) { - return TRUE; - - } else if (ch->send_queue->current_qlen == 0 && ch->recv_queue->current_qlen == 0) { - return TRUE; - } - return FALSE; -} - xmlNode * create_request_adv(const char *task, xmlNode * msg_data, const char *host_to, const char *sys_to, const char *sys_from, const char *uuid_from, const char *origin) { char *true_from = NULL; xmlNode *request = NULL; char *reference = generateReference(task, sys_from); if (uuid_from != NULL) { true_from = generate_hash_key(sys_from, uuid_from); } else if (sys_from != NULL) { true_from = crm_strdup(sys_from); } else { crm_err("No sys from specified"); } /* host_from will get set for us if necessary by CRMd when routed */ request = create_xml_node(NULL, __FUNCTION__); crm_xml_add(request, F_CRM_ORIGIN, origin); crm_xml_add(request, F_TYPE, T_CRM); crm_xml_add(request, F_CRM_VERSION, CRM_FEATURE_SET); crm_xml_add(request, F_CRM_MSG_TYPE, XML_ATTR_REQUEST); crm_xml_add(request, XML_ATTR_REFERENCE, reference); crm_xml_add(request, F_CRM_TASK, task); crm_xml_add(request, F_CRM_SYS_TO, sys_to); crm_xml_add(request, F_CRM_SYS_FROM, true_from); /* HOSTTO will be ignored if it is to the DC anyway. */ if (host_to != NULL && strlen(host_to) > 0) { crm_xml_add(request, F_CRM_HOST_TO, host_to); } if (msg_data != NULL) { add_message_xml(request, F_CRM_DATA, msg_data); } crm_free(reference); crm_free(true_from); return request; } ha_msg_input_t * new_ha_msg_input(xmlNode * orig) { ha_msg_input_t *input_copy = NULL; crm_malloc0(input_copy, sizeof(ha_msg_input_t)); input_copy->msg = orig; input_copy->xml = get_message_xml(input_copy->msg, F_CRM_DATA); return input_copy; } void delete_ha_msg_input(ha_msg_input_t * orig) { if (orig == NULL) { return; } free_xml(orig->msg); crm_free(orig); } -xmlNode * -validate_crm_message(xmlNode * msg, const char *sys, const char *uuid, const char *msg_type) -{ - const char *to = NULL; - const char *type = NULL; - const char *crm_msg_reference = NULL; - xmlNode *action = NULL; - const char *true_sys; - char *local_sys = NULL; - - if (msg == NULL) { - return NULL; - } - - to = crm_element_value(msg, F_CRM_SYS_TO); - type = crm_element_value(msg, F_CRM_MSG_TYPE); - - crm_msg_reference = crm_element_value(msg, XML_ATTR_REFERENCE); - action = msg; - true_sys = sys; - - if (uuid != NULL) { - local_sys = generate_hash_key(sys, uuid); - true_sys = local_sys; - } - - if (to == NULL) { - crm_info("No sub-system defined."); - action = NULL; - } else if (true_sys != NULL && strcasecmp(to, true_sys) != 0) { - crm_trace("The message is not for this sub-system (%s != %s).", to, true_sys); - action = NULL; - } - - crm_free(local_sys); - - if (type == NULL) { - crm_info("No message type defined."); - return NULL; - - } else if (msg_type != NULL && strcasecmp(msg_type, type) != 0) { - crm_info("Expecting a (%s) message but received a (%s).", msg_type, type); - action = NULL; - } - - if (crm_msg_reference == NULL) { - crm_info("No message crm_msg_reference defined."); - action = NULL; - } -/* - if(action != NULL) - crm_trace( - "XML is valid and node with message type (%s) found.", - type); - crm_trace("Returning node (%s)", crm_element_name(action)); -*/ - - return action; -} - /* * This method adds a copy of xml_response_data */ xmlNode * create_reply_adv(xmlNode * original_request, xmlNode * xml_response_data, const char *origin) { xmlNode *reply = NULL; const char *host_from = crm_element_value(original_request, F_CRM_HOST_FROM); const char *sys_from = crm_element_value(original_request, F_CRM_SYS_FROM); const char *sys_to = crm_element_value(original_request, F_CRM_SYS_TO); const char *type = crm_element_value(original_request, F_CRM_MSG_TYPE); const char *operation = crm_element_value(original_request, F_CRM_TASK); const char *crm_msg_reference = crm_element_value(original_request, XML_ATTR_REFERENCE); if (type == NULL) { crm_err("Cannot create new_message," " no message type in original message"); CRM_ASSERT(type != NULL); return NULL; #if 0 } else if (strcasecmp(XML_ATTR_REQUEST, type) != 0) { crm_err("Cannot create new_message," " original message was not a request"); return NULL; #endif } reply = create_xml_node(NULL, __FUNCTION__); crm_xml_add(reply, F_CRM_ORIGIN, origin); crm_xml_add(reply, F_TYPE, T_CRM); crm_xml_add(reply, F_CRM_VERSION, CRM_FEATURE_SET); crm_xml_add(reply, F_CRM_MSG_TYPE, XML_ATTR_RESPONSE); crm_xml_add(reply, XML_ATTR_REFERENCE, crm_msg_reference); crm_xml_add(reply, F_CRM_TASK, operation); /* since this is a reply, we reverse the from and to */ crm_xml_add(reply, F_CRM_SYS_TO, sys_from); crm_xml_add(reply, F_CRM_SYS_FROM, sys_to); /* HOSTTO will be ignored if it is to the DC anyway. */ if (host_from != NULL && strlen(host_from) > 0) { crm_xml_add(reply, F_CRM_HOST_TO, host_from); } if (xml_response_data != NULL) { add_message_xml(reply, F_CRM_DATA, xml_response_data); } return reply; } /* Libqb based IPC */ /* Server... */ int crm_ipcs_client_pid(qb_ipcs_connection_t *c) { struct qb_ipcs_connection_stats stats; stats.client_pid = 0; qb_ipcs_connection_stats_get(c, &stats, 0); return stats.client_pid; } xmlNode * crm_ipcs_recv(qb_ipcs_connection_t *c, void *data, size_t size) { char *text = ((char*)data) + sizeof(struct qb_ipc_request_header); crm_trace("Received %.120s", text); return string2xml(text); } ssize_t crm_ipcs_send(qb_ipcs_connection_t *c, xmlNode *message, gboolean event) { int rc; struct iovec iov[2]; static uint32_t id = 0; const char *type = "Response"; struct qb_ipc_response_header header; char *buffer = dump_xml_unformatted(message); iov[0].iov_len = sizeof(struct qb_ipc_response_header); iov[0].iov_base = &header; iov[1].iov_len = 1 + strlen(buffer); iov[1].iov_base = buffer; header.id = id++; /* We don't really use it, but doesn't hurt to set one */ header.error = 0; /* unused */ header.size = iov[0].iov_len + iov[1].iov_len; if(event) { rc = qb_ipcs_event_sendv(c, iov, 2); type = "Event"; } else { rc = qb_ipcs_response_sendv(c, iov, 2); } if(rc < header.size) { do_crm_log((flags & ipcs_send_error)?LOG_ERR:LOG_INFO, "%s %d failed, size=%d, to=%p[%d], rc=%d: %.120s", type, header.id, header.size, crm_ipcs_client_pid(c), rc, buffer); } else { crm_trace("%s %d sent, %d bytes to %p: %.120s", type, header.id, rc, c, buffer); } crm_free(buffer); return rc; } void crm_ipcs_send_ack( qb_ipcs_connection_t *c, const char *tag, const char *function, int line) { xmlNode *ack = create_xml_node(NULL, tag); crm_xml_add(ack, "function", function); crm_xml_add_int(ack, "line", line); crm_ipcs_send(c, ack, FALSE); free_xml(ack); } /* Client... */ #define MIN_MSG_SIZE 12336 /* sizeof(struct qb_ipc_connection_response) */ #define MAX_MSG_SIZE 20*1024 typedef struct crm_ipc_s { struct pollfd pfd; int buf_size; int msg_size; char *buffer; char *name; qb_ipcc_connection_t *ipc; } crm_ipc_t; static int -_infer_ipc_buffer(int max) +pick_ipc_buffer(int max) { const char *env = getenv("PCMK_ipc_buffer"); if(env) { max = crm_parse_int(env, "0"); } if(max <= 0) { max = MAX_MSG_SIZE; } if(max < MIN_MSG_SIZE) { max = MIN_MSG_SIZE; } crm_trace("Using max message size of %d", max); return max; } crm_ipc_t * crm_ipc_new(const char *name, size_t max_size) { crm_ipc_t *client = NULL; crm_malloc0(client, sizeof(crm_ipc_t)); client->name = crm_strdup(name); - client->buf_size = _infer_ipc_buffer(max_size); + client->buf_size = pick_ipc_buffer(max_size); client->buffer = malloc(client->buf_size); client->pfd.fd = -1; client->pfd.events = POLLIN; client->pfd.revents = 0; - crm_trace("Using max message size of %d for %s", client->buf_size, client->name); - return client; } bool crm_ipc_connect(crm_ipc_t *client) { client->ipc = qb_ipcc_connect(client->name, client->buf_size); if (client->ipc == NULL) { crm_perror(LOG_INFO, "Could not establish %s connection", client->name); return FALSE; } client->closed = FALSE; client->pfd.fd = crm_ipc_get_fd(client); if(client->pfd.fd < 0) { crm_perror(LOG_INFO, "Could not obtain file descriptor for %s connection", client->name); return FALSE; } qb_ipcc_context_set(client->ipc, client); return TRUE; } void crm_ipc_close(crm_ipc_t *client) { crm_trace("Disconnecting %s IPC connection %p", client->name, client); if(client->ipc) { qb_ipcc_disconnect(client->ipc); } } void crm_ipc_destroy(crm_ipc_t *client) { crm_trace("Destroying %s IPC connection %p", client->name, client); free(client->buffer); free(client->name); free(client); } int crm_ipc_get_fd(crm_ipc_t *client) { int fd = 0; CRM_ASSERT(client != NULL); if(qb_ipcc_fd_get(client->ipc, &fd) < 0) { crm_perror(LOG_ERR, "Could not obtain file IPC descriptor for %s", client->name); } return fd; } bool crm_ipc_connected(crm_ipc_t *client) { bool rc = FALSE; if(client == NULL) { crm_trace("No client"); return FALSE; } else if(client->pfd.fd < 0) { crm_trace("Bad descriptor"); return FALSE; } rc = qb_ipcc_is_connected(client->ipc); if(rc == FALSE) { client->pfd.fd = -1; } return rc; } int crm_ipc_ready(crm_ipc_t *client) { CRM_ASSERT(client != NULL); if(crm_ipc_connected(client) == FALSE) { return -ENOTCONN; } client->pfd.revents = 0; return poll(&(client->pfd), 1, 0); } long crm_ipc_read(crm_ipc_t *client) { CRM_ASSERT(client != NULL); CRM_ASSERT(client->buffer != NULL); crm_trace("Message recieved on %s IPC connection", client->name); client->buffer[0] = 0; client->msg_size = qb_ipcc_event_recv(client->ipc, client->buffer, client->buf_size-1, -1); if(client->msg_size >= 0) { struct qb_ipc_response_header *header = (struct qb_ipc_response_header *)client->buffer; crm_trace("Recieved response %d, size=%d, rc=%d", header->id, header->size, client->msg_size); client->buffer[client->msg_size] = 0; } if(crm_ipc_connected(client) == FALSE || client->msg_size == -ENOTCONN) { crm_err("Connection to %s failed", client->name); } return client->msg_size; } const char * crm_ipc_buffer(crm_ipc_t *client) { CRM_ASSERT(client != NULL); return client->buffer + sizeof(struct qb_ipc_response_header); } const char *crm_ipc_name(crm_ipc_t *client) { CRM_ASSERT(client != NULL); return client->name; } int crm_ipc_send(crm_ipc_t *client, xmlNode *message, xmlNode **reply, int32_t ms_timeout) { long rc = 0; struct iovec iov[2]; static uint32_t id = 0; struct qb_ipc_request_header header; char *buffer = dump_xml_unformatted(message); iov[0].iov_len = sizeof(struct qb_ipc_request_header); iov[0].iov_base = &header; iov[1].iov_len = 1 + strlen(buffer); iov[1].iov_base = buffer; header.id = id++; /* We don't really use it, but doesn't hurt to set one */ header.size = iov[0].iov_len + iov[1].iov_len; if(ms_timeout == 0) { ms_timeout = 5000; } crm_trace("Waiting for reply to %ld bytes: %.120s...", iov[1].iov_base, buffer); rc = qb_ipcc_sendv_recv(client->ipc, iov, 2, client->buffer, client->buf_size, ms_timeout); crm_trace("rc=%d, errno=%d", rc, errno); if(rc > 0 && reply) { *reply = string2xml(crm_ipc_buffer(client)); } if(crm_ipc_connected(client) == FALSE) { crm_notice("Connection to %s closed: %d", client->name, rc); } else if(rc <= 0) { crm_perror(LOG_ERR, "Request to %s failed: %ld", client->name, rc); crm_info("Request was %.120s", buffer); } crm_free(buffer); return rc; } /* Utils */ xmlNode * create_hello_message(const char *uuid, const char *client_name, const char *major_version, const char *minor_version) { xmlNode *hello_node = NULL; xmlNode *hello = NULL; if (uuid == NULL || strlen(uuid) == 0 || client_name == NULL || strlen(client_name) == 0 || major_version == NULL || strlen(major_version) == 0 || minor_version == NULL || strlen(minor_version) == 0) { crm_err("Missing fields, Hello message will not be valid."); return NULL; } hello_node = create_xml_node(NULL, XML_TAG_OPTIONS); crm_xml_add(hello_node, "major_version", major_version); crm_xml_add(hello_node, "minor_version", minor_version); crm_xml_add(hello_node, "client_name", client_name); crm_xml_add(hello_node, "client_uuid", uuid); crm_trace("creating hello message"); hello = create_request(CRM_OP_HELLO, hello_node, NULL, NULL, client_name, uuid); free_xml(hello_node); return hello; } gboolean process_hello_message(xmlNode * hello, char **uuid, char **client_name, char **major_version, char **minor_version) { const char *local_uuid; const char *local_client_name; const char *local_major_version; const char *local_minor_version; *uuid = NULL; *client_name = NULL; *major_version = NULL; *minor_version = NULL; if (hello == NULL) { return FALSE; } local_uuid = crm_element_value(hello, "client_uuid"); local_client_name = crm_element_value(hello, "client_name"); local_major_version = crm_element_value(hello, "major_version"); local_minor_version = crm_element_value(hello, "minor_version"); if (local_uuid == NULL || strlen(local_uuid) == 0) { crm_err("Hello message was not valid (field %s not found)", "uuid"); return FALSE; } else if (local_client_name == NULL || strlen(local_client_name) == 0) { crm_err("Hello message was not valid (field %s not found)", "client name"); return FALSE; } else if (local_major_version == NULL || strlen(local_major_version) == 0) { crm_err("Hello message was not valid (field %s not found)", "major version"); return FALSE; } else if (local_minor_version == NULL || strlen(local_minor_version) == 0) { crm_err("Hello message was not valid (field %s not found)", "minor version"); return FALSE; } *uuid = crm_strdup(local_uuid); *client_name = crm_strdup(local_client_name); *major_version = crm_strdup(local_major_version); *minor_version = crm_strdup(local_minor_version); crm_trace("Hello message ok"); return TRUE; } diff --git a/lib/common/mainloop.c b/lib/common/mainloop.c index 1a879c8ae4..267b861d6c 100644 --- a/lib/common/mainloop.c +++ b/lib/common/mainloop.c @@ -1,541 +1,557 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #ifndef _GNU_SOURCE # define _GNU_SOURCE #endif #include #include #include #include #include #include #include static gboolean crm_trigger_prepare(GSource * source, gint * timeout) { crm_trigger_t *trig = (crm_trigger_t *) source; /* cluster-glue's FD and IPC related sources make use of * g_source_add_poll() but do not set a timeout in their prepare * functions * * This means mainloop's poll() will block until an event for one * of these sources occurs - any /other/ type of source, such as * this one or g_idle_*, that doesn't use g_source_add_poll() is * S-O-L and wont be processed until there is something fd-based * happens. * * Luckily the timeout we can set here affects all sources and * puts an upper limit on how long poll() can take. * * So unconditionally set a small-ish timeout, not too small that * we're in constant motion, which will act as an upper bound on * how long the signal handling might be delayed for. */ *timeout = 500; /* Timeout in ms */ return trig->trigger; } static gboolean crm_trigger_check(GSource * source) { crm_trigger_t *trig = (crm_trigger_t *) source; return trig->trigger; } static gboolean crm_trigger_dispatch(GSource * source, GSourceFunc callback, gpointer userdata) { crm_trigger_t *trig = (crm_trigger_t *) source; trig->trigger = FALSE; if (callback) { return callback(trig->user_data); } return TRUE; } static GSourceFuncs crm_trigger_funcs = { crm_trigger_prepare, crm_trigger_check, crm_trigger_dispatch, NULL }; static crm_trigger_t * mainloop_setup_trigger(GSource * source, int priority, gboolean(*dispatch) (gpointer user_data), gpointer userdata) { crm_trigger_t *trigger = NULL; trigger = (crm_trigger_t *) source; trigger->id = 0; trigger->trigger = FALSE; trigger->user_data = userdata; if (dispatch) { g_source_set_callback(source, dispatch, trigger, NULL); } g_source_set_priority(source, priority); g_source_set_can_recurse(source, FALSE); trigger->id = g_source_attach(source, NULL); return trigger; } crm_trigger_t * mainloop_add_trigger(int priority, gboolean(*dispatch) (gpointer user_data), gpointer userdata) { GSource *source = NULL; CRM_ASSERT(sizeof(crm_trigger_t) > sizeof(GSource)); source = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t)); CRM_ASSERT(source != NULL); return mainloop_setup_trigger(source, priority, dispatch, userdata); } void mainloop_set_trigger(crm_trigger_t * source) { source->trigger = TRUE; } gboolean mainloop_destroy_trigger(crm_trigger_t * source) { source->trigger = FALSE; if (source->id > 0) { g_source_remove(source->id); } return TRUE; } typedef struct signal_s { crm_trigger_t trigger; /* must be first */ void (*handler) (int sig); int signal; } crm_signal_t; static crm_signal_t *crm_signals[NSIG]; static gboolean crm_signal_dispatch(GSource * source, GSourceFunc callback, gpointer userdata) { crm_signal_t *sig = (crm_signal_t *) source; crm_info("Invoking handler for signal %d: %s", sig->signal, strsignal(sig->signal)); sig->trigger.trigger = FALSE; if (sig->handler) { sig->handler(sig->signal); } return TRUE; } static void mainloop_signal_handler(int sig) { if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) { mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]); } } static GSourceFuncs crm_signal_funcs = { crm_trigger_prepare, crm_trigger_check, crm_signal_dispatch, NULL }; gboolean crm_signal(int sig, void (*dispatch) (int sig)) { sigset_t mask; struct sigaction sa; struct sigaction old; if (sigemptyset(&mask) < 0) { crm_perror(LOG_ERR, "Call to sigemptyset failed"); return FALSE; } memset(&sa, 0, sizeof(struct sigaction)); sa.sa_handler = dispatch; sa.sa_flags = SA_RESTART; sa.sa_mask = mask; if (sigaction(sig, &sa, &old) < 0) { crm_perror(LOG_ERR, "Could not install signal handler for signal %d", sig); return FALSE; } return TRUE; } gboolean mainloop_add_signal(int sig, void (*dispatch) (int sig)) { GSource *source = NULL; int priority = G_PRIORITY_HIGH - 1; if (sig == SIGTERM) { /* TERM is higher priority than other signals, * signals are higher priority than other ipc. * Yes, minus: smaller is "higher" */ priority--; } if (sig >= NSIG || sig < 0) { crm_err("Signal %d is out of range", sig); return FALSE; } else if (crm_signals[sig] != NULL) { crm_err("Signal handler for %d is already installed", sig); return FALSE; } CRM_ASSERT(sizeof(crm_signal_t) > sizeof(GSource)); source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t)); crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL); CRM_ASSERT(crm_signals[sig] != NULL); crm_signals[sig]->handler = dispatch; crm_signals[sig]->signal = sig; if (crm_signal(sig, mainloop_signal_handler) == FALSE) { crm_signal_t *tmp = crm_signals[sig]; crm_signals[sig] = NULL; mainloop_destroy_trigger((crm_trigger_t *) tmp); return FALSE; } #if 0 /* If we want signals to interrupt mainloop's poll(), instead of waiting for * the timeout, then we should call siginterrupt() below * * For now, just enforce a low timeout */ if (siginterrupt(sig, 1) < 0) { crm_perror(LOG_INFO, "Could not enable system call interruptions for signal %d", sig); } #endif return TRUE; } gboolean mainloop_destroy_signal(int sig) { crm_signal_t *tmp = NULL; if (sig >= NSIG || sig < 0) { crm_err("Signal %d is out of range", sig); return FALSE; } else if (crm_signal(sig, NULL) == FALSE) { crm_perror(LOG_ERR, "Could not uninstall signal handler for signal %d", sig); return FALSE; } else if (crm_signals[sig] == NULL) { return TRUE; } tmp = crm_signals[sig]; crm_signals[sig] = NULL; mainloop_destroy_trigger((crm_trigger_t *) tmp); return TRUE; } static qb_array_t *gio_map = NULL; /* * libqb... */ struct gio_to_qb_poll { int32_t is_used; GIOChannel *channel; int32_t events; void * data; qb_ipcs_dispatch_fn_t fn; enum qb_loop_priority p; }; static gboolean gio_read_socket (GIOChannel *gio, GIOCondition condition, gpointer data) { struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data; gint fd = g_io_channel_unix_get_fd(gio); crm_trace("%p.%d %d vs. %d (G_IO_IN)", data, fd, condition, (condition & G_IO_IN)); crm_trace("%p.%d %d vs. %d (G_IO_HUP)", data, fd, condition, (condition & G_IO_HUP)); if(condition & G_IO_NVAL) { crm_trace("Marking failed adaptor %p unused", adaptor); adaptor->is_used = QB_FALSE; } return (adaptor->fn(fd, condition, adaptor->data) == 0); } static void gio_destroy(gpointer data) { struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data; crm_trace("Marking adaptor %p unused", adaptor); adaptor->is_used = QB_FALSE; } static int32_t gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn) { struct gio_to_qb_poll *adaptor; GIOChannel *channel; int32_t res = 0; res = qb_array_index(gio_map, fd, (void**)&adaptor); if (res < 0) { crm_err("Array lookup failed for fd=%d: %d", fd, res); return res; } crm_trace("Adding fd=%d to mainloop as adapater %p", fd, adaptor); if (adaptor->is_used) { crm_err("Adapter for descriptor %d is still in-use", fd); return -EEXIST; } channel = g_io_channel_unix_new(fd); if (!channel) { crm_err("No memory left to add fd=%d", fd); return -ENOMEM; } /* Because unlike the poll() API, glib doesn't tell us about HUPs by default */ evts |= (G_IO_HUP|G_IO_NVAL|G_IO_ERR); adaptor->channel = channel; adaptor->fn = fn; adaptor->events = evts; adaptor->data = data; adaptor->p = p; adaptor->is_used = QB_TRUE; res = g_io_add_watch_full(channel, G_PRIORITY_DEFAULT, evts, gio_read_socket, adaptor, gio_destroy); crm_trace("Added to mainloop with gsource id=%d", res); if(res > 0) { return 0; } return -EINVAL; } static int32_t gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn) { return 0; } static int32_t gio_poll_dispatch_del(int32_t fd) { struct gio_to_qb_poll *adaptor; crm_trace("Looking for fd=%d", fd); if (qb_array_index(gio_map, fd, (void**)&adaptor) == 0) { crm_trace("Marking adaptor %p unused", adaptor); g_io_channel_unref(adaptor->channel); adaptor->is_used = QB_FALSE; } return 0; } struct qb_ipcs_poll_handlers gio_poll_funcs = { .job_add = NULL, .dispatch_add = gio_poll_dispatch_add, .dispatch_mod = gio_poll_dispatch_mod, .dispatch_del = gio_poll_dispatch_del, }; +static enum qb_ipc_type +pick_ipc_type(enum qb_ipc_type requested) +{ + const char *env = getenv("PCMK_ipc_type"); + + if(env && strcmp("shared-mem", env) == 0) { + return QB_IPC_SHM; + } else if(env && strcmp("socket", env) == 0) { + return QB_IPC_SOCKET; + } else if(env && strcmp("posix", env) == 0) { + return QB_IPC_POSIX_MQ; + } else if(env && strcmp("sysv", env) == 0) { + return QB_IPC_SYSV_MQ; + } else if(requested == QB_IPC_NATIVE) { + /* We prefer sockets actually */ + return QB_IPC_SOCKET; + } + return requested; +} + qb_ipcs_service_t *mainloop_add_ipc_server( const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks) { qb_ipcs_service_t* server = NULL; if(gio_map == NULL) { gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1); } - if(type < 0) { - type = QB_IPC_SHM; - } - - server = qb_ipcs_create(name, 0, type, callbacks); + server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks); qb_ipcs_poll_handlers_set(server, &gio_poll_funcs); qb_ipcs_run(server); return server; } void mainloop_del_ipc_server(qb_ipcs_service_t *server) { qb_ipcs_destroy(server); } typedef struct mainloop_ipc_s { char *name; void *userdata; guint source; crm_ipc_t *ipc; GIOChannel *channel; struct ipc_client_callbacks *callbacks; } mainloop_ipc_t; static gboolean mainloop_ipcc_callback(GIOChannel *gio, GIOCondition condition, gpointer data) { gboolean keep = TRUE; mainloop_ipc_t *client = data; if(condition & G_IO_IN) { long rc = crm_ipc_read(client->ipc); crm_trace("New message from %s[%p] = %d", client->name, client, rc); if(rc <= 0) { crm_perror(LOG_TRACE, "Message acquisition failed: %ld", rc); } else if(client->callbacks && client->callbacks->dispatch) { const char *buffer = crm_ipc_buffer(client->ipc); if(client->callbacks->dispatch(buffer, rc, client->userdata) < 0) { crm_trace("Connection to %s no longer required", client->name); keep = FALSE; } else { crm_trace("delivered: %.60s", buffer); } } else { crm_trace("No callbacks? %p", client->callbacks); } } if(crm_ipc_connected(client->ipc) == FALSE) { crm_err("Connection to %s[%p] closed", client->name, client); keep = FALSE; } else if(condition & G_IO_HUP) { crm_trace("Recieved G_IO_HUP for %s [%p] connection", client->name, client); keep = FALSE; } else if(condition & G_IO_NVAL) { crm_trace("Recieved G_IO_NVAL for %s [%p] connection", client->name, client); keep = FALSE; } else if(condition & G_IO_ERR) { crm_trace("Recieved G_IO_ERR for %s [%p] connection", client->name, client); keep = FALSE; } return keep; } static void mainloop_ipcc_destroy(gpointer c) { mainloop_ipc_t *client = c; crm_trace("Destroying %s[%p]", client->name, c); if(client->callbacks && client->callbacks->destroy) { client->callbacks->destroy(client->userdata); } crm_ipc_close(client->ipc); crm_ipc_destroy(client->ipc); free(client->name); free(client); } mainloop_ipc_t * mainloop_add_ipc_client( const char *name, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks) { mainloop_ipc_t *client = NULL; crm_ipc_t *conn = crm_ipc_new(name, max_size); if(conn && crm_ipc_connect(conn)) { int32_t fd = crm_ipc_get_fd(conn); if(fd > 0) { crm_malloc0(client, sizeof(mainloop_ipc_t)); client->ipc = conn; client->name = crm_strdup(name); client->userdata = userdata; client->callbacks = callbacks; client->channel = g_io_channel_unix_new(fd); client->source = g_io_add_watch_full( client->channel, G_PRIORITY_DEFAULT, (G_IO_IN|G_IO_HUP|G_IO_NVAL|G_IO_ERR), mainloop_ipcc_callback, client, mainloop_ipcc_destroy); crm_trace("Added connection %d for %s[%p].%d", client->source, client->name, client, fd); } } if(conn && client == NULL) { crm_trace("Connection to %s failed", name); crm_ipc_close(conn); crm_ipc_destroy(conn); } return client; } void mainloop_del_ipc_client(mainloop_ipc_t *client) { if(client != NULL) { crm_trace("Removing client %s[%p]", client->name, client); g_io_channel_unref(client->channel); /* Results in mainloop_ipcc_destroy() being called once the source is removed from mainloop? */ } } crm_ipc_t * mainloop_get_ipc_client(mainloop_ipc_t *client) { if(client) { return client->ipc; } return NULL; } diff --git a/mcp/pacemaker.c b/mcp/pacemaker.c index affde1405c..3b9c3b3cb4 100644 --- a/mcp/pacemaker.c +++ b/mcp/pacemaker.c @@ -1,889 +1,889 @@ /* * Copyright (C) 2010 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include gboolean fatal_error = FALSE; GMainLoop *mainloop = NULL; GHashTable *client_list = NULL; GHashTable *peers = NULL; qb_ipcs_service_t *ipcs = NULL; char *local_name = NULL; uint32_t local_nodeid = 0; crm_trigger_t *shutdown_trigger = NULL; const char *pid_file = "/var/run/pacemaker.pid"; /* *INDENT-OFF* */ enum crm_proc_flag { crm_proc_none = 0x00000001, crm_proc_plugin = 0x00000002, crm_proc_lrmd = 0x00000010, crm_proc_cib = 0x00000100, crm_proc_crmd = 0x00000200, crm_proc_attrd = 0x00001000, crm_proc_stonithd = 0x00002000, crm_proc_pe = 0x00010000, crm_proc_te = 0x00020000, crm_proc_mgmtd = 0x00040000, crm_proc_stonith_ng = 0x00100000, }; /* *INDENT-ON* */ typedef struct pcmk_child_s { int pid; long flag; int start_seq; int respawn_count; gboolean respawn; const char *name; const char *uid; const char *command; } pcmk_child_t; /* Index into the array below */ #define pcmk_child_crmd 4 #define pcmk_child_mgmtd 8 /* *INDENT-OFF* */ static pcmk_child_t pcmk_children[] = { { 0, crm_proc_none, 0, 0, FALSE, "none", NULL, NULL }, { 0, crm_proc_plugin, 0, 0, FALSE, "ais", NULL, NULL }, { 0, crm_proc_lrmd, 3, 0, TRUE, "lrmd", NULL, HB_DAEMON_DIR"/lrmd" }, { 0, crm_proc_cib, 1, 0, TRUE, "cib", CRM_DAEMON_USER, CRM_DAEMON_DIR"/cib" }, { 0, crm_proc_crmd, 6, 0, TRUE, "crmd", CRM_DAEMON_USER, CRM_DAEMON_DIR"/crmd" }, { 0, crm_proc_attrd, 4, 0, TRUE, "attrd", CRM_DAEMON_USER, CRM_DAEMON_DIR"/attrd" }, { 0, crm_proc_stonithd, 0, 0, TRUE, "stonithd", NULL, NULL }, { 0, crm_proc_pe, 5, 0, TRUE, "pengine", CRM_DAEMON_USER, CRM_DAEMON_DIR"/pengine" }, { 0, crm_proc_mgmtd, 0, 0, TRUE, "mgmtd", NULL, HB_DAEMON_DIR"/mgmtd" }, { 0, crm_proc_stonith_ng, 2, 0, TRUE, "stonith-ng", NULL, CRM_DAEMON_DIR"/stonithd" }, }; /* *INDENT-ON* */ static gboolean start_child(pcmk_child_t * child); void enable_crmd_as_root(gboolean enable) { if (enable) { pcmk_children[pcmk_child_crmd].uid = NULL; } else { pcmk_children[pcmk_child_crmd].uid = CRM_DAEMON_USER; } } void enable_mgmtd(gboolean enable) { if (enable) { pcmk_children[pcmk_child_mgmtd].start_seq = 7; } else { pcmk_children[pcmk_child_mgmtd].start_seq = 0; } } static uint32_t get_process_list(void) { int lpc = 0; uint32_t procs = crm_proc_plugin; for (lpc = 0; lpc < SIZEOF(pcmk_children); lpc++) { if (pcmk_children[lpc].pid != 0) { procs |= pcmk_children[lpc].flag; } } return procs; } static int pcmk_user_lookup(const char *name, uid_t * uid, gid_t * gid) { int rc = -1; char *buffer = NULL; struct passwd pwd; struct passwd *pwentry = NULL; crm_malloc0(buffer, PW_BUFFER_LEN); getpwnam_r(name, &pwd, buffer, PW_BUFFER_LEN, &pwentry); if (pwentry) { rc = 0; if (uid) { *uid = pwentry->pw_uid; } if (gid) { *gid = pwentry->pw_gid; } crm_trace("Cluster user %s has uid=%d gid=%d", name, pwentry->pw_uid, pwentry->pw_gid); } else { crm_err("Cluster user %s does not exist", name); } crm_free(buffer); return rc; } static void pcmk_child_exit(ProcTrack * p, int status, int signo, int exitcode, int waslogged) { pcmk_child_t *child = p->privatedata; p->privatedata = NULL; if (signo) { crm_notice("Child process %s terminated with signal %d (pid=%d, rc=%d)", child->name, signo, child->pid, exitcode); } else { do_crm_log(exitcode == 0 ? LOG_INFO : LOG_ERR, "Child process %s exited (pid=%d, rc=%d)", child->name, child->pid, exitcode); } child->pid = 0; if (exitcode == 100) { crm_warn("Pacemaker child process %s no longer wishes to be respawned. " "Shutting ourselves down.", child->name); child->respawn = FALSE; fatal_error = TRUE; pcmk_shutdown(15); } /* Broadcast the fact that one of our processes died ASAP * * Try to get some logging of the cause out first though * because we're probably about to get fenced * * Potentially do this only if respawn_count > N * to allow for local recovery */ update_node_processes(local_nodeid, NULL, get_process_list()); child->respawn_count += 1; if (child->respawn_count > MAX_RESPAWN) { crm_err("Child respawn count exceeded by %s", child->name); child->respawn = FALSE; } if (shutdown_trigger) { mainloop_set_trigger(shutdown_trigger); update_node_processes(local_nodeid, NULL, get_process_list()); } else if (child->respawn) { crm_notice("Respawning failed child process: %s", child->name); start_child(child); } } static void pcmkManagedChildRegistered(ProcTrack * p) { pcmk_child_t *child = p->privatedata; child->pid = p->pid; } static const char * pcmkManagedChildName(ProcTrack * p) { pcmk_child_t *child = p->privatedata; return child->name; } static ProcTrack_ops pcmk_managed_child_ops = { pcmk_child_exit, pcmkManagedChildRegistered, pcmkManagedChildName }; static gboolean stop_child(pcmk_child_t * child, int signal) { if (signal == 0) { signal = SIGTERM; } if (child->command == NULL) { crm_debug("Nothing to do for child \"%s\"", child->name); return TRUE; } if (child->pid <= 0) { crm_trace("Client %s not running", child->name); return TRUE; } errno = 0; if (kill(child->pid, signal) == 0) { crm_notice("Stopping %s: Sent -%d to process %d", child->name, signal, child->pid); } else { crm_perror(LOG_ERR, "Stopping %s: Could not send -%d to process %d failed", child->name, signal, child->pid); } return TRUE; } static char *opts_default[] = { NULL, NULL }; static char *opts_vgrind[] = { NULL, NULL, NULL, NULL, NULL }; static gboolean start_child(pcmk_child_t * child) { int lpc = 0; uid_t uid = 0; struct rlimit oflimits; gboolean use_valgrind = FALSE; gboolean use_callgrind = FALSE; const char *devnull = "/dev/null"; const char *env_valgrind = getenv("PCMK_valgrind_enabled"); const char *env_callgrind = getenv("PCMK_callgrind_enabled"); if (child->command == NULL) { crm_info("Nothing to do for child \"%s\"", child->name); return TRUE; } if (env_callgrind != NULL && crm_is_true(env_callgrind)) { use_callgrind = TRUE; use_valgrind = TRUE; } else if (env_callgrind != NULL && strstr(env_callgrind, child->name)) { use_callgrind = TRUE; use_valgrind = TRUE; } else if (env_valgrind != NULL && crm_is_true(env_valgrind)) { use_valgrind = TRUE; } else if (env_valgrind != NULL && strstr(env_valgrind, child->name)) { use_valgrind = TRUE; } if (use_valgrind && strlen(VALGRIND_BIN) == 0) { crm_warn("Cannot enable valgrind for %s:" " The location of the valgrind binary is unknown", child->name); use_valgrind = FALSE; } child->pid = fork(); CRM_ASSERT(child->pid != -1); if (child->pid > 0) { /* parent */ NewTrackedProc(child->pid, 0, PT_LOGNORMAL, child, &pcmk_managed_child_ops); crm_info("Forked child %d for process %s%s", child->pid, child->name, use_valgrind ? " (valgrind enabled: " VALGRIND_BIN ")" : ""); update_node_processes(local_nodeid, NULL, get_process_list()); return TRUE; } else { /* Start a new session */ (void)setsid(); /* Setup the two alternate arg arrarys */ opts_vgrind[0] = crm_strdup(VALGRIND_BIN); if (use_callgrind) { opts_vgrind[1] = crm_strdup("--tool=callgrind"); opts_vgrind[2] = crm_strdup("--callgrind-out-file=" CRM_STATE_DIR "/callgrind.out.%p"); opts_vgrind[3] = crm_strdup(child->command); opts_vgrind[4] = NULL; } else { opts_vgrind[1] = crm_strdup(child->command); opts_vgrind[2] = NULL; opts_vgrind[3] = NULL; opts_vgrind[4] = NULL; } opts_default[0] = crm_strdup(child->command);; #if 0 /* Dont set the group for now - it prevents connection to the cluster */ if (gid && setgid(gid) < 0) { crm_perror("Could not set group to %d", gid); } #endif if (child->uid) { if (pcmk_user_lookup(child->uid, &uid, NULL) < 0) { crm_err("Invalid uid (%s) specified for %s", child->uid, child->name); return TRUE; } } if (uid && setuid(uid) < 0) { crm_perror(LOG_ERR, "Could not set user to %d (%s)", uid, child->uid); } /* Close all open file descriptors */ getrlimit(RLIMIT_NOFILE, &oflimits); for (lpc = 0; lpc < oflimits.rlim_cur; lpc++) { close(lpc); } (void)open(devnull, O_RDONLY); /* Stdin: fd 0 */ (void)open(devnull, O_WRONLY); /* Stdout: fd 1 */ (void)open(devnull, O_WRONLY); /* Stderr: fd 2 */ if (use_valgrind) { (void)execvp(VALGRIND_BIN, opts_vgrind); } else { (void)execvp(child->command, opts_default); } crm_perror(LOG_ERR, "FATAL: Cannot exec %s", child->command); exit(100); } return TRUE; /* never reached */ } static gboolean escalate_shutdown(gpointer data) { pcmk_child_t *child = data; if (child->pid) { /* Use SIGSEGV instead of SIGKILL to create a core so we can see what it was up to */ crm_err("Child %s not terminating in a timely manner, forcing", child->name); stop_child(child, SIGSEGV); } return FALSE; } static gboolean pcmk_shutdown_worker(gpointer user_data) { static int phase = 0; static time_t next_log = 0; static int max = SIZEOF(pcmk_children); int lpc = 0; if (phase == 0) { crm_notice("Shuting down Pacemaker"); phase = max; } for (; phase > 0; phase--) { /* dont stop anything with start_seq < 1 */ for (lpc = max - 1; lpc >= 0; lpc--) { pcmk_child_t *child = &(pcmk_children[lpc]); if (phase != child->start_seq) { continue; } if (child->pid) { time_t now = time(NULL); if (child->respawn) { next_log = now + 30; child->respawn = FALSE; stop_child(child, SIGTERM); if (phase < pcmk_children[pcmk_child_crmd].start_seq) { g_timeout_add(180000 /* 3m */ , escalate_shutdown, child); } } else if (now >= next_log) { next_log = now + 30; crm_notice("Still waiting for %s (pid=%d, seq=%d) to terminate...", child->name, child->pid, child->start_seq); } return TRUE; } /* cleanup */ crm_debug("%s confirmed stopped", child->name); child->pid = 0; } } /* send_cluster_id(); */ crm_notice("Shutdown complete"); g_main_loop_quit(mainloop); if(fatal_error) { crm_notice("Attempting to inhibit respawning after fatal error"); exit(100); } return TRUE; } void pcmk_shutdown(int nsig) { if(ipcs) { crm_trace("Closing IPC server"); mainloop_del_ipc_server(ipcs); ipcs = NULL; } if (shutdown_trigger == NULL) { shutdown_trigger = mainloop_add_trigger(G_PRIORITY_HIGH, pcmk_shutdown_worker, NULL); } mainloop_set_trigger(shutdown_trigger); } static void build_path(const char *path_c, mode_t mode) { int offset = 1, len = 0; char *path = crm_strdup(path_c); CRM_CHECK(path != NULL, return); for (len = strlen(path); offset < len; offset++) { if (path[offset] == '/') { path[offset] = 0; if (mkdir(path, mode) < 0 && errno != EEXIST) { crm_perror(LOG_ERR, "Could not create directory '%s'", path); break; } path[offset] = '/'; } } if (mkdir(path, mode) < 0 && errno != EEXIST) { crm_perror(LOG_ERR, "Could not create directory '%s'", path); } crm_free(path); } static int32_t pcmk_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) { crm_trace("Connecting %p for uid=%d gid=%d", c, uid, gid); return 0; } static void pcmk_ipc_created(qb_ipcs_connection_t *c) { g_hash_table_insert(client_list, c, c); crm_debug("Channel %p connected: %d children", c, g_hash_table_size(client_list)); /* update_process_clients(); */ } /* Exit code means? */ static int32_t pcmk_ipc_dispatch(qb_ipcs_connection_t *c, void *data, size_t size) { const char *task = NULL; xmlNode *msg = crm_ipcs_recv(c, data, size); xmlNode *ack = create_xml_node(NULL, "ack"); crm_trace("Message from %p", c); crm_ipcs_send(c, ack, FALSE); free_xml(ack); if (msg == NULL) { return 0; } task = crm_element_value(msg, F_CRM_TASK); if(crm_str_eq(task, CRM_OP_QUIT, TRUE)) { /* Time to quit */ crm_notice("Shutting down in responce to ticket %s (%s)", crm_element_value(msg, XML_ATTR_REFERENCE), crm_element_value(msg, F_CRM_ORIGIN)); pcmk_shutdown(15); } else { /* Just send to everyone */ update_process_clients(); } free_xml(msg); return 0; } /* Error code means? */ static int32_t pcmk_ipc_closed(qb_ipcs_connection_t *c) { crm_trace("%p closed", c); return 0; } static void pcmk_ipc_destroy(qb_ipcs_connection_t *c) { crm_trace("%p destroy", c); g_hash_table_remove(client_list, c); } struct qb_ipcs_service_handlers ipc_callbacks = { .connection_accept = pcmk_ipc_accept, .connection_created = pcmk_ipc_created, .msg_process = pcmk_ipc_dispatch, .connection_closed = pcmk_ipc_closed, .connection_destroyed = pcmk_ipc_destroy }; static gboolean ghash_send_proc_details(gpointer key, gpointer value, gpointer data) { if (crm_ipcs_send(key, data, TRUE) <= 0) { /* remove it */ return TRUE; } return FALSE; } static void peer_loop_fn(gpointer key, gpointer value, gpointer user_data) { pcmk_peer_t *node = value; xmlNode *update = user_data; xmlNode *xml = create_xml_node(update, "node"); crm_xml_add_int(xml, "id", node->id); crm_xml_add(xml, "uname", node->uname); crm_xml_add_int(xml, "processes", node->processes); } void update_process_clients(void) { xmlNode *update = create_xml_node(NULL, "nodes"); crm_trace("Sending process list to %d children", g_hash_table_size(client_list)); g_hash_table_foreach(peers, peer_loop_fn, update); g_hash_table_foreach_remove(client_list, ghash_send_proc_details, update); free_xml(update); } void update_process_peers(void) { char buffer[1024]; struct iovec iov; int rc = 0; memset(buffer, 0, SIZEOF(buffer)); if (local_name) { rc = snprintf(buffer, SIZEOF(buffer) - 1, "", local_name, get_process_list()); } else { rc = snprintf(buffer, SIZEOF(buffer) - 1, "", get_process_list()); } iov.iov_base = buffer; iov.iov_len = rc + 1; crm_trace("Sending %s", buffer); send_cpg_message(&iov); } gboolean update_node_processes(uint32_t id, const char *uname, uint32_t procs) { gboolean changed = FALSE; pcmk_peer_t *node = g_hash_table_lookup(peers, GUINT_TO_POINTER(id)); if (node == NULL) { changed = TRUE; crm_malloc0(node, sizeof(pcmk_peer_t)); node->id = id; g_hash_table_insert(peers, GUINT_TO_POINTER(id), node); node = g_hash_table_lookup(peers, GUINT_TO_POINTER(id)); CRM_ASSERT(node != NULL); } if (uname != NULL) { if (node->uname == NULL || safe_str_eq(node->uname, uname) == FALSE) { crm_notice("%p Node %u now known as %s%s%s", node, id, uname, node->uname?node->uname:", was: ", node->uname?node->uname:""); crm_free(node->uname); node->uname = crm_strdup(uname); changed = TRUE; } } else { crm_trace("Empty uname for node %u", id); } if (procs != 0) { if(procs != node->processes) { crm_debug("Node %s now has process list: %.32x (was %.32x)", node->uname, procs, node->processes); node->processes = procs; changed = TRUE; } else { crm_trace("Node %s still has process list: %.32x", node->uname, procs); } } if (changed && id == local_nodeid) { update_process_clients(); update_process_peers(); } return changed; } /* *INDENT-OFF* */ static struct crm_option long_options[] = { /* Top-level Options */ {"help", 0, 0, '?', "\tThis text"}, {"version", 0, 0, '$', "\tVersion information" }, {"verbose", 0, 0, 'V', "\tIncrease debug output"}, {"shutdown", 0, 0, 'S', "\tInstruct Pacemaker to shutdown on this machine"}, {"features", 0, 0, 'F', "\tDisplay the full version and list of features Pacemaker was built with"}, {"-spacer-", 1, 0, '-', "\nAdditional Options:"}, {"foreground", 0, 0, 'f', "\tRun in the foreground instead of as a daemon"}, {"pid-file", 1, 0, 'p', "\t(Advanced) Daemon pid file location"}, {NULL, 0, 0, 0} }; /* *INDENT-ON* */ int main(int argc, char **argv) { int rc; int flag; int argerr = 0; int option_index = 0; gboolean shutdown = FALSE; gboolean daemonize = TRUE; int start_seq = 1, lpc = 0; static int max = SIZEOF(pcmk_children); uid_t pcmk_uid = 0; gid_t pcmk_gid = 0; struct rlimit cores; crm_ipc_t *old_instance = NULL; /* *INDENT-OFF* */ /* =::=::= Default Environment =::=::= */ setenv("HA_mcp", "true", 1); setenv("HA_COMPRESSION", "bz2", 1); setenv("HA_debug", "0", 1); setenv("HA_logfacility", "daemon", 1); setenv("HA_LOGFACILITY", "daemon", 1); setenv("HA_use_logd", "off", 1); /* *INDENT-ON* */ crm_log_init(NULL, LOG_INFO, TRUE, FALSE, argc, argv); crm_set_options(NULL, "mode [options]", long_options, "Start/Stop Pacemaker\n"); #ifndef ON_DARWIN /* prevent zombies */ signal(SIGCLD, SIG_IGN); #endif while (1) { flag = crm_get_option(argc, argv, &option_index); if (flag == -1) break; switch (flag) { case 'V': crm_bump_log_level(); break; case 'f': daemonize = FALSE; break; case 'p': pid_file = optarg; break; case '$': case '?': crm_help(flag, LSB_EXIT_OK); break; case 'S': shutdown = TRUE; break; case 'F': printf("Pacemaker %s (Build: %s)\n Supporting: %s\n", VERSION, BUILD_VERSION, CRM_FEATURES); exit(0); default: printf("Argument code 0%o (%c) is not (?yet?) supported\n", flag, flag); ++argerr; break; } } if (optind < argc) { printf("non-option ARGV-elements: "); while (optind < argc) printf("%s ", argv[optind++]); printf("\n"); } if (argerr) { crm_help('?', LSB_EXIT_GENERIC); } crm_debug("Checking for old instances of %s", CRM_SYSTEM_MCP); old_instance = crm_ipc_new(CRM_SYSTEM_MCP, 0); crm_ipc_connect(old_instance); if(shutdown) { crm_debug("Terminating previous instance"); while (crm_ipc_connected(old_instance)) { xmlNode *cmd = create_request(CRM_OP_QUIT, NULL, NULL, CRM_SYSTEM_MCP, CRM_SYSTEM_MCP, NULL); crm_debug("."); crm_ipc_send(old_instance, cmd, NULL, 0); free_xml(cmd); sleep(2); } crm_ipc_close(old_instance); crm_ipc_destroy(old_instance); exit(0); } else if(crm_ipc_connected(old_instance)) { crm_ipc_close(old_instance); crm_ipc_destroy(old_instance); crm_err("Pacemaker is already active, aborting startup"); exit(100); } crm_ipc_close(old_instance); crm_ipc_destroy(old_instance); if (read_config() == FALSE) { return 1; } if (daemonize) { crm_enable_stderr(FALSE); crm_make_daemon(crm_system_name, TRUE, pid_file); /* Only Re-init if we're running daemonized */ crm_log_init(NULL, LOG_INFO, TRUE, FALSE, 0, NULL); } crm_notice("Starting Pacemaker %s (Build: %s): %s\n", VERSION, BUILD_VERSION, CRM_FEATURES); mainloop = g_main_new(FALSE); rc = getrlimit(RLIMIT_CORE, &cores); if (rc < 0) { crm_perror(LOG_ERR, "Cannot determine current maximum core size."); } else { if (cores.rlim_max == 0 && geteuid() == 0) { cores.rlim_max = RLIM_INFINITY; } else { crm_info("Maximum core file size is: %lu", (unsigned long)cores.rlim_max); } cores.rlim_cur = cores.rlim_max; rc = setrlimit(RLIMIT_CORE, &cores); if (rc < 0) { crm_perror(LOG_ERR, "Core file generation will remain disabled." " Core files are an important diagnositic tool," " please consider enabling them by default."); } #if 0 /* system() is not thread-safe, can't call from here * Actually, its a pretty hacky way to try and achieve this anyway */ if (system("echo 1 > /proc/sys/kernel/core_uses_pid") != 0) { crm_perror(LOG_ERR, "Could not enable /proc/sys/kernel/core_uses_pid"); } #endif } if (pcmk_user_lookup(CRM_DAEMON_USER, &pcmk_uid, &pcmk_gid) < 0) { crm_err("Cluster user %s does not exist, aborting Pacemaker startup", CRM_DAEMON_USER); return TRUE; } mkdir(CRM_STATE_DIR, 0750); rc = chown(CRM_STATE_DIR, pcmk_uid, pcmk_gid); if(rc < 0) { crm_warn("Cannot change the ownership of %s to user %s and gid %d", CRM_STATE_DIR, CRM_DAEMON_USER, pcmk_gid); } /* Used by stonithd */ build_path(HA_STATE_DIR "/heartbeat", 0755); /* Used by RAs - Leave owned by root */ build_path(CRM_RSCTMP_DIR, 0755); client_list = g_hash_table_new(g_direct_hash, g_direct_equal); peers = g_hash_table_new(g_direct_hash, g_direct_equal); - ipcs = mainloop_add_ipc_server(CRM_SYSTEM_MCP, QB_IPC_SOCKET, &ipc_callbacks); + ipcs = mainloop_add_ipc_server(CRM_SYSTEM_MCP, QB_IPC_NATIVE, &ipc_callbacks); if (ipcs == NULL) { crm_err("Couldn't start IPC server"); return 1; } if (cluster_connect_cfg(&local_nodeid) == FALSE) { crm_err("Couldn't connect to Corosync's CFG service"); return 1; } if (cluster_connect_cpg() == FALSE) { crm_err("Couldn't connect to Corosync's CPG service"); return 1; } local_name = get_local_node_name(); update_node_processes(local_nodeid, local_name, get_process_list()); mainloop_add_signal(SIGTERM, pcmk_shutdown); mainloop_add_signal(SIGINT, pcmk_shutdown); set_sigchld_proctrack(G_PRIORITY_HIGH, DEFAULT_MAXDISPATCHTIME); for (start_seq = 1; start_seq < max; start_seq++) { /* dont start anything with start_seq < 1 */ for (lpc = 0; lpc < max; lpc++) { if (start_seq == pcmk_children[lpc].start_seq) { start_child(&(pcmk_children[lpc])); } } } crm_info("Starting mainloop"); g_main_run(mainloop); g_main_destroy(mainloop); cluster_disconnect_cpg(); cluster_disconnect_cfg(); crm_info("Exiting %s", crm_system_name); return 0; } diff --git a/pengine/main.c b/pengine/main.c index c6f5440e64..5375b5c80c 100644 --- a/pengine/main.c +++ b/pengine/main.c @@ -1,211 +1,211 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #if HAVE_LIBXML2 # include #endif #define OPTARGS "hVc" GMainLoop *mainloop = NULL; qb_ipcs_service_t *ipcs = NULL; void usage(const char *cmd, int exit_status); void pengine_shutdown(int nsig); static int32_t pe_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) { crm_trace("Connecting %p for uid=%d gid=%d", c, uid, gid); return 0; } static void pe_ipc_created(qb_ipcs_connection_t *c) { } gboolean process_pe_message(xmlNode * msg, xmlNode * xml_data, qb_ipcs_connection_t* sender); static int32_t pe_ipc_dispatch(qb_ipcs_connection_t *c, void *data, size_t size) { xmlNode *msg = crm_ipcs_recv(c, data, size); xmlNode *ack = create_xml_node(NULL, "ack"); crm_ipcs_send(c, ack, FALSE); free_xml(ack); if (msg != NULL) { xmlNode *data = get_message_xml(msg, F_CRM_DATA); process_pe_message(msg, data, c); free_xml(msg); } return 0; } /* Error code means? */ static int32_t pe_ipc_closed(qb_ipcs_connection_t *c) { return 0; } static void pe_ipc_destroy(qb_ipcs_connection_t *c) { crm_trace("Disconnecting %p", c); } struct qb_ipcs_service_handlers ipc_callbacks = { .connection_accept = pe_ipc_accept, .connection_created = pe_ipc_created, .msg_process = pe_ipc_dispatch, .connection_closed = pe_ipc_closed, .connection_destroyed = pe_ipc_destroy }; int main(int argc, char **argv) { int flag; int argerr = 0; gboolean allow_cores = TRUE; crm_ipc_t *old_instance = NULL; crm_system_name = CRM_SYSTEM_PENGINE; mainloop_add_signal(SIGTERM, pengine_shutdown); while ((flag = getopt(argc, argv, OPTARGS)) != EOF) { switch (flag) { case 'V': crm_bump_log_level(); break; case 'h': /* Help message */ usage(crm_system_name, LSB_EXIT_OK); break; case 'c': allow_cores = TRUE; break; default: ++argerr; break; } } if (argc - optind == 1 && safe_str_eq("metadata", argv[optind])) { pe_metadata(); return 0; } if (optind > argc) { ++argerr; } if (argerr) { usage(crm_system_name, LSB_EXIT_GENERIC); } crm_log_init(NULL, LOG_NOTICE, TRUE, FALSE, argc, argv); if (crm_is_writable(PE_STATE_DIR, NULL, CRM_DAEMON_USER, CRM_DAEMON_GROUP, FALSE) == FALSE) { crm_err("Bad permissions on " PE_STATE_DIR ". Terminating"); fprintf(stderr, "ERROR: Bad permissions on " PE_STATE_DIR ". See logs for details\n"); fflush(stderr); return 100; } /* find any previous instances and shut them down */ crm_debug("Checking for old instances of %s", CRM_SYSTEM_PENGINE); old_instance = crm_ipc_new(CRM_SYSTEM_PENGINE, 0); crm_ipc_connect(old_instance); crm_debug("Terminating previous instance"); while (crm_ipc_connected(old_instance)) { xmlNode *cmd = create_request(CRM_OP_QUIT, NULL, NULL, CRM_SYSTEM_PENGINE, CRM_SYSTEM_PENGINE, NULL); crm_debug("."); crm_ipc_send(old_instance, cmd, NULL, 0); free_xml(cmd); sleep(2); } crm_ipc_close(old_instance); crm_ipc_destroy(old_instance); crm_debug("Init server comms"); - ipcs = mainloop_add_ipc_server(CRM_SYSTEM_PENGINE, QB_IPC_SOCKET, &ipc_callbacks); + ipcs = mainloop_add_ipc_server(CRM_SYSTEM_PENGINE, QB_IPC_NATIVE, &ipc_callbacks); if (ipcs == NULL) { crm_err("Couldn't start IPC server"); return 1; } /* Create the mainloop and run it... */ crm_info("Starting %s", crm_system_name); mainloop = g_main_new(FALSE); g_main_run(mainloop); #if HAVE_LIBXML2 crm_xml_cleanup(); #endif crm_info("Exiting %s", crm_system_name); return 0; } void usage(const char *cmd, int exit_status) { FILE *stream; stream = exit_status ? stderr : stdout; fprintf(stream, "usage: %s [-srkh]" "[-c configure file]\n", cmd); /* fprintf(stream, "\t-d\tsets debug level\n"); */ /* fprintf(stream, "\t-s\tgets daemon status\n"); */ /* fprintf(stream, "\t-r\trestarts daemon\n"); */ /* fprintf(stream, "\t-k\tstops daemon\n"); */ /* fprintf(stream, "\t-h\thelp message\n"); */ fflush(stream); exit(exit_status); } void pengine_shutdown(int nsig) { mainloop_del_ipc_server(ipcs); exit(LSB_EXIT_OK); } diff --git a/tools/attrd.c b/tools/attrd.c index f47b74aa5f..e440a4926e 100644 --- a/tools/attrd.c +++ b/tools/attrd.c @@ -1,852 +1,852 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define OPTARGS "hV" #if SUPPORT_HEARTBEAT ll_cluster_t *attrd_cluster_conn; #endif GMainLoop *mainloop = NULL; char *attrd_uname = NULL; char *attrd_uuid = NULL; gboolean need_shutdown = FALSE; GHashTable *attr_hash = NULL; cib_t *cib_conn = NULL; typedef struct attrd_client_s { char *user; } attrd_client_t; typedef struct attr_hash_entry_s { char *uuid; char *id; char *set; char *section; char *value; char *stored_value; int timeout; char *dampen; guint timer_id; char *user; } attr_hash_entry_t; void attrd_local_callback(xmlNode * msg); gboolean attrd_timer_callback(void *user_data); gboolean attrd_trigger_update(attr_hash_entry_t * hash_entry); void attrd_perform_update(attr_hash_entry_t * hash_entry); static void free_hash_entry(gpointer data) { attr_hash_entry_t *entry = data; if (entry == NULL) { return; } crm_free(entry->id); crm_free(entry->set); crm_free(entry->dampen); crm_free(entry->section); crm_free(entry->uuid); crm_free(entry->value); crm_free(entry->stored_value); crm_free(entry->user); crm_free(entry); } static int32_t attrd_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) { crm_trace("Connecting %p for uid=%d gid=%d", c, uid, gid); if (need_shutdown) { crm_info("Ignoring connection request during shutdown"); return FALSE; } return 0; } static void attrd_ipc_created(qb_ipcs_connection_t *c) { attrd_client_t *new_client = NULL; crm_malloc0(new_client, sizeof(attrd_client_t)); qb_ipcs_context_set(c, new_client); crm_trace("Client %p connected", c); } /* Exit code means? */ static int32_t attrd_ipc_dispatch(qb_ipcs_connection_t *c, void *data, size_t size) { xmlNode *msg = crm_ipcs_recv(c, data, size); xmlNode *ack = create_xml_node(NULL, "ack"); crm_ipcs_send(c, ack, FALSE); free_xml(ack); if (msg == NULL) { return 0; } #if ENABLE_ACL determine_request_user(&curr_client->user, c, msg, F_ATTRD_USER); #endif crm_trace("Processing msg from %p", c); crm_log_xml_trace(msg, __PRETTY_FUNCTION__); attrd_local_callback(msg); free_xml(msg); return 0; } /* Error code means? */ static int32_t attrd_ipc_closed(qb_ipcs_connection_t *c) { return 0; } static void attrd_ipc_destroy(qb_ipcs_connection_t *c) { attrd_client_t *client = qb_ipcs_context_get(c); if (client == NULL) { return; } crm_trace("Destroying %p", c); crm_free(client->user); crm_free(client); crm_trace("Freed the cib client"); return; } struct qb_ipcs_service_handlers ipc_callbacks = { .connection_accept = attrd_ipc_accept, .connection_created = attrd_ipc_created, .msg_process = attrd_ipc_dispatch, .connection_closed = attrd_ipc_closed, .connection_destroyed = attrd_ipc_destroy }; static void attrd_shutdown(int nsig) { need_shutdown = TRUE; crm_info("Exiting"); if (mainloop != NULL && g_main_is_running(mainloop)) { g_main_quit(mainloop); } else { exit(0); } } static void usage(const char *cmd, int exit_status) { FILE *stream; stream = exit_status ? stderr : stdout; fprintf(stream, "usage: %s [-srkh] [-c configure file]\n", cmd); /* fprintf(stream, "\t-d\tsets debug level\n"); */ /* fprintf(stream, "\t-s\tgets daemon status\n"); */ /* fprintf(stream, "\t-r\trestarts daemon\n"); */ /* fprintf(stream, "\t-k\tstops daemon\n"); */ /* fprintf(stream, "\t-h\thelp message\n"); */ fflush(stream); exit(exit_status); } static void stop_attrd_timer(attr_hash_entry_t * hash_entry) { if (hash_entry != NULL && hash_entry->timer_id != 0) { crm_trace("Stopping %s timer", hash_entry->id); g_source_remove(hash_entry->timer_id); hash_entry->timer_id = 0; } } static void log_hash_entry(int level, attr_hash_entry_t * entry, const char *text) { do_crm_log(level, "%s", text); do_crm_log(level, "Set: %s", entry->section); do_crm_log(level, "Name: %s", entry->id); do_crm_log(level, "Value: %s", entry->value); do_crm_log(level, "Timeout: %s", entry->dampen); } static attr_hash_entry_t * find_hash_entry(xmlNode * msg) { const char *value = NULL; const char *attr = crm_element_value(msg, F_ATTRD_ATTRIBUTE); attr_hash_entry_t *hash_entry = NULL; if (attr == NULL) { crm_info("Ignoring message with no attribute name"); return NULL; } hash_entry = g_hash_table_lookup(attr_hash, attr); if (hash_entry == NULL) { /* create one and add it */ crm_info("Creating hash entry for %s", attr); crm_malloc0(hash_entry, sizeof(attr_hash_entry_t)); hash_entry->id = crm_strdup(attr); g_hash_table_insert(attr_hash, hash_entry->id, hash_entry); hash_entry = g_hash_table_lookup(attr_hash, attr); CRM_CHECK(hash_entry != NULL, return NULL); } value = crm_element_value(msg, F_ATTRD_SET); if (value != NULL) { crm_free(hash_entry->set); hash_entry->set = crm_strdup(value); crm_debug("\t%s->set: %s", attr, value); } value = crm_element_value(msg, F_ATTRD_SECTION); if (value == NULL) { value = XML_CIB_TAG_STATUS; } crm_free(hash_entry->section); hash_entry->section = crm_strdup(value); crm_trace("\t%s->section: %s", attr, value); value = crm_element_value(msg, F_ATTRD_DAMPEN); if (value != NULL) { crm_free(hash_entry->dampen); hash_entry->dampen = crm_strdup(value); hash_entry->timeout = crm_get_msec(value); crm_trace("\t%s->timeout: %s", attr, value); } #if ENABLE_ACL crm_free(hash_entry->user); value = crm_element_value(msg, F_ATTRD_USER); if (value != NULL) { hash_entry->user = crm_strdup(value); crm_trace("\t%s->user: %s", attr, value); } #endif log_hash_entry(LOG_DEBUG_2, hash_entry, "Found (and updated) entry:"); return hash_entry; } #if SUPPORT_HEARTBEAT static void attrd_ha_connection_destroy(gpointer user_data) { crm_trace("Invoked"); if (need_shutdown) { /* we signed out, so this is expected */ crm_info("Heartbeat disconnection complete"); return; } crm_crit("Lost connection to heartbeat service!"); if (mainloop != NULL && g_main_is_running(mainloop)) { g_main_quit(mainloop); return; } exit(LSB_EXIT_OK); } static void attrd_ha_callback(HA_Message * msg, void *private_data) { attr_hash_entry_t *hash_entry = NULL; xmlNode *xml = convert_ha_message(NULL, msg, __FUNCTION__); const char *from = crm_element_value(xml, F_ORIG); const char *op = crm_element_value(xml, F_ATTRD_TASK); const char *host = crm_element_value(xml, F_ATTRD_HOST); const char *ignore = crm_element_value(xml, F_ATTRD_IGNORE_LOCALLY); if (host != NULL && safe_str_eq(host, attrd_uname)) { crm_info("Update relayed from %s", from); attrd_local_callback(xml); } else if (ignore == NULL || safe_str_neq(from, attrd_uname)) { crm_info("%s message from %s", op, from); hash_entry = find_hash_entry(xml); stop_attrd_timer(hash_entry); attrd_perform_update(hash_entry); } free_xml(xml); } #endif #if SUPPORT_COROSYNC static gboolean attrd_ais_dispatch(AIS_Message * wrapper, char *data, int sender) { xmlNode *xml = NULL; if (wrapper->header.id == crm_class_cluster) { xml = string2xml(data); if (xml == NULL) { crm_err("Bad message received: %d:'%.120s'", wrapper->id, data); } } if (xml != NULL) { attr_hash_entry_t *hash_entry = NULL; const char *op = crm_element_value(xml, F_ATTRD_TASK); const char *host = crm_element_value(xml, F_ATTRD_HOST); const char *ignore = crm_element_value(xml, F_ATTRD_IGNORE_LOCALLY); crm_xml_add_int(xml, F_SEQ, wrapper->id); crm_xml_add(xml, F_ORIG, wrapper->sender.uname); if (host != NULL && safe_str_eq(host, attrd_uname)) { crm_notice("Update relayed from %s", wrapper->sender.uname); attrd_local_callback(xml); } else if (ignore == NULL || safe_str_neq(wrapper->sender.uname, attrd_uname)) { crm_trace("%s message from %s", op, wrapper->sender.uname); hash_entry = find_hash_entry(xml); stop_attrd_timer(hash_entry); attrd_perform_update(hash_entry); } free_xml(xml); } return TRUE; } static void attrd_ais_destroy(gpointer unused) { if (need_shutdown) { /* we signed out, so this is expected */ crm_info("OpenAIS disconnection complete"); return; } crm_crit("Lost connection to OpenAIS service!"); if (mainloop != NULL && g_main_is_running(mainloop)) { g_main_quit(mainloop); return; } exit(LSB_EXIT_GENERIC); } #endif static void attrd_cib_connection_destroy(gpointer user_data) { if (need_shutdown) { crm_info("Connection to the CIB terminated..."); } else { /* eventually this will trigger a reconnect, not a shutdown */ crm_err("Connection to the CIB terminated..."); exit(1); } return; } static void update_for_hash_entry(gpointer key, gpointer value, gpointer user_data) { attr_hash_entry_t *entry = value; if (entry->value != NULL) { attrd_timer_callback(value); } } static void do_cib_replaced(const char *event, xmlNode * msg) { crm_info("Sending full refresh"); g_hash_table_foreach(attr_hash, update_for_hash_entry, NULL); } static gboolean cib_connect(void *user_data) { static int attempts = 1; static int max_retry = 20; gboolean was_err = FALSE; static cib_t *local_conn = NULL; if (local_conn == NULL) { local_conn = cib_new(); } if (was_err == FALSE) { enum cib_errors rc = cib_not_connected; if (attempts < max_retry) { crm_debug("CIB signon attempt %d", attempts); rc = local_conn->cmds->signon(local_conn, T_ATTRD, cib_command); } if (rc != cib_ok && attempts > max_retry) { crm_err("Signon to CIB failed: %s", cib_error2string(rc)); was_err = TRUE; } else if (rc != cib_ok) { attempts++; return TRUE; } } crm_info("Connected to the CIB after %d signon attempts", attempts); if (was_err == FALSE) { enum cib_errors rc = local_conn->cmds->set_connection_dnotify(local_conn, attrd_cib_connection_destroy); if (rc != cib_ok) { crm_err("Could not set dnotify callback"); was_err = TRUE; } } if (was_err == FALSE) { if (cib_ok != local_conn->cmds->add_notify_callback(local_conn, T_CIB_REPLACE_NOTIFY, do_cib_replaced)) { crm_err("Could not set CIB notification callback"); was_err = TRUE; } } if (was_err) { crm_err("Aborting startup"); exit(100); } cib_conn = local_conn; crm_info("Sending full refresh"); g_hash_table_foreach(attr_hash, update_for_hash_entry, NULL); return FALSE; } int main(int argc, char **argv) { int flag = 0; int argerr = 0; gboolean was_err = FALSE; crm_log_init(T_ATTRD, LOG_NOTICE, TRUE, FALSE, argc, argv); mainloop_add_signal(SIGTERM, attrd_shutdown); while ((flag = getopt(argc, argv, OPTARGS)) != EOF) { switch (flag) { case 'V': crm_bump_log_level(); break; case 'h': /* Help message */ usage(T_ATTRD, LSB_EXIT_OK); break; default: ++argerr; break; } } if (optind > argc) { ++argerr; } if (argerr) { usage(T_ATTRD, LSB_EXIT_GENERIC); } attr_hash = g_hash_table_new_full(crm_str_hash, g_str_equal, NULL, free_hash_entry); crm_info("Starting up"); if (was_err == FALSE) { void *destroy = NULL; void *dispatch = NULL; void *data = NULL; #if SUPPORT_COROSYNC if (is_openais_cluster()) { destroy = attrd_ais_destroy; dispatch = attrd_ais_dispatch; } #endif #if SUPPORT_HEARTBEAT if (is_heartbeat_cluster()) { data = &attrd_cluster_conn; dispatch = attrd_ha_callback; destroy = attrd_ha_connection_destroy; } #endif if (FALSE == crm_cluster_connect(&attrd_uname, &attrd_uuid, dispatch, destroy, data)) { crm_err("HA Signon failed"); was_err = TRUE; } } crm_info("Cluster connection active"); if (was_err == FALSE) { - qb_ipcs_service_t *ipcs = mainloop_add_ipc_server(T_ATTRD, QB_IPC_SOCKET, &ipc_callbacks); + qb_ipcs_service_t *ipcs = mainloop_add_ipc_server(T_ATTRD, QB_IPC_NATIVE, &ipc_callbacks); if (ipcs == NULL) { crm_err("Could not start IPC server"); was_err = TRUE; } } crm_info("Accepting attribute updates"); mainloop = g_main_new(FALSE); if (0 == g_timeout_add_full(G_PRIORITY_LOW + 1, 5000, cib_connect, NULL, NULL)) { crm_info("Adding timer failed"); was_err = TRUE; } if (was_err) { crm_err("Aborting startup"); return 100; } crm_notice("Starting mainloop..."); g_main_run(mainloop); crm_notice("Exiting..."); #if SUPPORT_HEARTBEAT if (is_heartbeat_cluster()) { attrd_cluster_conn->llc_ops->signoff(attrd_cluster_conn, TRUE); attrd_cluster_conn->llc_ops->delete(attrd_cluster_conn); } #endif if (cib_conn) { cib_conn->cmds->signoff(cib_conn); cib_delete(cib_conn); } g_hash_table_destroy(attr_hash); crm_free(attrd_uuid); empty_uuid_cache(); return 0; } struct attrd_callback_s { char *attr; char *value; }; static void attrd_cib_callback(xmlNode * msg, int call_id, int rc, xmlNode * output, void *user_data) { attr_hash_entry_t *hash_entry = NULL; struct attrd_callback_s *data = user_data; if (data->value == NULL && rc == cib_NOTEXISTS) { rc = cib_ok; } switch (rc) { case cib_ok: crm_debug("Update %d for %s=%s passed", call_id, data->attr, data->value); hash_entry = g_hash_table_lookup(attr_hash, data->attr); if (hash_entry) { crm_free(hash_entry->stored_value); hash_entry->stored_value = NULL; if (data->value != NULL) { hash_entry->stored_value = crm_strdup(data->value); } } break; case cib_diff_failed: /* When an attr changes while the CIB is syncing */ case cib_remote_timeout: /* When an attr changes while there is a DC election */ case cib_NOTEXISTS: /* When an attr changes while the CIB is syncing a * newer config from a node that just came up */ crm_warn("Update %d for %s=%s failed: %s", call_id, data->attr, data->value, cib_error2string(rc)); break; default: crm_err("Update %d for %s=%s failed: %s", call_id, data->attr, data->value, cib_error2string(rc)); } crm_free(data->value); crm_free(data->attr); crm_free(data); } void attrd_perform_update(attr_hash_entry_t * hash_entry) { int rc = cib_ok; struct attrd_callback_s *data = NULL; const char *user_name = NULL; if (hash_entry == NULL) { return; } else if (cib_conn == NULL) { crm_info("Delaying operation %s=%s: cib not connected", hash_entry->id, crm_str(hash_entry->value)); return; } #if ENABLE_ACL if (hash_entry->user) { user_name = hash_entry->user; crm_trace("Performing request from user '%s'", hash_entry->user); } #endif if (hash_entry->value == NULL) { /* delete the attr */ rc = delete_attr_delegate(cib_conn, cib_none, hash_entry->section, attrd_uuid, NULL, hash_entry->set, hash_entry->uuid, hash_entry->id, NULL, FALSE, user_name); if (hash_entry->stored_value) { crm_notice("Sent delete %d: node=%s, attr=%s, id=%s, set=%s, section=%s", rc, attrd_uuid, hash_entry->id, hash_entry->uuid ? hash_entry->uuid : "", hash_entry->set, hash_entry->section); } else if (rc < 0 && rc != cib_NOTEXISTS) { crm_notice ("Delete operation failed: node=%s, attr=%s, id=%s, set=%s, section=%s: %s (%d)", attrd_uuid, hash_entry->id, hash_entry->uuid ? hash_entry->uuid : "", hash_entry->set, hash_entry->section, cib_error2string(rc), rc); } else { crm_trace("Sent delete %d: node=%s, attr=%s, id=%s, set=%s, section=%s", rc, attrd_uuid, hash_entry->id, hash_entry->uuid ? hash_entry->uuid : "", hash_entry->set, hash_entry->section); } } else { /* send update */ rc = update_attr_delegate(cib_conn, cib_none, hash_entry->section, attrd_uuid, NULL, hash_entry->set, hash_entry->uuid, hash_entry->id, hash_entry->value, FALSE, user_name); if (safe_str_neq(hash_entry->value, hash_entry->stored_value) || rc < 0) { crm_notice("Sent update %d: %s=%s", rc, hash_entry->id, hash_entry->value); } else { crm_trace("Sent update %d: %s=%s", rc, hash_entry->id, hash_entry->value); } } crm_malloc0(data, sizeof(struct attrd_callback_s)); data->attr = crm_strdup(hash_entry->id); if (hash_entry->value != NULL) { data->value = crm_strdup(hash_entry->value); } add_cib_op_callback(cib_conn, rc, FALSE, data, attrd_cib_callback); return; } void attrd_local_callback(xmlNode * msg) { static int plus_plus_len = 5; attr_hash_entry_t *hash_entry = NULL; const char *from = crm_element_value(msg, F_ORIG); const char *op = crm_element_value(msg, F_ATTRD_TASK); const char *attr = crm_element_value(msg, F_ATTRD_ATTRIBUTE); const char *value = crm_element_value(msg, F_ATTRD_VALUE); const char *host = crm_element_value(msg, F_ATTRD_HOST); if (safe_str_eq(op, "refresh")) { crm_notice("Sending full refresh (origin=%s)", from); g_hash_table_foreach(attr_hash, update_for_hash_entry, NULL); return; } if (host != NULL && safe_str_neq(host, attrd_uname)) { send_cluster_message(host, crm_msg_attrd, msg, FALSE); return; } crm_debug("%s message from %s: %s=%s", op, from, attr, crm_str(value)); hash_entry = find_hash_entry(msg); if (hash_entry == NULL) { return; } if (hash_entry->uuid == NULL) { const char *key = crm_element_value(msg, F_ATTRD_KEY); if (key) { hash_entry->uuid = crm_strdup(key); } } crm_debug("Supplied: %s, Current: %s, Stored: %s", value, hash_entry->value, hash_entry->stored_value); if (safe_str_eq(value, hash_entry->value) && safe_str_eq(value, hash_entry->stored_value)) { crm_trace("Ignoring non-change"); return; } else if (value) { int offset = 1; int int_value = 0; int value_len = strlen(value); if (value_len < (plus_plus_len + 2) || value[plus_plus_len] != '+' || (value[plus_plus_len + 1] != '+' && value[plus_plus_len + 1] != '=')) { goto set_unexpanded; } int_value = char2score(hash_entry->value); if (value[plus_plus_len + 1] != '+') { const char *offset_s = value + (plus_plus_len + 2); offset = char2score(offset_s); } int_value += offset; if (int_value > INFINITY) { int_value = INFINITY; } crm_info("Expanded %s=%s to %d", attr, value, int_value); crm_xml_add_int(msg, F_ATTRD_VALUE, int_value); value = crm_element_value(msg, F_ATTRD_VALUE); } set_unexpanded: if (safe_str_eq(value, hash_entry->value) && hash_entry->timer_id) { /* We're already waiting to set this value */ return; } crm_free(hash_entry->value); hash_entry->value = NULL; if (value != NULL) { hash_entry->value = crm_strdup(value); crm_debug("New value of %s is %s", attr, value); } stop_attrd_timer(hash_entry); if (hash_entry->timeout > 0) { hash_entry->timer_id = g_timeout_add(hash_entry->timeout, attrd_timer_callback, hash_entry); } else { attrd_trigger_update(hash_entry); } return; } gboolean attrd_timer_callback(void *user_data) { stop_attrd_timer(user_data); attrd_trigger_update(user_data); return TRUE; /* Always return true, removed cleanly by stop_attrd_timer() */ } gboolean attrd_trigger_update(attr_hash_entry_t * hash_entry) { xmlNode *msg = NULL; /* send HA message to everyone */ crm_notice("Sending flush op to all hosts for: %s (%s)", hash_entry->id, crm_str(hash_entry->value)); log_hash_entry(LOG_DEBUG_2, hash_entry, "Sending flush op to all hosts for:"); msg = create_xml_node(NULL, __FUNCTION__); crm_xml_add(msg, F_TYPE, T_ATTRD); crm_xml_add(msg, F_ORIG, attrd_uname); crm_xml_add(msg, F_ATTRD_TASK, "flush"); crm_xml_add(msg, F_ATTRD_ATTRIBUTE, hash_entry->id); crm_xml_add(msg, F_ATTRD_SET, hash_entry->set); crm_xml_add(msg, F_ATTRD_SECTION, hash_entry->section); crm_xml_add(msg, F_ATTRD_DAMPEN, hash_entry->dampen); crm_xml_add(msg, F_ATTRD_VALUE, hash_entry->value); #if ENABLE_ACL if (hash_entry->user) { crm_xml_add(msg, F_ATTRD_USER, hash_entry->user); } #endif if (hash_entry->timeout <= 0) { crm_xml_add(msg, F_ATTRD_IGNORE_LOCALLY, hash_entry->value); attrd_perform_update(hash_entry); } send_cluster_message(NULL, crm_msg_attrd, msg, FALSE); free_xml(msg); return TRUE; } diff --git a/tools/crmadmin.c b/tools/crmadmin.c index 730fc16c2e..66f2f98dc1 100644 --- a/tools/crmadmin.c +++ b/tools/crmadmin.c @@ -1,556 +1,616 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include int message_timer_id = -1; int message_timeout_ms = 30 * 1000; GMainLoop *mainloop = NULL; crm_ipc_t *crmd_channel = NULL; char *admin_uuid = NULL; void usage(const char *cmd, int exit_status); gboolean do_init(void); int do_work(void); void crmadmin_ipc_connection_destroy(gpointer user_data); int admin_msg_callback(const char *buffer, ssize_t length, gpointer userdata); char *pluralSection(const char *a_section); xmlNode *handleCibMod(void); int do_find_node_list(xmlNode * xml_node); gboolean admin_message_timeout(gpointer data); gboolean is_node_online(xmlNode * node_state); enum debug { debug_none, debug_dec, debug_inc }; gboolean BE_VERBOSE = FALSE; int expected_responses = 1; gboolean BASH_EXPORT = FALSE; gboolean DO_HEALTH = FALSE; gboolean DO_RESET = FALSE; gboolean DO_RESOURCE = FALSE; gboolean DO_ELECT_DC = FALSE; gboolean DO_WHOIS_DC = FALSE; gboolean DO_NODE_LIST = FALSE; gboolean BE_SILENT = FALSE; gboolean DO_RESOURCE_LIST = FALSE; enum debug DO_DEBUG = debug_none; const char *crmd_operation = NULL; xmlNode *msg_options = NULL; const char *standby_on_off = "on"; const char *admin_verbose = XML_BOOLEAN_FALSE; char *id = NULL; char *disconnect = NULL; char *dest_node = NULL; char *rsc_name = NULL; char *crm_option = NULL; int operation_status = 0; const char *sys_to = NULL; /* *INDENT-OFF* */ static struct crm_option long_options[] = { /* Top-level Options */ {"help", 0, 0, '?', "\tThis text"}, {"version", 0, 0, '$', "\tVersion information" }, {"quiet", 0, 0, 'q', "\tDisplay only the essential query information"}, {"verbose", 0, 0, 'V', "\tIncrease debug output"}, {"-spacer-", 1, 0, '-', "\nCommands:"}, /* daemon options */ {"debug_inc", 1, 0, 'i', "Increase the crmd's debug level on the specified host"}, {"debug_dec", 1, 0, 'd', "Decrease the crmd's debug level on the specified host"}, {"status", 1, 0, 'S', "Display the status of the specified node." }, {"-spacer-", 1, 0, '-', "\n\tResult is the node's internal FSM state which can be useful for debugging\n"}, {"dc_lookup", 0, 0, 'D', "Display the uname of the node co-ordinating the cluster."}, {"-spacer-", 1, 0, '-', "\n\tThis is an internal detail and is rarely useful to administrators except when deciding on which node to examine the logs.\n"}, {"nodes", 0, 0, 'N', "\tDisplay the uname of all member nodes"}, {"election", 0, 0, 'E', "(Advanced) Start an election for the cluster co-ordinator"}, {"kill", 1, 0, 'K', "(Advanced) Shut down the crmd (not the rest of the clusterstack ) on the specified node"}, {"health", 0, 0, 'H', NULL, 1}, {"-spacer-", 1, 0, '-', "\nAdditional Options:"}, {XML_ATTR_TIMEOUT, 1, 0, 't', "Time (in milliseconds) to wait before declaring the operation failed"}, {"bash-export", 0, 0, 'B', "Create Bash export entries of the form 'export uname=uuid'\n"}, {"-spacer-", 1, 0, '-', "Notes:"}, {"-spacer-", 1, 0, '-', " The -i,-d,-K and -E commands are rarely used and may be removed in future versions."}, {0, 0, 0, 0} }; /* *INDENT-ON* */ int main(int argc, char **argv) { int option_index = 0; int argerr = 0; int flag; crm_log_init(NULL, LOG_ERR, FALSE, TRUE, argc, argv); crm_set_options(NULL, "command [options]", long_options, "Development tool for performing some crmd-specific commands." "\n Likely to be replaced by crm_node in the future"); if (argc < 2) { crm_help('?', LSB_EXIT_EINVAL); } while (1) { flag = crm_get_option(argc, argv, &option_index); if (flag == -1) break; switch (flag) { case 'V': BE_VERBOSE = TRUE; admin_verbose = XML_BOOLEAN_TRUE; crm_bump_log_level(); break; case 't': message_timeout_ms = atoi(optarg); if (message_timeout_ms < 1) { message_timeout_ms = 30 * 1000; } break; case '$': case '?': crm_help(flag, LSB_EXIT_OK); break; case 'D': DO_WHOIS_DC = TRUE; break; case 'B': BASH_EXPORT = TRUE; break; case 'K': DO_RESET = TRUE; crm_trace("Option %c => %s", flag, optarg); dest_node = crm_strdup(optarg); crmd_operation = CRM_OP_LOCAL_SHUTDOWN; break; case 'q': BE_SILENT = TRUE; break; case 'i': DO_DEBUG = debug_inc; crm_trace("Option %c => %s", flag, optarg); dest_node = crm_strdup(optarg); break; case 'd': DO_DEBUG = debug_dec; crm_trace("Option %c => %s", flag, optarg); dest_node = crm_strdup(optarg); break; case 'S': DO_HEALTH = TRUE; crm_trace("Option %c => %s", flag, optarg); dest_node = crm_strdup(optarg); break; case 'E': DO_ELECT_DC = TRUE; break; case 'N': DO_NODE_LIST = TRUE; break; case 'H': DO_HEALTH = TRUE; break; default: printf("Argument code 0%o (%c) is not (?yet?) supported\n", flag, flag); ++argerr; break; } } if (optind < argc) { printf("non-option ARGV-elements: "); while (optind < argc) printf("%s ", argv[optind++]); printf("\n"); } if (optind > argc) { ++argerr; } if (argerr) { crm_help('?', LSB_EXIT_GENERIC); } if (do_init()) { int res = 0; res = do_work(); if (res > 0) { /* wait for the reply by creating a mainloop and running it until * the callbacks are invoked... */ mainloop = g_main_new(FALSE); crm_trace("Waiting for %d replies from the local CRM", expected_responses); message_timer_id = g_timeout_add(message_timeout_ms, admin_message_timeout, NULL); g_main_run(mainloop); } else if (res < 0) { crm_err("No message to send"); operation_status = -1; } } else { crm_warn("Init failed, could not perform requested operations"); operation_status = -2; } crm_trace("%s exiting normally", crm_system_name); return operation_status; } int do_work(void) { int ret = 1; /* construct the request */ xmlNode *msg_data = NULL; gboolean all_is_good = TRUE; msg_options = create_xml_node(NULL, XML_TAG_OPTIONS); crm_xml_add(msg_options, XML_ATTR_VERBOSE, admin_verbose); crm_xml_add(msg_options, XML_ATTR_TIMEOUT, "0"); if (DO_HEALTH == TRUE) { crm_trace("Querying the system"); sys_to = CRM_SYSTEM_DC; if (dest_node != NULL) { sys_to = CRM_SYSTEM_CRMD; crmd_operation = CRM_OP_PING; if (BE_VERBOSE) { expected_responses = 1; } crm_xml_add(msg_options, XML_ATTR_TIMEOUT, "0"); } else { crm_info("Cluster-wide health not available yet"); all_is_good = FALSE; } } else if (DO_ELECT_DC) { /* tell the local node to initiate an election */ dest_node = NULL; sys_to = CRM_SYSTEM_CRMD; crmd_operation = CRM_OP_VOTE; crm_xml_add(msg_options, XML_ATTR_TIMEOUT, "0"); ret = 0; /* no return message */ } else if (DO_WHOIS_DC) { dest_node = NULL; sys_to = CRM_SYSTEM_DC; crmd_operation = CRM_OP_PING; crm_xml_add(msg_options, XML_ATTR_TIMEOUT, "0"); } else if (DO_NODE_LIST) { cib_t *the_cib = cib_new(); xmlNode *output = NULL; enum cib_errors rc = the_cib->cmds->signon(the_cib, crm_system_name, cib_command); if (rc != cib_ok) { return -1; } output = get_cib_copy(the_cib); do_find_node_list(output); free_xml(output); the_cib->cmds->signoff(the_cib); exit(rc); } else if (DO_RESET) { /* tell dest_node to initiate the shutdown proceedure * * if dest_node is NULL, the request will be sent to the * local node */ sys_to = CRM_SYSTEM_CRMD; crm_xml_add(msg_options, XML_ATTR_TIMEOUT, "0"); ret = 0; /* no return message */ } else if (DO_DEBUG == debug_inc) { /* tell dest_node to increase its debug level * * if dest_node is NULL, the request will be sent to the * local node */ sys_to = CRM_SYSTEM_CRMD; crmd_operation = CRM_OP_DEBUG_UP; ret = 0; /* no return message */ } else if (DO_DEBUG == debug_dec) { /* tell dest_node to increase its debug level * * if dest_node is NULL, the request will be sent to the * local node */ sys_to = CRM_SYSTEM_CRMD; crmd_operation = CRM_OP_DEBUG_DOWN; ret = 0; /* no return message */ } else { crm_err("Unknown options"); all_is_good = FALSE; } if (all_is_good == FALSE) { crm_err("Creation of request failed. No message to send"); return -1; } /* send it */ if (crmd_channel == NULL) { crm_err("The IPC connection is not valid, cannot send anything"); return -1; } if (sys_to == NULL) { if (dest_node != NULL) { sys_to = CRM_SYSTEM_CRMD; } else { sys_to = CRM_SYSTEM_DC; } } { xmlNode *cmd = create_request(crmd_operation, msg_data, dest_node, sys_to, crm_system_name, admin_uuid); crm_ipc_send(crmd_channel, cmd, NULL, 0); free_xml(cmd); } return ret; } void crmadmin_ipc_connection_destroy(gpointer user_data) { crm_err("Connection to CRMd was terminated"); if (mainloop) { g_main_quit(mainloop); } else { exit(1); } } struct ipc_client_callbacks crm_callbacks = { .dispatch = admin_msg_callback, .destroy = crmadmin_ipc_connection_destroy }; gboolean do_init(void) { mainloop_ipc_t *source = mainloop_add_ipc_client(CRM_SYSTEM_CRMD, 0, NULL, &crm_callbacks); crm_malloc0(admin_uuid, 11); if (admin_uuid != NULL) { snprintf(admin_uuid, 10, "%d", getpid()); admin_uuid[10] = '\0'; } crmd_channel = mainloop_get_ipc_client(source); if (DO_RESOURCE || DO_RESOURCE_LIST || DO_NODE_LIST) { return TRUE; } else if (crmd_channel != NULL) { xmlNode *xml = create_hello_message(admin_uuid, crm_system_name, "0", "1"); crm_ipc_send(crmd_channel, xml, NULL, 0); return TRUE; } return FALSE; } +static xmlNode * +validate_crm_message(xmlNode * msg, const char *sys, const char *uuid, const char *msg_type) +{ + const char *to = NULL; + const char *type = NULL; + const char *crm_msg_reference = NULL; + xmlNode *action = NULL; + const char *true_sys; + char *local_sys = NULL; + + if (msg == NULL) { + return NULL; + } + + to = crm_element_value(msg, F_CRM_SYS_TO); + type = crm_element_value(msg, F_CRM_MSG_TYPE); + + crm_msg_reference = crm_element_value(msg, XML_ATTR_REFERENCE); + action = msg; + true_sys = sys; + + if (uuid != NULL) { + local_sys = generate_hash_key(sys, uuid); + true_sys = local_sys; + } + + if (to == NULL) { + crm_info("No sub-system defined."); + action = NULL; + } else if (true_sys != NULL && strcasecmp(to, true_sys) != 0) { + crm_trace("The message is not for this sub-system (%s != %s).", to, true_sys); + action = NULL; + } + + crm_free(local_sys); + + if (type == NULL) { + crm_info("No message type defined."); + return NULL; + + } else if (msg_type != NULL && strcasecmp(msg_type, type) != 0) { + crm_info("Expecting a (%s) message but received a (%s).", msg_type, type); + action = NULL; + } + + if (crm_msg_reference == NULL) { + crm_info("No message crm_msg_reference defined."); + action = NULL; + } +/* + if(action != NULL) + crm_trace( + "XML is valid and node with message type (%s) found.", + type); + crm_trace("Returning node (%s)", crm_element_name(action)); +*/ + + return action; +} + int admin_msg_callback(const char *buffer, ssize_t length, gpointer userdata) { static int received_responses = 0; const char *result = NULL; xmlNode *xml = string2xml(buffer); received_responses++; g_source_remove(message_timer_id); crm_log_xml_trace(xml, "ipc"); if (xml == NULL) { crm_info("XML in IPC message was not valid... " "discarding."); } else if (validate_crm_message(xml, crm_system_name, admin_uuid, XML_ATTR_RESPONSE) == FALSE) { crm_trace("Message was not a CRM response. Discarding."); } else { result = crm_element_value(xml, XML_ATTR_RESULT); if (result == NULL || strcasecmp(result, "ok") == 0) { result = "pass"; } else { result = "fail"; } if (DO_HEALTH) { xmlNode *data = get_message_xml(xml, F_CRM_DATA); const char *state = crm_element_value(data, "crmd_state"); printf("Status of %s@%s: %s (%s)\n", crm_element_value(data, XML_PING_ATTR_SYSFROM), crm_element_value(xml, F_CRM_HOST_FROM), state, crm_element_value(data, XML_PING_ATTR_STATUS)); if (BE_SILENT && state != NULL) { fprintf(stderr, "%s\n", state); } } else if (DO_WHOIS_DC) { const char *dc = crm_element_value(xml, F_CRM_HOST_FROM); printf("Designated Controller is: %s\n", dc); if (BE_SILENT && dc != NULL) { fprintf(stderr, "%s\n", dc); } exit(0); } } free_xml(xml); if (received_responses >= expected_responses) { crm_trace("Received expected number (%d) of messages from Heartbeat." " Exiting normally.", expected_responses); exit(0); } message_timer_id = g_timeout_add(message_timeout_ms, admin_message_timeout, NULL); return 0; } gboolean admin_message_timeout(gpointer data) { fprintf(stderr, "No messages received in %d seconds.. aborting\n", (int)message_timeout_ms / 1000); crm_err("No messages received in %d seconds", (int)message_timeout_ms / 1000); operation_status = -3; g_main_quit(mainloop); return FALSE; } gboolean is_node_online(xmlNode * node_state) { const char *uname = crm_element_value(node_state, XML_ATTR_UNAME); const char *join_state = crm_element_value(node_state, XML_CIB_ATTR_JOINSTATE); const char *exp_state = crm_element_value(node_state, XML_CIB_ATTR_EXPSTATE); const char *crm_state = crm_element_value(node_state, XML_CIB_ATTR_CRMDSTATE); const char *ha_state = crm_element_value(node_state, XML_CIB_ATTR_HASTATE); const char *ccm_state = crm_element_value(node_state, XML_CIB_ATTR_INCCM); if (safe_str_neq(join_state, CRMD_JOINSTATE_DOWN) && (ha_state == NULL || safe_str_eq(ha_state, "active")) && crm_is_true(ccm_state) && safe_str_eq(crm_state, "online")) { crm_trace("Node %s is online", uname); return TRUE; } crm_trace("Node %s: ha=%s ccm=%s join=%s exp=%s crm=%s", uname, crm_str(ha_state), crm_str(ccm_state), crm_str(join_state), crm_str(exp_state), crm_str(crm_state)); crm_trace("Node %s is offline", uname); return FALSE; } int do_find_node_list(xmlNode * xml_node) { int found = 0; xmlNode *node = NULL; xmlNode *nodes = get_object_root(XML_CIB_TAG_NODES, xml_node); for (node = __xml_first_child(nodes); node != NULL; node = __xml_next(node)) { if (crm_str_eq((const char *)node->name, XML_CIB_TAG_NODE, TRUE)) { if (BASH_EXPORT) { printf("export %s=%s\n", crm_element_value(node, XML_ATTR_UNAME), crm_element_value(node, XML_ATTR_ID)); } else { printf("%s node: %s (%s)\n", crm_element_value(node, XML_ATTR_TYPE), crm_element_value(node, XML_ATTR_UNAME), crm_element_value(node, XML_ATTR_ID)); } found++; } } if (found == 0) { printf("NO nodes configured\n"); } return found; }