diff --git a/lib/ais/plugin.c b/lib/ais/plugin.c index 2cc4d9956e..01147adde2 100644 --- a/lib/ais/plugin.c +++ b/lib/ais/plugin.c @@ -1,1756 +1,1756 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USAA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "plugin.h" #include "utils.h" #include #include #include #include #include #include #include #include #include struct corosync_api_v1 *pcmk_api = NULL; uint32_t plugin_has_votes = 0; uint32_t plugin_expected_votes = 2; int use_mgmtd = 0; int plugin_log_level = LOG_DEBUG; char *local_uname = NULL; int local_uname_len = 0; char *local_cname = NULL; int local_cname_len = 0; uint32_t local_nodeid = 0; char *ipc_channel_name = NULL; static uint64_t local_born_on = 0; uint64_t membership_seq = 0; pthread_t pcmk_wait_thread; gboolean use_mcp = FALSE; gboolean wait_active = TRUE; gboolean have_reliable_membership_id = FALSE; GHashTable *ipc_client_list = NULL; GHashTable *membership_list = NULL; GHashTable *membership_notify_list = NULL; #define MAX_RESPAWN 100 #define LOOPBACK_ID 16777343 #define crm_flag_none 0x00000000 #define crm_flag_members 0x00000001 struct crm_identify_msg_s { cs_ipc_header_request_t header __attribute__ ((aligned(8))); uint32_t id; uint32_t pid; int32_t votes; uint32_t processes; char uname[256]; char version[256]; uint64_t born_on; } __attribute__ ((packed)); /* *INDENT-OFF* */ static crm_child_t pcmk_children[] = { { 0, crm_proc_none, crm_flag_none, 0, 0, FALSE, "none", NULL, NULL, NULL, NULL }, { 0, crm_proc_plugin, crm_flag_none, 0, 0, FALSE, "ais", NULL, NULL, NULL, NULL }, { 0, crm_proc_lrmd, crm_flag_none, 3, 0, TRUE, "lrmd", NULL, CRM_DAEMON_DIR"/lrmd", NULL, NULL }, { 0, crm_proc_cib, crm_flag_members, 1, 0, TRUE, "cib", CRM_DAEMON_USER, CRM_DAEMON_DIR"/cib", NULL, NULL }, { 0, crm_proc_crmd, crm_flag_members, 6, 0, TRUE, "crmd", CRM_DAEMON_USER, CRM_DAEMON_DIR"/crmd", NULL, NULL }, { 0, crm_proc_attrd, crm_flag_none, 4, 0, TRUE, "attrd", CRM_DAEMON_USER, CRM_DAEMON_DIR"/attrd", NULL, NULL }, { 0, crm_proc_stonithd, crm_flag_none, 0, 0, TRUE, "stonithd", NULL, "/bin/false", NULL, NULL }, { 0, crm_proc_pe, crm_flag_none, 5, 0, TRUE, "pengine", CRM_DAEMON_USER, CRM_DAEMON_DIR"/pengine", NULL, NULL }, { 0, crm_proc_mgmtd, crm_flag_none, 7, 0, TRUE, "mgmtd", NULL, HB_DAEMON_DIR"/mgmtd", NULL, NULL }, { 0, crm_proc_stonith_ng, crm_flag_none, 2, 0, TRUE, "stonith-ng", NULL, CRM_DAEMON_DIR"/stonithd", NULL, NULL }, }; /* *INDENT-ON* */ void send_cluster_id(void); -int send_cluster_msg_raw(const AIS_Message * ais_msg); +int send_plugin_msg_raw(const AIS_Message * ais_msg); char *pcmk_generate_membership_data(void); gboolean check_message_sanity(const AIS_Message * msg, const char *data); typedef const void ais_void_ptr; int pcmk_shutdown(void); void pcmk_peer_update(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id); int pcmk_startup(struct corosync_api_v1 *corosync_api); int pcmk_config_init(struct corosync_api_v1 *corosync_api); int pcmk_ipc_exit(void *conn); int pcmk_ipc_connect(void *conn); void pcmk_ipc(void *conn, ais_void_ptr * msg); void pcmk_exec_dump(void); void pcmk_cluster_swab(void *msg); void pcmk_cluster_callback(ais_void_ptr * message, unsigned int nodeid); void pcmk_nodeid(void *conn, ais_void_ptr * msg); void pcmk_nodes(void *conn, ais_void_ptr * msg); void pcmk_notify(void *conn, ais_void_ptr * msg); void pcmk_remove_member(void *conn, ais_void_ptr * msg); void pcmk_quorum(void *conn, ais_void_ptr * msg); void pcmk_cluster_id_swab(void *msg); void pcmk_cluster_id_callback(ais_void_ptr * message, unsigned int nodeid); void ais_remove_peer(char *node_id); static uint32_t get_process_list(void) { int lpc = 0; uint32_t procs = crm_proc_plugin; if (use_mcp) { return 0; } for (lpc = 0; lpc < SIZEOF(pcmk_children); lpc++) { if (pcmk_children[lpc].pid != 0) { procs |= pcmk_children[lpc].flag; } } return procs; } static struct corosync_lib_handler pcmk_lib_service[] = { { /* 0 */ .lib_handler_fn = pcmk_ipc, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED, }, { /* 1 */ .lib_handler_fn = pcmk_nodes, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED, }, { /* 2 */ .lib_handler_fn = pcmk_notify, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED, }, { /* 3 */ .lib_handler_fn = pcmk_nodeid, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED, }, { /* 4 */ .lib_handler_fn = pcmk_remove_member, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED, }, { /* 5 */ .lib_handler_fn = pcmk_quorum, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED, }, }; static struct corosync_exec_handler pcmk_exec_service[] = { { /* 0 */ .exec_handler_fn = pcmk_cluster_callback, .exec_endian_convert_fn = pcmk_cluster_swab}, { /* 1 */ .exec_handler_fn = pcmk_cluster_id_callback, .exec_endian_convert_fn = pcmk_cluster_id_swab} }; /* * Exports the interface for the service */ /* *INDENT-OFF* */ struct corosync_service_engine pcmk_service_handler = { .name = (char *)"Pacemaker Cluster Manager "PACKAGE_VERSION, .id = PCMK_SERVICE_ID, .private_data_size = 0, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED, .allow_inquorate = CS_LIB_ALLOW_INQUORATE, .lib_init_fn = pcmk_ipc_connect, .lib_exit_fn = pcmk_ipc_exit, .exec_init_fn = pcmk_startup, .exec_exit_fn = pcmk_shutdown, .config_init_fn = pcmk_config_init, .priority = 50, .lib_engine = pcmk_lib_service, .lib_engine_count = sizeof (pcmk_lib_service) / sizeof (struct corosync_lib_handler), .exec_engine = pcmk_exec_service, .exec_engine_count = sizeof (pcmk_exec_service) / sizeof (struct corosync_exec_handler), .confchg_fn = pcmk_peer_update, .exec_dump_fn = pcmk_exec_dump, /* void (*sync_init) (void); */ /* int (*sync_process) (void); */ /* void (*sync_activate) (void); */ /* void (*sync_abort) (void); */ }; /* * Dynamic Loader definition */ struct corosync_service_engine *pcmk_get_handler_ver0 (void); struct corosync_service_engine_iface_ver0 pcmk_service_handler_iface = { .corosync_get_service_engine_ver0 = pcmk_get_handler_ver0 }; static struct lcr_iface openais_pcmk_ver0[2] = { { .name = "pacemaker", .version = 0, .versions_replace = 0, .versions_replace_count = 0, .dependencies = 0, .dependency_count = 0, .constructor = NULL, .destructor = NULL, .interfaces = NULL }, { .name = "pacemaker", .version = 1, .versions_replace = 0, .versions_replace_count = 0, .dependencies = 0, .dependency_count = 0, .constructor = NULL, .destructor = NULL, .interfaces = NULL } }; static struct lcr_comp pcmk_comp_ver0 = { .iface_count = 2, .ifaces = openais_pcmk_ver0 }; /* *INDENT-ON* */ struct corosync_service_engine * pcmk_get_handler_ver0(void) { return (&pcmk_service_handler); } __attribute__ ((constructor)) static void register_this_component(void) { lcr_interfaces_set(&openais_pcmk_ver0[0], &pcmk_service_handler_iface); lcr_interfaces_set(&openais_pcmk_ver0[1], &pcmk_service_handler_iface); lcr_component_register(&pcmk_comp_ver0); } static int plugin_has_quorum(void) { if ((plugin_expected_votes >> 1) < plugin_has_votes) { return 1; } return 0; } static void update_expected_votes(int value) { if (value < plugin_has_votes) { /* Never drop below the number of connected nodes */ ais_info("Cannot update expected quorum votes %d -> %d:" " value cannot be less that the current number of votes", plugin_expected_votes, value); } else if (plugin_expected_votes != value) { ais_info("Expected quorum votes %d -> %d", plugin_expected_votes, value); plugin_expected_votes = value; } } /* Create our own local copy of the config so we can navigate it */ static void process_ais_conf(void) { char *value = NULL; gboolean any_log = FALSE; hdb_handle_t top_handle = 0; hdb_handle_t local_handle = 0; ais_info("Reading configure"); top_handle = config_find_init(pcmk_api, "logging"); local_handle = config_find_next(pcmk_api, "logging", top_handle); get_config_opt(pcmk_api, local_handle, "debug", &value, "on"); if (ais_get_boolean(value)) { plugin_log_level = LOG_DEBUG; pcmk_env.debug = "1"; } else { plugin_log_level = LOG_INFO; pcmk_env.debug = "0"; } get_config_opt(pcmk_api, local_handle, "to_logfile", &value, "off"); if (ais_get_boolean(value)) { get_config_opt(pcmk_api, local_handle, "logfile", &value, NULL); if (value == NULL) { ais_err("Logging to a file requested but no log file specified"); } else { uid_t pcmk_uid = geteuid(); uid_t pcmk_gid = getegid(); FILE *logfile = fopen(value, "a"); if (logfile) { int ignore = 0; int logfd = fileno(logfile); pcmk_env.logfile = value; /* Ensure the file has the correct permissions */ ignore = fchown(logfd, pcmk_uid, pcmk_gid); ignore = fchmod(logfd, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); fprintf(logfile, "Set r/w permissions for uid=%d, gid=%d on %s\n", pcmk_uid, pcmk_gid, value); fflush(logfile); fsync(logfd); fclose(logfile); any_log = TRUE; } else { ais_err("Couldn't create logfile: %s", value); } } } get_config_opt(pcmk_api, local_handle, "to_syslog", &value, "on"); if (any_log && ais_get_boolean(value) == FALSE) { ais_info("User configured file based logging and explicitly disabled syslog."); value = "none"; } else { if (ais_get_boolean(value) == FALSE) { ais_err ("Please enable some sort of logging, either 'to_file: on' or 'to_syslog: on'."); ais_err("If you use file logging, be sure to also define a value for 'logfile'"); } get_config_opt(pcmk_api, local_handle, "syslog_facility", &value, "daemon"); } pcmk_env.syslog = value; config_find_done(pcmk_api, local_handle); top_handle = config_find_init(pcmk_api, "quorum"); local_handle = config_find_next(pcmk_api, "quorum", top_handle); get_config_opt(pcmk_api, local_handle, "provider", &value, NULL); if (value && ais_str_eq("quorum_cman", value)) { pcmk_env.quorum = "cman"; } else { pcmk_env.quorum = "pcmk"; } top_handle = config_find_init(pcmk_api, "service"); local_handle = config_find_next(pcmk_api, "service", top_handle); while (local_handle) { value = NULL; pcmk_api->object_key_get(local_handle, "name", strlen("name"), (void **)&value, NULL); if (ais_str_eq("pacemaker", value)) { break; } local_handle = config_find_next(pcmk_api, "service", top_handle); } get_config_opt(pcmk_api, local_handle, "ver", &value, "0"); if (ais_str_eq(value, "1")) { ais_info("Enabling MCP mode: Use the Pacemaker init script to complete Pacemaker startup"); use_mcp = TRUE; } get_config_opt(pcmk_api, local_handle, "clustername", &local_cname, "pcmk"); local_cname_len = strlen(local_cname); get_config_opt(pcmk_api, local_handle, "use_logd", &value, "no"); pcmk_env.use_logd = value; get_config_opt(pcmk_api, local_handle, "use_mgmtd", &value, "no"); if (ais_get_boolean(value) == FALSE) { int lpc = 0; for (; lpc < SIZEOF(pcmk_children); lpc++) { if (crm_proc_mgmtd & pcmk_children[lpc].flag) { /* Disable mgmtd startup */ pcmk_children[lpc].start_seq = 0; break; } } } config_find_done(pcmk_api, local_handle); } int pcmk_config_init(struct corosync_api_v1 *unused) { return 0; } static void * pcmk_wait_dispatch(void *arg) { struct timespec waitsleep = { .tv_sec = 1, .tv_nsec = 0 }; while (wait_active) { int lpc = 0; for (; lpc < SIZEOF(pcmk_children); lpc++) { if (pcmk_children[lpc].pid > 0) { int status; pid_t pid = wait4(pcmk_children[lpc].pid, &status, WNOHANG, NULL); if (pid == 0) { continue; } else if (pid < 0) { ais_perror("Call to wait4(%s) failed", pcmk_children[lpc].name); continue; } /* cleanup */ pcmk_children[lpc].pid = 0; pcmk_children[lpc].conn = NULL; pcmk_children[lpc].async_conn = NULL; if (WIFSIGNALED(status)) { int sig = WTERMSIG(status); ais_err("Child process %s terminated with signal %d" " (pid=%d, core=%s)", pcmk_children[lpc].name, sig, pid, WCOREDUMP(status) ? "true" : "false"); } else if (WIFEXITED(status)) { int rc = WEXITSTATUS(status); do_ais_log(rc == 0 ? LOG_NOTICE : LOG_ERR, "Child process %s exited (pid=%d, rc=%d)", pcmk_children[lpc].name, pid, rc); if (rc == 100) { ais_notice("Child process %s no longer wishes" " to be respawned", pcmk_children[lpc].name); pcmk_children[lpc].respawn = FALSE; } } /* Broadcast the fact that one of our processes died * * Try to get some logging of the cause out first though * because we're probably about to get fenced * * Potentially do this only if respawn_count > N * to allow for local recovery */ send_cluster_id(); pcmk_children[lpc].respawn_count += 1; if (pcmk_children[lpc].respawn_count > MAX_RESPAWN) { ais_err("Child respawn count exceeded by %s", pcmk_children[lpc].name); pcmk_children[lpc].respawn = FALSE; } if (pcmk_children[lpc].respawn) { ais_notice("Respawning failed child process: %s", pcmk_children[lpc].name); spawn_child(&(pcmk_children[lpc])); } send_cluster_id(); } } sched_yield(); nanosleep(&waitsleep, 0); } return 0; } static uint32_t pcmk_update_nodeid(void) { int last = local_nodeid; local_nodeid = pcmk_api->totem_nodeid_get(); if (last != local_nodeid) { if (last == 0) { ais_info("Local node id: %u", local_nodeid); } else { char *last_s = NULL; ais_malloc0(last_s, 32); ais_warn("Detected local node id change: %u -> %u", last, local_nodeid); snprintf(last_s, 31, "%u", last); ais_remove_peer(last_s); ais_free(last_s); } update_member(local_nodeid, 0, 0, 1, 0, local_uname, CRM_NODE_MEMBER, NULL); } return local_nodeid; } static void build_path(const char *path_c, mode_t mode) { int offset = 1, len = 0; char *path = ais_strdup(path_c); AIS_CHECK(path != NULL, return); for (len = strlen(path); offset < len; offset++) { if (path[offset] == '/') { path[offset] = 0; if (mkdir(path, mode) < 0 && errno != EEXIST) { ais_perror("Could not create directory '%s'", path); break; } path[offset] = '/'; } } if (mkdir(path, mode) < 0 && errno != EEXIST) { ais_perror("Could not create directory '%s'", path); } ais_free(path); } int pcmk_startup(struct corosync_api_v1 *init_with) { int rc = 0; int lpc = 0; int start_seq = 1; struct utsname us; struct rlimit cores; static int max = SIZEOF(pcmk_children); uid_t pcmk_uid = 0; gid_t pcmk_gid = 0; uid_t root_uid = -1; uid_t cs_uid = geteuid(); pcmk_user_lookup("root", &root_uid, NULL); pcmk_api = init_with; pcmk_env.debug = "0"; pcmk_env.logfile = NULL; pcmk_env.use_logd = "false"; pcmk_env.syslog = "daemon"; if (cs_uid != root_uid) { ais_err("Corosync must be configured to start as 'root'," " otherwise Pacemaker cannot manage services." " Expected %d got %d", root_uid, cs_uid); return -1; } process_ais_conf(); membership_list = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, destroy_ais_node); membership_notify_list = g_hash_table_new(g_direct_hash, g_direct_equal); ipc_client_list = g_hash_table_new(g_direct_hash, g_direct_equal); ais_info("CRM: Initialized"); log_printf(LOG_INFO, "Logging: Initialized %s\n", __PRETTY_FUNCTION__); rc = getrlimit(RLIMIT_CORE, &cores); if (rc < 0) { ais_perror("Cannot determine current maximum core size."); } else { if (cores.rlim_max == 0 && geteuid() == 0) { cores.rlim_max = RLIM_INFINITY; } else { ais_info("Maximum core file size is: %lu", cores.rlim_max); } cores.rlim_cur = cores.rlim_max; rc = setrlimit(RLIMIT_CORE, &cores); if (rc < 0) { ais_perror("Core file generation will remain disabled." " Core files are an important diagnositic tool," " please consider enabling them by default."); } #if 0 /* system() is not thread-safe, can't call from here * Actually, its a pretty hacky way to try and achieve this anyway */ if (system("echo 1 > /proc/sys/kernel/core_uses_pid") != 0) { ais_perror("Could not enable /proc/sys/kernel/core_uses_pid"); } #endif } if (pcmk_user_lookup(CRM_DAEMON_USER, &pcmk_uid, &pcmk_gid) < 0) { ais_err("Cluster user %s does not exist, aborting Pacemaker startup", CRM_DAEMON_USER); return TRUE; } rc = mkdir(CRM_STATE_DIR, 0750); rc = chown(CRM_STATE_DIR, pcmk_uid, pcmk_gid); /* Used by stonithd */ build_path(HA_STATE_DIR "/heartbeat", 0755); /* Used by RAs - Leave owned by root */ build_path(CRM_RSCTMP_DIR, 0755); rc = uname(&us); AIS_ASSERT(rc == 0); local_uname = ais_strdup(us.nodename); local_uname_len = strlen(local_uname); ais_info("Service: %d", PCMK_SERVICE_ID); ais_info("Local hostname: %s", local_uname); pcmk_update_nodeid(); if (use_mcp == FALSE) { pthread_create(&pcmk_wait_thread, NULL, pcmk_wait_dispatch, NULL); for (start_seq = 1; start_seq < max; start_seq++) { /* dont start anything with start_seq < 1 */ for (lpc = 0; lpc < max; lpc++) { if (start_seq == pcmk_children[lpc].start_seq) { spawn_child(&(pcmk_children[lpc])); } } } } return 0; } /* static void ais_print_node(const char *prefix, struct totem_ip_address *host) { int len = 0; char *buffer = NULL; ais_malloc0(buffer, INET6_ADDRSTRLEN+1); inet_ntop(host->family, host->addr, buffer, INET6_ADDRSTRLEN); len = strlen(buffer); ais_info("%s: %.*s", prefix, len, buffer); ais_free(buffer); } */ #if 0 /* copied here for reference from exec/totempg.c */ char * totempg_ifaces_print(unsigned int nodeid) { static char iface_string[256 * INTERFACE_MAX]; char one_iface[64]; struct totem_ip_address interfaces[INTERFACE_MAX]; char **status; unsigned int iface_count; unsigned int i; int res; iface_string[0] = '\0'; res = totempg_ifaces_get(nodeid, interfaces, &status, &iface_count); if (res == -1) { return ("no interface found for nodeid"); } for (i = 0; i < iface_count; i++) { sprintf(one_iface, "r(%d) ip(%s), ", i, totemip_print(&interfaces[i])); strcat(iface_string, one_iface); } return (iface_string); } #endif static void ais_mark_unseen_peer_dead(gpointer key, gpointer value, gpointer user_data) { int *changed = user_data; crm_node_t *node = value; if (node->last_seen != membership_seq && ais_str_eq(CRM_NODE_LOST, node->state) == FALSE) { ais_info("Node %s was not seen in the previous transition", node->uname); *changed += update_member(node->id, 0, membership_seq, node->votes, node->processes, node->uname, CRM_NODE_LOST, NULL); } } void pcmk_peer_update(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id) { int lpc = 0; int changed = 0; int do_update = 0; AIS_ASSERT(ring_id != NULL); switch (configuration_type) { case TOTEM_CONFIGURATION_REGULAR: do_update = 1; break; case TOTEM_CONFIGURATION_TRANSITIONAL: break; } membership_seq = ring_id->seq; ais_notice("%s membership event on ring %lld: memb=%ld, new=%ld, lost=%ld", do_update ? "Stable" : "Transitional", ring_id->seq, (long)member_list_entries, (long)joined_list_entries, (long)left_list_entries); if (do_update == 0) { for (lpc = 0; lpc < joined_list_entries; lpc++) { const char *prefix = "new: "; uint32_t nodeid = joined_list[lpc]; ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid); } for (lpc = 0; lpc < member_list_entries; lpc++) { const char *prefix = "memb:"; uint32_t nodeid = member_list[lpc]; ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid); } for (lpc = 0; lpc < left_list_entries; lpc++) { const char *prefix = "lost:"; uint32_t nodeid = left_list[lpc]; ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid); } return; } for (lpc = 0; lpc < joined_list_entries; lpc++) { const char *prefix = "NEW: "; uint32_t nodeid = joined_list[lpc]; crm_node_t *node = NULL; changed += update_member(nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_MEMBER, NULL); ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid); node = g_hash_table_lookup(membership_list, GUINT_TO_POINTER(nodeid)); if (node->addr == NULL) { const char *addr = totempg_ifaces_print(nodeid); node->addr = ais_strdup(addr); ais_debug("Node %u has address %s", nodeid, node->addr); } } for (lpc = 0; lpc < member_list_entries; lpc++) { const char *prefix = "MEMB:"; uint32_t nodeid = member_list[lpc]; changed += update_member(nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_MEMBER, NULL); ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid); } for (lpc = 0; lpc < left_list_entries; lpc++) { const char *prefix = "LOST:"; uint32_t nodeid = left_list[lpc]; changed += update_member(nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_LOST, NULL); ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid); } if (changed && joined_list_entries == 0 && left_list_entries == 0) { ais_err("Something strange happened: %d", changed); changed = 0; } ais_trace("Reaping unseen nodes..."); g_hash_table_foreach(membership_list, ais_mark_unseen_peer_dead, &changed); if (member_list_entries > 1) { /* Used to set born-on in send_cluster_id()) * We need to wait until we have at least one peer since first * membership id is based on the one before we stopped and isn't reliable */ have_reliable_membership_id = TRUE; } if (changed) { ais_debug("%d nodes changed", changed); pcmk_update_nodeid(); send_member_notification(); } send_cluster_id(); } int pcmk_ipc_exit(void *conn) { int lpc = 0; const char *client = NULL; void *async_conn = conn; for (; lpc < SIZEOF(pcmk_children); lpc++) { if (pcmk_children[lpc].conn == conn) { if (wait_active == FALSE) { /* Make sure the shutdown loop exits */ pcmk_children[lpc].pid = 0; } pcmk_children[lpc].conn = NULL; pcmk_children[lpc].async_conn = NULL; client = pcmk_children[lpc].name; break; } } g_hash_table_remove(membership_notify_list, async_conn); g_hash_table_remove(ipc_client_list, async_conn); if (client) { do_ais_log(LOG_INFO, "Client %s (conn=%p, async-conn=%p) left", client, conn, async_conn); } else { do_ais_log((LOG_DEBUG + 1), "Client %s (conn=%p, async-conn=%p) left", "unknown-transient", conn, async_conn); } return (0); } int pcmk_ipc_connect(void *conn) { /* OpenAIS hasn't finished setting up the connection at this point * Sending messages now messes up the protocol! */ return (0); } /* * Executive message handlers */ void pcmk_cluster_swab(void *msg) { AIS_Message *ais_msg = msg; ais_trace("Performing endian conversion..."); ais_msg->id = swab32(ais_msg->id); ais_msg->size = swab32(ais_msg->size); ais_msg->is_compressed = swab32(ais_msg->is_compressed); ais_msg->compressed_size = swab32(ais_msg->compressed_size); ais_msg->host.id = swab32(ais_msg->host.id); ais_msg->host.pid = swab32(ais_msg->host.pid); ais_msg->host.type = swab32(ais_msg->host.type); ais_msg->host.size = swab32(ais_msg->host.size); ais_msg->host.local = swab32(ais_msg->host.local); ais_msg->sender.id = swab32(ais_msg->sender.id); ais_msg->sender.pid = swab32(ais_msg->sender.pid); ais_msg->sender.type = swab32(ais_msg->sender.type); ais_msg->sender.size = swab32(ais_msg->sender.size); ais_msg->sender.local = swab32(ais_msg->sender.local); ais_msg->header.size = swab32(ais_msg->header.size); ais_msg->header.id = swab32(ais_msg->header.id); ais_msg->header.error = swab32(ais_msg->header.error); } void pcmk_cluster_callback(ais_void_ptr * message, unsigned int nodeid) { const AIS_Message *ais_msg = message; ais_trace("Message from node %u (%s)", nodeid, nodeid == local_nodeid ? "local" : "remote"); /* Shouldn't be required... update_member( ais_msg->sender.id, membership_seq, -1, 0, ais_msg->sender.uname, NULL); */ if (ais_msg->host.size == 0 || ais_str_eq(ais_msg->host.uname, local_uname)) { route_ais_message(ais_msg, FALSE); } else { ais_trace("Discarding Msg[%d] (dest=%s:%s, from=%s:%s)", ais_msg->id, ais_dest(&(ais_msg->host)), msg_type2text(ais_msg->host.type), ais_dest(&(ais_msg->sender)), msg_type2text(ais_msg->sender.type)); } } void pcmk_cluster_id_swab(void *msg) { struct crm_identify_msg_s *ais_msg = msg; ais_trace("Performing endian conversion..."); ais_msg->id = swab32(ais_msg->id); ais_msg->pid = swab32(ais_msg->pid); ais_msg->votes = swab32(ais_msg->votes); ais_msg->processes = swab32(ais_msg->processes); ais_msg->born_on = swab64(ais_msg->born_on); ais_msg->header.size = swab32(ais_msg->header.size); ais_msg->header.id = swab32(ais_msg->header.id); } void pcmk_cluster_id_callback(ais_void_ptr * message, unsigned int nodeid) { int changed = 0; const struct crm_identify_msg_s *msg = message; if (nodeid != msg->id) { ais_err("Invalid message: Node %u claimed to be node %d", nodeid, msg->id); return; } ais_debug("Node update: %s (%s)", msg->uname, msg->version); changed = update_member(nodeid, msg->born_on, membership_seq, msg->votes, msg->processes, msg->uname, NULL, msg->version); if (changed) { send_member_notification(); } } struct res_overlay { cs_ipc_header_response_t header __attribute((aligned(8))); char buf[4096]; }; struct res_overlay *res_overlay = NULL; static void send_ipc_ack(void *conn) { if (res_overlay == NULL) { ais_malloc0(res_overlay, sizeof(struct res_overlay)); } res_overlay->header.id = CRM_MESSAGE_IPC_ACK; res_overlay->header.size = sizeof(cs_ipc_header_response_t); res_overlay->header.error = CS_OK; pcmk_api->ipc_response_send(conn, res_overlay, res_overlay->header.size); } /* local callbacks */ void pcmk_ipc(void *conn, ais_void_ptr * msg) { AIS_Message *mutable; int type = 0, size = 0; gboolean transient = TRUE; const AIS_Message *ais_msg = (const AIS_Message *)msg; void *async_conn = conn; ais_trace("Message from client %p", conn); if (check_message_sanity(msg, ((const AIS_Message *)msg)->data) == FALSE) { /* The message is corrupted - ignore */ send_ipc_ack(conn); msg = NULL; return; } /* Make a copy of the message here and ACK it * The message is only valid until a response is sent * but the response must also be sent _before_ we send anything else */ mutable = ais_msg_copy(ais_msg); AIS_ASSERT(check_message_sanity(mutable, mutable->data)); size = mutable->header.size; /* ais_malloc0(ais_msg, size); */ /* memcpy(ais_msg, msg, size); */ type = mutable->sender.type; ais_trace ("type: %d local: %d conn: %p host type: %d ais: %d sender pid: %d child pid: %d size: %d", type, mutable->host.local, pcmk_children[type].conn, mutable->host.type, crm_msg_ais, mutable->sender.pid, pcmk_children[type].pid, ((int)SIZEOF(pcmk_children))); if (type > crm_msg_none && type < SIZEOF(pcmk_children)) { /* known child process */ transient = FALSE; } #if 0 /* If this check fails, the order of pcmk_children probably * doesn't match that of the crm_ais_msg_types enum */ AIS_CHECK(transient || mutable->sender.pid == pcmk_children[type].pid, ais_err("Sender: %d, child[%d]: %d", mutable->sender.pid, type, pcmk_children[type].pid); ais_free(mutable); return); #endif if (transient == FALSE && type > crm_msg_none && mutable->host.local && pcmk_children[type].conn == NULL && mutable->host.type == crm_msg_ais) { AIS_CHECK(mutable->sender.type != mutable->sender.pid, ais_err("Pid=%d, type=%d", mutable->sender.pid, mutable->sender.type)); ais_info("Recorded connection %p for %s/%d", conn, pcmk_children[type].name, pcmk_children[type].pid); pcmk_children[type].conn = conn; pcmk_children[type].async_conn = async_conn; /* Make sure they have the latest membership */ if (pcmk_children[type].flags & crm_flag_members) { char *update = pcmk_generate_membership_data(); g_hash_table_replace(membership_notify_list, async_conn, async_conn); ais_info("Sending membership update " U64T " to %s", membership_seq, pcmk_children[type].name); send_client_msg(async_conn, crm_class_members, crm_msg_none, update); } } else if (transient) { AIS_CHECK(mutable->sender.type == mutable->sender.pid, ais_err("Pid=%d, type=%d", mutable->sender.pid, mutable->sender.type)); g_hash_table_replace(ipc_client_list, async_conn, GUINT_TO_POINTER(mutable->sender.pid)); } mutable->sender.id = local_nodeid; mutable->sender.size = local_uname_len; memset(mutable->sender.uname, 0, MAX_NAME); memcpy(mutable->sender.uname, local_uname, mutable->sender.size); route_ais_message(mutable, TRUE); send_ipc_ack(conn); msg = NULL; ais_free(mutable); } int pcmk_shutdown(void) { int lpc = 0; static int phase = 0; static int max_wait = 0; static time_t next_log = 0; static int max = SIZEOF(pcmk_children); if (use_mcp) { if (pcmk_children[crm_msg_crmd].conn || pcmk_children[crm_msg_stonith_ng].conn) { time_t now = time(NULL); if (now > next_log) { next_log = now + 300; ais_notice ("Preventing Corosync shutdown. Please ensure Pacemaker is stopped first."); } return -1; } ais_notice("Unloading Pacemaker plugin"); return 0; } if (phase == 0) { ais_notice("Shuting down Pacemaker"); phase = max; } wait_active = FALSE; /* stop the wait loop */ for (; phase > 0; phase--) { /* dont stop anything with start_seq < 1 */ for (lpc = max - 1; lpc >= 0; lpc--) { if (phase != pcmk_children[lpc].start_seq) { continue; } if (pcmk_children[lpc].pid) { pid_t pid = 0; int status = 0; time_t now = time(NULL); if (pcmk_children[lpc].respawn) { max_wait = 5; /* 5 * 30s = 2.5 minutes... plenty once the crmd is gone */ next_log = now + 30; pcmk_children[lpc].respawn = FALSE; stop_child(&(pcmk_children[lpc]), SIGTERM); } pid = wait4(pcmk_children[lpc].pid, &status, WNOHANG, NULL); if (pid < 0) { ais_perror("Call to wait4(%s/%d) failed - treating it as stopped", pcmk_children[lpc].name, pcmk_children[lpc].pid); } else if (pid == 0) { if (now >= next_log) { max_wait--; next_log = now + 30; ais_notice("Still waiting for %s (pid=%d, seq=%d) to terminate...", pcmk_children[lpc].name, pcmk_children[lpc].pid, pcmk_children[lpc].start_seq); if (max_wait <= 0 && phase < pcmk_children[crm_msg_crmd].start_seq) { ais_err("Child %s taking too long to terminate, sending SIGKILL", pcmk_children[lpc].name); stop_child(&(pcmk_children[lpc]), SIGKILL); } } /* Return control to corosync */ return -1; } } /* cleanup */ ais_notice("%s confirmed stopped", pcmk_children[lpc].name); pcmk_children[lpc].async_conn = NULL; pcmk_children[lpc].conn = NULL; pcmk_children[lpc].pid = 0; } } send_cluster_id(); ais_notice("Shutdown complete"); /* TODO: Add back the logsys flush call once its written */ return 0; } struct member_loop_data { char *string; }; static void member_vote_count_fn(gpointer key, gpointer value, gpointer user_data) { crm_node_t *node = value; if (ais_str_eq(CRM_NODE_MEMBER, node->state)) { plugin_has_votes += node->votes; } } void member_loop_fn(gpointer key, gpointer value, gpointer user_data) { crm_node_t *node = value; struct member_loop_data *data = user_data; ais_trace("Dumping node %u", node->id); data->string = append_member(data->string, node); } char * pcmk_generate_membership_data(void) { int size = 0; struct member_loop_data data; size = 256; ais_malloc0(data.string, size); /* Ensure the list of active processes is up-to-date */ update_member(local_nodeid, 0, 0, -1, get_process_list(), local_uname, CRM_NODE_MEMBER, NULL); plugin_has_votes = 0; g_hash_table_foreach(membership_list, member_vote_count_fn, NULL); if (plugin_has_votes > plugin_expected_votes) { update_expected_votes(plugin_has_votes); } snprintf(data.string, size, "", membership_seq, plugin_has_quorum()? "true" : "false", plugin_expected_votes, plugin_has_votes); g_hash_table_foreach(membership_list, member_loop_fn, &data); size = strlen(data.string); data.string = realloc(data.string, size + 9); /* 9 = + nul */ sprintf(data.string + size, ""); return data.string; } void pcmk_nodes(void *conn, ais_void_ptr * msg) { char *data = pcmk_generate_membership_data(); void *async_conn = conn; /* send the ACK before we send any other messages * - but after we no longer need to access the message */ send_ipc_ack(conn); msg = NULL; if (async_conn) { send_client_msg(async_conn, crm_class_members, crm_msg_none, data); } ais_free(data); } void pcmk_remove_member(void *conn, ais_void_ptr * msg) { const AIS_Message *ais_msg = msg; char *data = get_ais_data(ais_msg); send_ipc_ack(conn); msg = NULL; if (data != NULL) { char *bcast = ais_concat("remove-peer", data, ':'); - send_cluster_msg(crm_msg_ais, NULL, bcast); + send_plugin_msg(crm_msg_ais, NULL, bcast); ais_info("Sent: %s", bcast); ais_free(bcast); } ais_free(data); } static void send_quorum_details(void *conn) { int size = 256; char *data = NULL; ais_malloc0(data, size); snprintf(data, size, "", membership_seq, plugin_has_quorum()? "true" : "false", plugin_expected_votes, plugin_has_votes); send_client_msg(conn, crm_class_quorum, crm_msg_none, data); ais_free(data); } void pcmk_quorum(void *conn, ais_void_ptr * msg) { char *dummy = NULL; const AIS_Message *ais_msg = msg; char *data = get_ais_data(ais_msg); send_ipc_ack(conn); msg = NULL; /* Make sure the current number of votes is accurate */ dummy = pcmk_generate_membership_data(); ais_free(dummy); /* Calls without data just want the current quorum details */ if (data != NULL && strlen(data) > 0) { int value = ais_get_int(data, NULL); update_expected_votes(value); } send_quorum_details(conn); ais_free(data); } void pcmk_notify(void *conn, ais_void_ptr * msg) { const AIS_Message *ais_msg = msg; char *data = get_ais_data(ais_msg); void *async_conn = conn; int enable = 0; int sender = ais_msg->sender.pid; send_ipc_ack(conn); msg = NULL; if (ais_str_eq("true", data)) { enable = 1; } ais_info("%s node notifications for child %d (%p)", enable ? "Enabling" : "Disabling", sender, async_conn); if (enable) { g_hash_table_replace(membership_notify_list, async_conn, async_conn); } else { g_hash_table_remove(membership_notify_list, async_conn); } ais_free(data); } void pcmk_nodeid(void *conn, ais_void_ptr * msg) { static int counter = 0; struct crm_ais_nodeid_resp_s resp; ais_trace("Sending local nodeid: %d to %p[%d]", local_nodeid, conn, counter); resp.header.id = crm_class_nodeid; resp.header.size = sizeof(struct crm_ais_nodeid_resp_s); resp.header.error = CS_OK; resp.id = local_nodeid; resp.counter = counter++; memset(resp.uname, 0, MAX_NAME); memcpy(resp.uname, local_uname, local_uname_len); memset(resp.cname, 0, MAX_NAME); memcpy(resp.cname, local_cname, local_cname_len); pcmk_api->ipc_response_send(conn, &resp, resp.header.size); } static gboolean ghash_send_update(gpointer key, gpointer value, gpointer data) { if (send_client_msg(value, crm_class_members, crm_msg_none, data) != 0) { /* remove it */ return TRUE; } return FALSE; } void send_member_notification(void) { char *update = pcmk_generate_membership_data(); ais_info("Sending membership update " U64T " to %d children", membership_seq, g_hash_table_size(membership_notify_list)); g_hash_table_foreach_remove(membership_notify_list, ghash_send_update, update); ais_free(update); } gboolean check_message_sanity(const AIS_Message * msg, const char *data) { gboolean sane = TRUE; gboolean repaired = FALSE; int dest = msg->host.type; int tmp_size = msg->header.size - sizeof(AIS_Message); if (sane && msg->header.size == 0) { ais_err("Message with no size"); sane = FALSE; } if (sane && msg->header.error != CS_OK) { ais_err("Message header contains an error: %d", msg->header.error); sane = FALSE; } AIS_CHECK(msg->header.size > sizeof(AIS_Message), ais_err("Message %d size too small: %d < %zu", msg->header.id, msg->header.size, sizeof(AIS_Message)); return FALSE); if (sane && ais_data_len(msg) != tmp_size) { ais_warn("Message payload size is incorrect: expected %d, got %d", ais_data_len(msg), tmp_size); sane = TRUE; } if (sane && ais_data_len(msg) == 0) { ais_err("Message with no payload"); sane = FALSE; } if (sane && data && msg->is_compressed == FALSE) { int str_size = strlen(data) + 1; if (ais_data_len(msg) != str_size) { int lpc = 0; ais_err("Message payload is corrupted: expected %d bytes, got %d", ais_data_len(msg), str_size); sane = FALSE; for (lpc = (str_size - 10); lpc < msg->size; lpc++) { if (lpc < 0) { lpc = 0; } ais_trace("bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]); } } } if (sane == FALSE) { AIS_CHECK(sane, ais_err ("Invalid message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size)); } else if (repaired) { ais_err ("Repaired message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } else { ais_trace ("Verified message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } return sane; } static int delivered_transient = 0; static void deliver_transient_msg(gpointer key, gpointer value, gpointer user_data) { int pid = GPOINTER_TO_INT(value); AIS_Message *mutable = user_data; if (pid == mutable->host.type) { int rc = send_client_ipc(key, mutable); delivered_transient++; ais_info("Sent message to %s.%d (rc=%d)", ais_dest(&(mutable->host)), pid, rc); if (rc != 0) { ais_warn("Sending message to %s.%d failed (rc=%d)", ais_dest(&(mutable->host)), pid, rc); log_ais_message(LOG_DEBUG, mutable); } } } gboolean route_ais_message(const AIS_Message * msg, gboolean local_origin) { int rc = 0; int dest = msg->host.type; const char *reason = "unknown"; AIS_Message *mutable = ais_msg_copy(msg); static int service_id = SERVICE_ID_MAKE(PCMK_SERVICE_ID, 0); ais_trace("Msg[%d] (dest=%s:%s, from=%s:%s.%d, remote=%s, size=%d)", mutable->id, ais_dest(&(mutable->host)), msg_type2text(dest), ais_dest(&(mutable->sender)), msg_type2text(mutable->sender.type), mutable->sender.pid, local_origin ? "false" : "true", ais_data_len((mutable))); if (local_origin == FALSE) { if (mutable->host.size == 0 || ais_str_eq(local_uname, mutable->host.uname)) { mutable->host.local = TRUE; } } if (check_message_sanity(mutable, mutable->data) == FALSE) { /* Dont send this message to anyone */ rc = 1; goto bail; } if (mutable->host.local) { void *conn = NULL; const char *lookup = NULL; if (dest == crm_msg_ais) { process_ais_message(mutable); goto bail; } else if (dest == crm_msg_lrmd) { /* lrmd messages are routed via the crm */ dest = crm_msg_crmd; } else if (dest == crm_msg_te) { /* te messages are routed via the crm */ dest = crm_msg_crmd; } else if (dest >= SIZEOF(pcmk_children)) { /* Transient client */ delivered_transient = 0; g_hash_table_foreach(ipc_client_list, deliver_transient_msg, mutable); if (delivered_transient) { ais_trace("Sent message to %d transient clients: %d", delivered_transient, dest); goto bail; } else { /* try the crmd */ ais_trace("Sending message to transient client %d via crmd", dest); dest = crm_msg_crmd; } } else if (dest == 0) { ais_err("Invalid destination: %d", dest); log_ais_message(LOG_ERR, mutable); log_printf(LOG_ERR, "%s", get_ais_data(mutable)); rc = 1; goto bail; } lookup = msg_type2text(dest); conn = pcmk_children[dest].async_conn; /* the cluster fails in weird and wonderfully obscure ways when this is not true */ AIS_ASSERT(ais_str_eq(lookup, pcmk_children[dest].name)); if (mutable->header.id == service_id) { mutable->header.id = 0; /* reset this back to zero for IPC messages */ } else if (mutable->header.id != 0) { ais_err("reset header id back to zero from %d", mutable->header.id); mutable->header.id = 0; /* reset this back to zero for IPC messages */ } reason = "ipc delivery failed"; rc = send_client_ipc(conn, mutable); } else if (local_origin) { /* forward to other hosts */ ais_trace("Forwarding to cluster"); reason = "cluster delivery failed"; - rc = send_cluster_msg_raw(mutable); + rc = send_plugin_msg_raw(mutable); } if (rc != 0) { ais_warn("Sending message to %s.%s failed: %s (rc=%d)", ais_dest(&(mutable->host)), msg_type2text(dest), reason, rc); log_ais_message(LOG_DEBUG, mutable); } bail: ais_free(mutable); return rc == 0 ? TRUE : FALSE; } int -send_cluster_msg_raw(const AIS_Message * ais_msg) +send_plugin_msg_raw(const AIS_Message * ais_msg) { int rc = 0; struct iovec iovec; static uint32_t msg_id = 0; AIS_Message *mutable = ais_msg_copy(ais_msg); AIS_ASSERT(local_nodeid != 0); AIS_ASSERT(ais_msg->header.size == (sizeof(AIS_Message) + ais_data_len(ais_msg))); if (mutable->id == 0) { msg_id++; AIS_CHECK(msg_id != 0 /* detect wrap-around */ , msg_id++; ais_err("Message ID wrapped around")); mutable->id = msg_id; } mutable->header.error = CS_OK; mutable->header.id = SERVICE_ID_MAKE(PCMK_SERVICE_ID, 0); mutable->sender.id = local_nodeid; mutable->sender.size = local_uname_len; memset(mutable->sender.uname, 0, MAX_NAME); memcpy(mutable->sender.uname, local_uname, mutable->sender.size); iovec.iov_base = (char *)mutable; iovec.iov_len = mutable->header.size; ais_trace("Sending message (size=%u)", (unsigned int)iovec.iov_len); rc = pcmk_api->totem_mcast(&iovec, 1, TOTEMPG_SAFE); if (rc == 0 && mutable->is_compressed == FALSE) { ais_trace("Message sent: %.80s", mutable->data); } AIS_CHECK(rc == 0, ais_err("Message not sent (%d): %.120s", rc, mutable->data)); ais_free(mutable); return rc; } #define min(x,y) (x)<(y)?(x):(y) void send_cluster_id(void) { int rc = 0; int len = 0; time_t now = time(NULL); struct iovec iovec; struct crm_identify_msg_s *msg = NULL; static time_t started = 0; static uint64_t first_seq = 0; AIS_ASSERT(local_nodeid != 0); if (started == 0) { started = now; first_seq = membership_seq; } if (local_born_on == 0) { if (started + 15 < now) { ais_debug("Born-on set to: " U64T " (age)", first_seq); local_born_on = first_seq; } else if (have_reliable_membership_id) { ais_debug("Born-on set to: " U64T " (peer)", membership_seq); local_born_on = membership_seq; } else { ais_debug("Leaving born-on unset: " U64T, membership_seq); } } ais_malloc0(msg, sizeof(struct crm_identify_msg_s)); msg->header.size = sizeof(struct crm_identify_msg_s); msg->id = local_nodeid; /* msg->header.error = CS_OK; */ msg->header.id = SERVICE_ID_MAKE(PCMK_SERVICE_ID, 1); len = min(local_uname_len, MAX_NAME - 1); memset(msg->uname, 0, MAX_NAME); memcpy(msg->uname, local_uname, len); len = min(strlen(VERSION), MAX_NAME - 1); memset(msg->version, 0, MAX_NAME); memcpy(msg->version, VERSION, len); msg->votes = 1; msg->pid = getpid(); msg->processes = get_process_list(); msg->born_on = local_born_on; ais_debug("Local update: id=%u, born=" U64T ", seq=" U64T "", local_nodeid, local_born_on, membership_seq); update_member(local_nodeid, local_born_on, membership_seq, msg->votes, msg->processes, NULL, NULL, VERSION); iovec.iov_base = (char *)msg; iovec.iov_len = msg->header.size; rc = pcmk_api->totem_mcast(&iovec, 1, TOTEMPG_SAFE); AIS_CHECK(rc == 0, ais_err("Message not sent (%d)", rc)); ais_free(msg); } static gboolean ghash_send_removal(gpointer key, gpointer value, gpointer data) { send_quorum_details(value); if (send_client_msg(value, crm_class_rmpeer, crm_msg_none, data) != 0) { /* remove it */ return TRUE; } return FALSE; } void ais_remove_peer(char *node_id) { uint32_t id = ais_get_int(node_id, NULL); crm_node_t *node = g_hash_table_lookup(membership_list, GUINT_TO_POINTER(id)); if (node == NULL) { ais_info("Peer %u is unknown", id); } else if (ais_str_eq(CRM_NODE_MEMBER, node->state)) { ais_warn("Peer %u/%s is still active", id, node->uname); } else if (g_hash_table_remove(membership_list, GUINT_TO_POINTER(id))) { plugin_expected_votes--; ais_notice("Removed dead peer %u from the membership list", id); ais_info("Sending removal of %u to %d children", id, g_hash_table_size(membership_notify_list)); g_hash_table_foreach_remove(membership_notify_list, ghash_send_removal, node_id); } else { ais_warn("Peer %u/%s was not removed", id, node->uname); } } gboolean process_ais_message(const AIS_Message * msg) { int len = ais_data_len(msg); char *data = get_ais_data(msg); do_ais_log(LOG_DEBUG, "Msg[%d] (dest=%s:%s, from=%s:%s.%d, remote=%s, size=%d): %.90s", msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->sender.uname == local_uname ? "false" : "true", ais_data_len(msg), data); if (data && len > 12 && strncmp("remove-peer:", data, 12) == 0) { char *node = data + 12; ais_remove_peer(node); } ais_free(data); return TRUE; } static void member_dump_fn(gpointer key, gpointer value, gpointer user_data) { crm_node_t *node = value; ais_info(" node id:%u, uname=%s state=%s processes=%.16x born=" U64T " seen=" U64T " addr=%s version=%s", node->id, node->uname ? node->uname : "-unknown-", node->state, node->processes, node->born, node->last_seen, node->addr ? node->addr : "-unknown-", node->version ? node->version : "-unknown-"); } void pcmk_exec_dump(void) { /* Called after SIG_USR2 */ process_ais_conf(); ais_info("Local id: %u, uname: %s, born: " U64T, local_nodeid, local_uname, local_born_on); ais_info("Membership id: " U64T ", quorate: %s, expected: %u, actual: %u", membership_seq, plugin_has_quorum()? "true" : "false", plugin_expected_votes, plugin_has_votes); g_hash_table_foreach(membership_list, member_dump_fn, NULL); } diff --git a/lib/ais/plugin.h b/lib/ais/plugin.h index fa817cd48a..14c2f871e4 100755 --- a/lib/ais/plugin.h +++ b/lib/ais/plugin.h @@ -1,25 +1,25 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #ifndef AIS_CRM_PLUGIN__H # define AIS_CRM_PLUGIN__H extern GHashTable *membership_notify_list; -extern int send_cluster_msg_raw(const AIS_Message * ais_msg); +extern int send_plugin_msg_raw(const AIS_Message * ais_msg); #endif diff --git a/lib/ais/utils.c b/lib/ais/utils.c index 016e48d256..32400937ad 100644 --- a/lib/ais/utils.c +++ b/lib/ais/utils.c @@ -1,752 +1,752 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "./utils.h" #include "./plugin.h" struct pcmk_env_s pcmk_env; void log_ais_message(int level, const AIS_Message * msg) { char *data = get_ais_data(msg); qb_log_from_external_source(__func__, __FILE__, "Msg[%d] (dest=%s:%s, from=%s:%s.%d, remote=%s, size=%d): %.90s", level, __LINE__, 0, msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->sender.uname == local_uname ? "false" : "true", ais_data_len(msg), data); /* do_ais_log(level, */ /* "Msg[%d] (dest=%s:%s, from=%s:%s.%d, remote=%s, size=%d): %.90s", */ /* msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type), */ /* ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), */ /* msg->sender.pid, */ /* msg->sender.uname==local_uname?"false":"true", */ /* ais_data_len(msg), data); */ ais_free(data); } /* static gboolean ghash_find_by_uname(gpointer key, gpointer value, gpointer user_data) { crm_node_t *node = value; int id = GPOINTER_TO_INT(user_data); if (node->id == id) { return TRUE; } return FALSE; } */ static int ais_string_to_boolean(const char *s) { int rc = 0; if (s == NULL) { return rc; } if (strcasecmp(s, "true") == 0 || strcasecmp(s, "on") == 0 || strcasecmp(s, "yes") == 0 || strcasecmp(s, "y") == 0 || strcasecmp(s, "1") == 0) { rc = 1; } return rc; } static char *opts_default[] = { NULL, NULL }; static char *opts_vgrind[] = { NULL, NULL, NULL, NULL, NULL }; gboolean spawn_child(crm_child_t * child) { int lpc = 0; uid_t uid = 0; struct rlimit oflimits; gboolean use_valgrind = FALSE; gboolean use_callgrind = FALSE; const char *devnull = "/dev/null"; const char *env_valgrind = getenv("PCMK_valgrind_enabled"); const char *env_callgrind = getenv("PCMK_callgrind_enabled"); if (child->command == NULL) { ais_info("Nothing to do for child \"%s\"", child->name); return TRUE; } if (ais_string_to_boolean(env_callgrind)) { use_callgrind = TRUE; use_valgrind = TRUE; } else if (env_callgrind != NULL && strstr(env_callgrind, child->name)) { use_callgrind = TRUE; use_valgrind = TRUE; } else if (ais_string_to_boolean(env_valgrind)) { use_valgrind = TRUE; } else if (env_valgrind != NULL && strstr(env_valgrind, child->name)) { use_valgrind = TRUE; } if (use_valgrind && strlen(VALGRIND_BIN) == 0) { ais_warn("Cannot enable valgrind for %s:" " The location of the valgrind binary is unknown", child->name); use_valgrind = FALSE; } if (child->uid) { if (pcmk_user_lookup(child->uid, &uid, NULL) < 0) { ais_err("Invalid uid (%s) specified for %s", child->uid, child->name); return FALSE; } } child->pid = fork(); AIS_ASSERT(child->pid != -1); if (child->pid > 0) { /* parent */ ais_info("Forked child %d for process %s%s", child->pid, child->name, use_valgrind ? " (valgrind enabled: " VALGRIND_BIN ")" : ""); } else { /* Setup the two alternate arg arrarys */ opts_vgrind[0] = ais_strdup(VALGRIND_BIN); if (use_callgrind) { opts_vgrind[1] = ais_strdup("--tool=callgrind"); opts_vgrind[2] = ais_strdup("--callgrind-out-file=" CRM_STATE_DIR "/callgrind.out.%p"); opts_vgrind[3] = ais_strdup(child->command); opts_vgrind[4] = NULL; } else { opts_vgrind[1] = ais_strdup(child->command); opts_vgrind[2] = NULL; opts_vgrind[3] = NULL; opts_vgrind[4] = NULL; } opts_default[0] = ais_strdup(child->command);; #if 0 /* Dont set the group for now - it prevents connection to the cluster */ if (gid && setgid(gid) < 0) { ais_perror("Could not set group to %d", gid); } #endif if (uid) { struct passwd *pwent = getpwuid(uid); if(pwent == NULL) { ais_perror("Cannot get password entry of uid: %d", uid); } else if (initgroups(pwent->pw_name, pwent->pw_gid) < 0) { ais_perror("Cannot initalize groups for %s (uid=%d)", pwent->pw_name, uid); } } if (uid && setuid(uid) < 0) { ais_perror("Could not set user to %d (%s)", uid, child->uid); } /* Close all open file descriptors */ getrlimit(RLIMIT_NOFILE, &oflimits); for (; lpc < oflimits.rlim_cur; lpc++) { close(lpc); } (void)open(devnull, O_RDONLY); /* Stdin: fd 0 */ (void)open(devnull, O_WRONLY); /* Stdout: fd 1 */ (void)open(devnull, O_WRONLY); /* Stderr: fd 2 */ /* *INDENT-OFF* */ setenv("HA_COMPRESSION", "bz2", 1); setenv("HA_cluster_type", "openais", 1); setenv("HA_debug", pcmk_env.debug, 1); setenv("HA_logfacility", pcmk_env.syslog, 1); setenv("HA_LOGFACILITY", pcmk_env.syslog, 1); setenv("HA_use_logd", pcmk_env.use_logd, 1); setenv("HA_quorum_type", pcmk_env.quorum, 1); /* *INDENT-ON* */ if (pcmk_env.logfile) { setenv("HA_debugfile", pcmk_env.logfile, 1); } if (use_valgrind) { (void)execvp(VALGRIND_BIN, opts_vgrind); } else { (void)execvp(child->command, opts_default); } ais_perror("FATAL: Cannot exec %s", child->command); exit(100); } return TRUE; } gboolean stop_child(crm_child_t * child, int signal) { if (signal == 0) { signal = SIGTERM; } if (child->command == NULL) { ais_info("Nothing to do for child \"%s\"", child->name); return TRUE; } ais_debug("Stopping CRM child \"%s\"", child->name); if (child->pid <= 0) { ais_trace("Client %s not running", child->name); return TRUE; } errno = 0; if (kill(child->pid, signal) == 0) { ais_notice("Sent -%d to %s: [%d]", signal, child->name, child->pid); } else { ais_perror("Sent -%d to %s: [%d]", signal, child->name, child->pid); } return TRUE; } void destroy_ais_node(gpointer data) { crm_node_t *node = data; ais_info("Destroying entry for node %u", node->id); ais_free(node->addr); ais_free(node->uname); ais_free(node->state); ais_free(node); } int update_member(unsigned int id, uint64_t born, uint64_t seq, int32_t votes, uint32_t procs, const char *uname, const char *state, const char *version) { int changed = 0; crm_node_t *node = NULL; node = g_hash_table_lookup(membership_list, GUINT_TO_POINTER(id)); if (node == NULL) { ais_malloc0(node, sizeof(crm_node_t)); ais_info("Creating entry for node %u born on " U64T "", id, seq); node->id = id; node->addr = NULL; node->state = ais_strdup("unknown"); g_hash_table_insert(membership_list, GUINT_TO_POINTER(id), node); node = g_hash_table_lookup(membership_list, GUINT_TO_POINTER(id)); } AIS_ASSERT(node != NULL); if (seq != 0) { node->last_seen = seq; } if (born != 0 && node->born != born) { changed = TRUE; node->born = born; ais_info("%p Node %u (%s) born on: " U64T, node, id, uname, born); } if (version != NULL) { ais_free(node->version); node->version = ais_strdup(version); } if (uname != NULL) { if (node->uname == NULL || ais_str_eq(node->uname, uname) == FALSE) { ais_info("%p Node %u now known as %s (was: %s)", node, id, uname, node->uname); ais_free(node->uname); node->uname = ais_strdup(uname); changed = TRUE; } } if (procs != 0 && procs != node->processes) { ais_info("Node %s now has process list: %.32x (%u)", node->uname, procs, procs); node->processes = procs; changed = TRUE; } if (votes >= 0 && votes != node->votes) { ais_info("Node %s now has %d quorum votes (was %d)", node->uname, votes, node->votes); node->votes = votes; changed = TRUE; } if (state != NULL) { if (node->state == NULL || ais_str_eq(node->state, state) == FALSE) { ais_free(node->state); node->state = ais_strdup(state); ais_info("Node %u/%s is now: %s", id, node->uname ? node->uname : "unknown", state); changed = TRUE; } } return changed; } void delete_member(uint32_t id, const char *uname) { if (uname == NULL) { g_hash_table_remove(membership_list, GUINT_TO_POINTER(id)); return; } ais_err("Deleting by uname is not yet supported"); } const char * member_uname(uint32_t id) { crm_node_t *node = g_hash_table_lookup(membership_list, GUINT_TO_POINTER(id)); if (node == NULL) { return ".unknown."; } if (node->uname == NULL) { return ".pending."; } return node->uname; } char * append_member(char *data, crm_node_t * node) { int size = 1; /* nul */ int offset = 0; static int fixed_len = 4 + 8 + 7 + 6 + 6 + 7 + 11; if (data) { size = strlen(data); } offset = size; size += fixed_len; size += 32; /* node->id */ size += 100; /* node->seq, node->born */ size += strlen(node->state); if (node->uname) { size += (7 + strlen(node->uname)); } if (node->addr) { size += (6 + strlen(node->addr)); } if (node->version) { size += (9 + strlen(node->version)); } data = realloc(data, size); offset += snprintf(data + offset, size - offset, "id); if (node->uname) { offset += snprintf(data + offset, size - offset, "uname=\"%s\" ", node->uname); } offset += snprintf(data + offset, size - offset, "state=\"%s\" ", node->state); offset += snprintf(data + offset, size - offset, "born=\"" U64T "\" ", node->born); offset += snprintf(data + offset, size - offset, "seen=\"" U64T "\" ", node->last_seen); offset += snprintf(data + offset, size - offset, "votes=\"%d\" ", node->votes); offset += snprintf(data + offset, size - offset, "processes=\"%u\" ", node->processes); if (node->addr) { offset += snprintf(data + offset, size - offset, "addr=\"%s\" ", node->addr); } if (node->version) { offset += snprintf(data + offset, size - offset, "version=\"%s\" ", node->version); } offset += snprintf(data + offset, size - offset, "/>"); return data; } void swap_sender(AIS_Message * msg) { int tmp = 0; char tmp_s[256]; tmp = msg->host.type; msg->host.type = msg->sender.type; msg->sender.type = tmp; tmp = msg->host.type; msg->host.size = msg->sender.type; msg->sender.type = tmp; memcpy(tmp_s, msg->host.uname, 256); memcpy(msg->host.uname, msg->sender.uname, 256); memcpy(msg->sender.uname, tmp_s, 256); } char * get_ais_data(const AIS_Message * msg) { int rc = BZ_OK; char *uncompressed = NULL; unsigned int new_size = msg->size + 1; if (msg->is_compressed == FALSE) { uncompressed = strdup(msg->data); } else { ais_malloc0(uncompressed, new_size); rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, (char *)msg->data, msg->compressed_size, 1, 0); if (rc != BZ_OK) { ais_info("rc=%d, new=%u expected=%u", rc, new_size, msg->size); } AIS_ASSERT(rc == BZ_OK); AIS_ASSERT(new_size == msg->size); } return uncompressed; } int -send_cluster_msg(enum crm_ais_msg_types type, const char *host, const char *data) +send_plugin_msg(enum crm_ais_msg_types type, const char *host, const char *data) { int rc = 0; int data_len = 0; AIS_Message *ais_msg = NULL; int total_size = sizeof(AIS_Message); AIS_ASSERT(local_nodeid != 0); if (data != NULL) { data_len = 1 + strlen(data); total_size += data_len; } ais_malloc0(ais_msg, total_size); ais_msg->header.size = total_size; ais_msg->header.error = CS_OK; ais_msg->header.id = 0; ais_msg->size = data_len; ais_msg->sender.type = crm_msg_ais; if (data != NULL) { memcpy(ais_msg->data, data, data_len); } ais_msg->host.type = type; ais_msg->host.id = 0; if (host) { ais_msg->host.size = strlen(host); memset(ais_msg->host.uname, 0, MAX_NAME); memcpy(ais_msg->host.uname, host, ais_msg->host.size); /* ais_msg->host.id = nodeid_lookup(host); */ } else { ais_msg->host.type = type; ais_msg->host.size = 0; memset(ais_msg->host.uname, 0, MAX_NAME); } - rc = send_cluster_msg_raw(ais_msg); + rc = send_plugin_msg_raw(ais_msg); ais_free(ais_msg); return rc; } extern struct corosync_api_v1 *pcmk_api; int send_client_ipc(void *conn, const AIS_Message * ais_msg) { int rc = -1; if (conn == NULL) { rc = -2; } else if (!libais_connection_active(conn)) { ais_warn("Connection no longer active"); rc = -3; /* } else if ((queue->size - 1) == queue->used) { */ /* ais_err("Connection is throttled: %d", queue->size); */ } else { #if SUPPORT_COROSYNC rc = pcmk_api->ipc_dispatch_send(conn, ais_msg, ais_msg->header.size); #endif } return rc; } int send_client_msg(void *conn, enum crm_ais_msg_class class, enum crm_ais_msg_types type, const char *data) { int rc = 0; int data_len = 0; int total_size = sizeof(AIS_Message); AIS_Message *ais_msg = NULL; static int msg_id = 0; AIS_ASSERT(local_nodeid != 0); msg_id++; AIS_ASSERT(msg_id != 0 /* wrap-around */ ); if (data != NULL) { data_len = 1 + strlen(data); } total_size += data_len; ais_malloc0(ais_msg, total_size); ais_msg->id = msg_id; ais_msg->header.id = class; ais_msg->header.size = total_size; ais_msg->header.error = CS_OK; ais_msg->size = data_len; if (data != NULL) { memcpy(ais_msg->data, data, data_len); } ais_msg->host.size = 0; ais_msg->host.type = type; memset(ais_msg->host.uname, 0, MAX_NAME); ais_msg->host.id = 0; ais_msg->sender.type = crm_msg_ais; ais_msg->sender.size = local_uname_len; memset(ais_msg->sender.uname, 0, MAX_NAME); memcpy(ais_msg->sender.uname, local_uname, ais_msg->sender.size); ais_msg->sender.id = local_nodeid; rc = send_client_ipc(conn, ais_msg); if (rc != 0) { ais_warn("Sending message to %s failed: %d", msg_type2text(type), rc); log_ais_message(LOG_DEBUG, ais_msg); } ais_free(ais_msg); return rc; } char * ais_concat(const char *prefix, const char *suffix, char join) { int len = 0; char *new_str = NULL; AIS_ASSERT(prefix != NULL); AIS_ASSERT(suffix != NULL); len = strlen(prefix) + strlen(suffix) + 2; ais_malloc0(new_str, (len)); sprintf(new_str, "%s%c%s", prefix, join, suffix); new_str[len - 1] = 0; return new_str; } hdb_handle_t config_find_init(struct corosync_api_v1 * config, char *name) { hdb_handle_t local_handle = 0; #if SUPPORT_COROSYNC config->object_find_create(OBJECT_PARENT_HANDLE, name, strlen(name), &local_handle); ais_info("Local handle: %lld for %s", (long long)local_handle, name); #endif return local_handle; } hdb_handle_t config_find_next(struct corosync_api_v1 * config, char *name, hdb_handle_t top_handle) { int rc = 0; hdb_handle_t local_handle = 0; #if SUPPORT_COROSYNC rc = config->object_find_next(top_handle, &local_handle); #endif if (rc < 0) { ais_info("No additional configuration supplied for: %s", name); local_handle = 0; } else { ais_info("Processing additional %s options...", name); } return local_handle; } void config_find_done(struct corosync_api_v1 *config, hdb_handle_t local_handle) { #if SUPPORT_COROSYNC config->object_find_destroy(local_handle); #endif } int get_config_opt(struct corosync_api_v1 *config, hdb_handle_t object_service_handle, char *key, char **value, const char *fallback) { char *env_key = NULL; *value = NULL; if (object_service_handle > 0) { config->object_key_get(object_service_handle, key, strlen(key), (void **)value, NULL); } if (*value) { ais_info("Found '%s' for option: %s", *value, key); return 0; } env_key = ais_concat("HA", key, '_'); *value = getenv(env_key); ais_free(env_key); if (*value) { ais_info("Found '%s' in ENV for option: %s", *value, key); return 0; } if (fallback) { ais_info("Defaulting to '%s' for option: %s", fallback, key); *value = ais_strdup(fallback); } else { ais_info("No default for option: %s", key); } return -1; } int ais_get_boolean(const char *value) { if (value == NULL) { return 0; } else if (strcasecmp(value, "true") == 0 || strcasecmp(value, "on") == 0 || strcasecmp(value, "yes") == 0 || strcasecmp(value, "y") == 0 || strcasecmp(value, "1") == 0) { return 1; } return 0; } long long ais_get_int(const char *text, char **end_text) { long long result = -1; char *local_end_text = NULL; errno = 0; if (text != NULL) { #ifdef ANSI_ONLY if (end_text != NULL) { result = strtol(text, end_text, 10); } else { result = strtol(text, &local_end_text, 10); } #else if (end_text != NULL) { result = strtoll(text, end_text, 10); } else { result = strtoll(text, &local_end_text, 10); } #endif if (errno == EINVAL) { ais_err("Conversion of %s failed", text); result = -1; } else if (errno == ERANGE) { ais_err("Conversion of %s was clipped: %lld", text, result); } else if (errno != 0) { ais_perror("Conversion of %s failed:", text); } if (local_end_text != NULL && local_end_text[0] != '\0') { ais_err("Characters left over after parsing '%s': '%s'", text, local_end_text); } } return result; } #define PW_BUFFER_LEN 500 int pcmk_user_lookup(const char *name, uid_t * uid, gid_t * gid) { int rc = -1; char *buffer = NULL; struct passwd pwd; struct passwd *pwentry = NULL; ais_malloc0(buffer, PW_BUFFER_LEN); getpwnam_r(name, &pwd, buffer, PW_BUFFER_LEN, &pwentry); if (pwentry) { rc = 0; if (uid) { *uid = pwentry->pw_uid; } if (gid) { *gid = pwentry->pw_gid; } ais_debug("Cluster user %s has uid=%d gid=%d", name, pwentry->pw_uid, pwentry->pw_gid); } else { ais_err("Cluster user %s does not exist", name); } ais_free(buffer); return rc; } diff --git a/lib/ais/utils.h b/lib/ais/utils.h index 19afc49d28..f576d4e4df 100644 --- a/lib/ais/utils.h +++ b/lib/ais/utils.h @@ -1,266 +1,266 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #ifndef AIS_CRM_UTILS__H # define AIS_CRM_UTILS__H # include # ifdef AIS_WHITETANK # include # include # define OPENAIS_EXTERNAL_SERVICE insane_ais_header_hack_in__totem_h # include # include # include # include # include # include # include # define COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED OPENAIS_FLOW_CONTROL_NOT_REQUIRED /* New names for old things */ typedef unsigned int hdb_handle_t; # define corosync_api_v1 objdb_iface_ver0 # define corosync_lib_handler openais_lib_handler # define corosync_exec_handler openais_exec_handler # define corosync_service_engine openais_service_handler extern int openais_response_send(void *conn, void *msg, int mlen); extern int openais_dispatch_send(void *conn, void *msg, int mlen); # endif # if !CS_USES_LIBQB # include # endif # if LIBQB_LOGGING # include # endif # ifdef SUPPORT_COROSYNC # include # include # include # include # include # include # include LOGSYS_DECLARE_SUBSYS("pcmk"); # endif /* #include "plugin.h" */ # define SIZEOF(a) (sizeof(a) / sizeof(a[0])) typedef struct crm_child_s { int pid; long flag; long flags; int start_seq; int respawn_count; gboolean respawn; const char *name; const char *uid; const char *command; void *conn; void *async_conn; } crm_child_t; extern void destroy_ais_node(gpointer data); extern void delete_member(uint32_t id, const char *uname); extern int update_member(unsigned int id, uint64_t born, uint64_t seq, int32_t votes, uint32_t procs, const char *uname, const char *state, const char *version); extern const char *member_uname(uint32_t id); extern char *append_member(char *data, crm_node_t * node); extern void member_loop_fn(gpointer key, gpointer value, gpointer user_data); extern gboolean stop_child(crm_child_t * child, int signal); extern gboolean spawn_child(crm_child_t * child); extern void swap_sender(AIS_Message * msg); extern char *get_ais_data(const AIS_Message * msg); extern gboolean route_ais_message(const AIS_Message * msg, gboolean local); extern gboolean process_ais_message(const AIS_Message * msg); -extern int send_cluster_msg(enum crm_ais_msg_types type, const char *host, const char *data); +extern int send_plugin_msg(enum crm_ais_msg_types type, const char *host, const char *data); extern int send_client_msg(void *conn, enum crm_ais_msg_class class, enum crm_ais_msg_types type, const char *data); extern void send_member_notification(void); extern void log_ais_message(int level, const AIS_Message * msg); extern hdb_handle_t config_find_init(struct corosync_api_v1 *config, char *name); extern hdb_handle_t config_find_next(struct corosync_api_v1 *config, char *name, hdb_handle_t top_handle); extern void config_find_done(struct corosync_api_v1 *config, hdb_handle_t local_handle); extern int get_config_opt(struct corosync_api_v1 *config, hdb_handle_t object_service_handle, char *key, char **value, const char *fallback); extern int ais_get_boolean(const char *s); extern long long ais_get_int(const char *text, char **end_text); extern char *ais_concat(const char *prefix, const char *suffix, char join); extern int send_client_ipc(void *conn, const AIS_Message * ais_msg); extern GHashTable *membership_list; extern pthread_t crm_wait_thread; extern int plugin_log_level; extern char *local_uname; extern int local_uname_len; extern unsigned int local_nodeid; static inline const char * level2char(int level) { switch (level) { case LOG_CRIT: return "CRIT"; case LOG_ERR: return "ERROR"; case LOG_WARNING: return "WARN"; case LOG_NOTICE: return "notice"; case LOG_INFO: return "info"; } return "debug"; } # if LIBQB_LOGGING # define do_ais_log(level, fmt, args...) do { \ if(plugin_log_level < (level)) { \ continue; \ } else if((level) > LOG_DEBUG) { \ log_printf(LOG_DEBUG, "debug%d: %s: " fmt, \ level-LOG_INFO, __PRETTY_FUNCTION__ , ##args); \ } else { \ qb_log_from_external_source(__func__, __FILE__, \ fmt, level, __LINE__, 0, ##args); \ } \ } while(0) # else # define do_ais_log(level, fmt, args...) do { \ if(plugin_log_level < (level)) { \ continue; \ } else if((level) > LOG_DEBUG) { \ log_printf(LOG_DEBUG, "debug%d: %s: " fmt, \ level-LOG_INFO, __PRETTY_FUNCTION__ , ##args); \ } else { \ log_printf(level, "%s: %s: " fmt, level2char(level), \ __PRETTY_FUNCTION__ , ##args); \ } \ } while(0) # endif # define ais_perror(fmt, args...) log_printf( \ LOG_ERR, "%s: " fmt ": (%d) %s", \ __PRETTY_FUNCTION__ , ##args, errno, strerror(errno)) # define ais_crit(fmt, args...) do_ais_log(LOG_CRIT, fmt , ##args) # define ais_err(fmt, args...) do_ais_log(LOG_ERR, fmt , ##args) # define ais_warn(fmt, args...) do_ais_log(LOG_WARNING, fmt , ##args) # define ais_notice(fmt, args...) do_ais_log(LOG_NOTICE, fmt , ##args) # define ais_info(fmt, args...) do_ais_log(LOG_INFO, fmt , ##args) # define ais_debug(fmt, args...) do_ais_log(LOG_DEBUG, fmt , ##args) # define ais_trace(fmt, args...) do_ais_log(LOG_DEBUG+1, fmt , ##args) # define ais_malloc0(malloc_obj, length) do { \ malloc_obj = malloc(length); \ if(malloc_obj == NULL) { \ abort(); \ } \ memset(malloc_obj, 0, length); \ } while(0) # define ais_free(obj) do { \ if(obj) { \ free(obj); \ obj = NULL; \ } \ } while(0) # define AIS_ASSERT(expr) if((expr) == FALSE) { \ ais_crit("Assertion failure line %d: %s", __LINE__, #expr); \ abort(); \ } # define AIS_CHECK(expr, failure_action) if((expr) == FALSE) { \ int p = fork(); \ if(p == 0) { abort(); } \ ais_err("Child %d spawned to record non-fatal assertion failure line %d: %s", p, __LINE__, #expr); \ failure_action; \ } static inline char * ais_strdup(const char *src) { char *dup = NULL; if (src == NULL) { return NULL; } ais_malloc0(dup, strlen(src) + 1); return strcpy(dup, src); } static inline gboolean ais_str_eq(const char *a, const char *b) { if (a == NULL || b == NULL) { return FALSE; } else if (a == b) { return TRUE; } else if (strcasecmp(a, b) == 0) { return TRUE; } return FALSE; } static inline int libais_connection_active(void *conn) { if (conn != NULL) { return TRUE; } return FALSE; } struct pcmk_env_s { const char *debug; const char *syslog; const char *logfile; const char *use_logd; const char *quorum; }; extern struct pcmk_env_s pcmk_env; extern int pcmk_user_lookup(const char *name, uid_t * uid, gid_t * gid); #endif