diff --git a/lib/ais/plugin.c b/lib/ais/plugin.c index 435726cf2c..4b3361e94b 100644 --- a/lib/ais/plugin.c +++ b/lib/ais/plugin.c @@ -1,1455 +1,1449 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "plugin.h" #include "utils.h" #ifdef AIS_COROSYNC # include #endif #include #include #include #include #include #include plugin_init_type *crm_api = NULL; uint32_t plugin_has_votes = 0; uint32_t plugin_expected_votes = 1024; int use_mgmtd = 0; int plugin_log_level = LOG_DEBUG; char *local_uname = NULL; int local_uname_len = 0; uint32_t local_nodeid = 0; char *ipc_channel_name = NULL; uint64_t membership_seq = 0; pthread_t crm_wait_thread; gboolean wait_active = TRUE; gboolean have_reliable_membership_id = FALSE; GHashTable *membership_list = NULL; GHashTable *membership_notify_list = NULL; #define MAX_RESPAWN 100 #define crm_flag_none 0x00000000 #define crm_flag_members 0x00000001 struct crm_identify_msg_s { mar_req_header_t header __attribute__((aligned(8))); uint32_t id; uint32_t pid; int32_t votes; uint32_t processes; char uname[256]; char version[256]; uint64_t born_on; } __attribute__((packed)); static crm_child_t crm_children[] = { { 0, crm_proc_none, crm_flag_none, 0, 0, FALSE, "none", NULL, NULL, NULL, NULL }, { 0, crm_proc_ais, crm_flag_none, 0, 0, FALSE, "ais", NULL, NULL, NULL, NULL }, { 0, crm_proc_lrmd, crm_flag_none, 3, 0, TRUE, "lrmd", NULL, HA_LIBHBDIR"/lrmd", NULL, NULL }, { 0, crm_proc_cib, crm_flag_members, 2, 0, TRUE, "cib", HA_CCMUSER, HA_LIBHBDIR"/cib", NULL, NULL }, { 0, crm_proc_crmd, crm_flag_members, 6, 0, TRUE, "crmd", HA_CCMUSER, HA_LIBHBDIR"/crmd", NULL, NULL }, { 0, crm_proc_attrd, crm_flag_none, 4, 0, TRUE, "attrd", HA_CCMUSER, HA_LIBHBDIR"/attrd", NULL, NULL }, { 0, crm_proc_stonithd, crm_flag_none, 1, 0, TRUE, "stonithd", NULL, HA_LIBHBDIR"/stonithd", NULL, NULL }, { 0, crm_proc_pe, crm_flag_none, 5, 0, TRUE, "pengine", HA_CCMUSER, HA_LIBHBDIR"/pengine", NULL, NULL }, { 0, crm_proc_mgmtd, crm_flag_none, 7, 0, TRUE, "mgmtd", NULL, HA_LIBHBDIR"/mgmtd", NULL, NULL }, }; void send_cluster_id(void); int send_cluster_msg_raw(AIS_Message *ais_msg); char *ais_generate_membership_data(void); extern totempg_groups_handle openais_group_handle; void global_confchg_fn ( enum totem_configuration_type configuration_type, unsigned int *member_list, int member_list_entries, unsigned int *left_list, int left_list_entries, unsigned int *joined_list, int joined_list_entries, struct memb_ring_id *ring_id); #ifdef AIS_WHITETANK int crm_exec_init_fn (struct objdb_iface_ver0 *objdb); int crm_exec_exit_fn (struct objdb_iface_ver0 *objdb); int crm_config_init_fn(struct objdb_iface_ver0 *objdb); #endif #ifdef AIS_COROSYNC int crm_exec_init_fn (struct corosync_api_v1 *corosync_api); int crm_exec_exit_fn (void); int crm_config_init_fn(struct corosync_api_v1 *corosync_api); #endif int ais_ipc_client_connect_callback (void *conn); int ais_ipc_client_exit_callback (void *conn); void ais_cluster_message_swab(void *msg); void ais_cluster_message_callback(void *message, unsigned int nodeid); void ais_ipc_message_callback(void *conn, void *msg); void ais_our_nodeid(void *conn, void *msg); void ais_quorum_query(void *conn, void *msg); void ais_node_list_query(void *conn, void *msg); void ais_manage_notification(void *conn, void *msg); void ais_plugin_remove_member(void *conn, void *msg); void ais_plugin_quorum(void *conn, void *msg); void ais_cluster_id_swab(void *msg); void ais_cluster_id_callback(void *message, unsigned int nodeid); static plugin_lib_handler crm_lib_service[] = { { /* 0 */ .lib_handler_fn = ais_ipc_message_callback, .response_size = sizeof (mar_res_header_t), .response_id = CRM_MESSAGE_IPC_ACK, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 1 */ .lib_handler_fn = ais_node_list_query, .response_size = sizeof (mar_res_header_t), .response_id = CRM_MESSAGE_IPC_ACK, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 2 */ .lib_handler_fn = ais_manage_notification, .response_size = sizeof (mar_res_header_t), .response_id = CRM_MESSAGE_IPC_ACK, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 3 */ .lib_handler_fn = ais_our_nodeid, .response_size = sizeof (struct crm_ais_nodeid_resp_s), .response_id = crm_class_nodeid, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 4 */ .lib_handler_fn = ais_plugin_remove_member, .response_size = sizeof (mar_res_header_t), .response_id = CRM_MESSAGE_IPC_ACK, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 5 */ .lib_handler_fn = ais_plugin_quorum, - .response_size = sizeof (struct crm_ais_quorum_resp_s), + .response_size = sizeof (mar_res_header_t), .response_id = CRM_MESSAGE_IPC_ACK, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, }; static plugin_exec_handler crm_exec_service[] = { { /* 0 */ .exec_handler_fn = ais_cluster_message_callback, .exec_endian_convert_fn = ais_cluster_message_swab }, { /* 1 */ .exec_handler_fn = ais_cluster_id_callback, .exec_endian_convert_fn = ais_cluster_id_swab } }; static void crm_exec_dump_fn(void) { ais_err("Called after SIG_USR2"); } /* * Exports the interface for the service */ plugin_service_handler crm_service_handler = { .name = (unsigned char *)"Pacemaker Cluster Manager", .id = CRM_SERVICE, .private_data_size = 0, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED, .lib_init_fn = ais_ipc_client_connect_callback, .lib_exit_fn = ais_ipc_client_exit_callback, .exec_init_fn = crm_exec_init_fn, .exec_exit_fn = crm_exec_exit_fn, .config_init_fn = crm_config_init_fn, #ifdef AIS_WHITETANK .lib_service = crm_lib_service, .lib_service_count = sizeof (crm_lib_service) / sizeof (plugin_lib_handler), .exec_service = crm_exec_service, .exec_service_count = sizeof (crm_exec_service) / sizeof (plugin_exec_handler), #endif #ifdef AIS_COROSYNC .lib_engine = crm_lib_service, .lib_engine_count = sizeof (crm_lib_service) / sizeof (plugin_lib_handler), .exec_engine = crm_exec_service, .exec_engine_count = sizeof (crm_exec_service) / sizeof (plugin_exec_handler), #endif .confchg_fn = global_confchg_fn, .exec_dump_fn = crm_exec_dump_fn, /* void (*sync_init) (void); */ /* int (*sync_process) (void); */ /* void (*sync_activate) (void); */ /* void (*sync_abort) (void); */ }; /* * Dynamic Loader definition */ plugin_service_handler *crm_get_handler_ver0 (void); #ifdef AIS_WHITETANK struct openais_service_handler_iface_ver0 crm_service_handler_iface = { .openais_get_service_handler_ver0 = crm_get_handler_ver0 }; #endif #ifdef AIS_COROSYNC struct corosync_service_engine_iface_ver0 crm_service_handler_iface = { .corosync_get_service_engine_ver0 = crm_get_handler_ver0 }; #endif static struct lcr_iface openais_crm_ver0[1] = { { .name = "pacemaker", .version = 0, .versions_replace = 0, .versions_replace_count = 0, .dependencies = 0, .dependency_count = 0, .constructor = NULL, .destructor = NULL, .interfaces = NULL } }; static struct lcr_comp crm_comp_ver0 = { .iface_count = 1, .ifaces = openais_crm_ver0 }; plugin_service_handler *crm_get_handler_ver0 (void) { return (&crm_service_handler); } __attribute__ ((constructor)) static void register_this_component (void) { lcr_interfaces_set (&openais_crm_ver0[0], &crm_service_handler_iface); lcr_component_register (&crm_comp_ver0); } static int plugin_has_quorum(void) { if(plugin_expected_votes < (2 * plugin_has_votes) + 1) { return 1; } return 0; } #ifdef AIS_COROSYNC #include #endif /* Create our own local copy of the config so we can navigate it */ static void process_ais_conf(void) { char *value = NULL; unsigned int top_handle = 0; unsigned int local_handle = 0; ais_info("Reading configure"); top_handle = config_find_init(crm_api, "logging"); local_handle = config_find_next(crm_api, "logging", top_handle); get_config_opt(crm_api, local_handle, "debug", &value, "on"); if(ais_get_boolean(value)) { plugin_log_level = LOG_DEBUG; setenv("HA_debug", "1", 1); } else { plugin_log_level = LOG_INFO; setenv("HA_debug", "0", 1); } get_config_opt(crm_api, local_handle, "to_syslog", &value, "on"); if(ais_get_boolean(value)) { get_config_opt(crm_api, local_handle, "syslog_facility", &value, "daemon"); setenv("HA_logfacility", value, 1); } else { setenv("HA_logfacility", "none", 1); } get_config_opt(crm_api, local_handle, "to_file", &value, "off"); if(ais_get_boolean(value)) { get_config_opt(crm_api, local_handle, "logfile", &value, NULL); if(value == NULL) { ais_err("Logging to a file requested but no log file specified"); } else { setenv("HA_logfile", value, 1); } } config_find_done(crm_api, local_handle); top_handle = config_find_init(crm_api, "service"); local_handle = config_find_next(crm_api, "service", top_handle); while(local_handle) { value = NULL; crm_api->object_key_get(local_handle, "name", strlen("name"), (void**)&value, NULL); if(ais_str_eq("pacemaker", value)) { break; } local_handle = config_find_next(crm_api, "service", top_handle); } get_config_opt(crm_api, local_handle, "use_logd", &value, "no"); setenv("HA_use_logd", value, 1); get_config_opt(crm_api, local_handle, "use_mgmtd", &value, "no"); if(ais_get_boolean(value) == FALSE) { int lpc = 0; for (; lpc < SIZEOF(crm_children); lpc++) { if(crm_proc_mgmtd & crm_children[lpc].flag) { /* Disable mgmtd startup */ crm_children[lpc].start_seq = 0; break; } } } config_find_done(crm_api, local_handle); } static void crm_plugin_init(void) { int rc = 0; struct utsname us; #ifdef AIS_WHITETANK log_init ("crm"); #endif process_ais_conf(); membership_list = g_hash_table_new_full( g_direct_hash, g_direct_equal, NULL, destroy_ais_node); membership_notify_list = g_hash_table_new(g_direct_hash, g_direct_equal); setenv("HA_COMPRESSION", "bz2", 1); setenv("HA_cluster_type", "openais", 1); if(system("echo 1 > /proc/sys/kernel/core_uses_pid") != 0) { ais_perror("Could not enable /proc/sys/kernel/core_uses_pid"); } ais_info("CRM: Initialized"); log_printf(LOG_INFO, "Logging: Initialized %s\n", __PRETTY_FUNCTION__); rc = uname(&us); AIS_ASSERT(rc == 0); local_uname = ais_strdup(us.nodename); local_uname_len = strlen(local_uname); #if AIS_WHITETANK local_nodeid = totempg_my_nodeid_get(); #endif #if AIS_COROSYNC local_nodeid = crm_api->totem_nodeid_get(); #endif ais_info("Service: %d", CRM_SERVICE); ais_info("Local node id: %u", local_nodeid); ais_info("Local hostname: %s", local_uname); update_member(local_nodeid, 0, 0, 1, 0, local_uname, CRM_NODE_MEMBER, NULL); } int crm_config_init_fn(plugin_init_type *unused) { return 0; } static void *crm_wait_dispatch (void *arg) { struct timespec waitsleep = { .tv_sec = 0, .tv_nsec = 100000 /* 100 msec */ }; while(wait_active) { int lpc = 0; for (; lpc < SIZEOF(crm_children); lpc++) { if(crm_children[lpc].pid > 0) { int status; pid_t pid = wait4( crm_children[lpc].pid, &status, WNOHANG, NULL); if(pid == 0) { continue; } else if(pid < 0) { ais_perror("crm_wait_dispatch: Call to wait4(%s) failed", crm_children[lpc].name); continue; } /* cleanup */ crm_children[lpc].pid = 0; crm_children[lpc].conn = NULL; crm_children[lpc].async_conn = NULL; if(WIFSIGNALED(status)) { int sig = WTERMSIG(status); ais_err("Child process %s terminated with signal %d" " (pid=%d, core=%s)", crm_children[lpc].name, sig, pid, WCOREDUMP(status)?"true":"false"); } else if (WIFEXITED(status)) { int rc = WEXITSTATUS(status); do_ais_log(rc==0?LOG_NOTICE:LOG_ERR, "Child process %s exited (pid=%d, rc=%d)", crm_children[lpc].name, pid, rc); if(rc == 100) { ais_notice("Child process %s no longer wishes" " to be respawned", crm_children[lpc].name); crm_children[lpc].respawn = FALSE; } } crm_children[lpc].respawn_count += 1; if(crm_children[lpc].respawn_count > MAX_RESPAWN) { ais_err("Child respawn count exceeded by %s", crm_children[lpc].name); crm_children[lpc].respawn = FALSE; } if(crm_children[lpc].respawn) { ais_notice("Respawning failed child process: %s", crm_children[lpc].name); spawn_child(&(crm_children[lpc])); } else { send_cluster_id(); } } } sched_yield (); nanosleep (&waitsleep, 0); } return 0; } #include #include int crm_exec_init_fn(plugin_init_type *init_with) { int lpc = 0; int start_seq = 1; static gboolean need_init = TRUE; static int max = SIZEOF(crm_children); crm_api = init_with; if(need_init) { struct passwd *pwentry = NULL; need_init = FALSE; crm_plugin_init(); pthread_create (&crm_wait_thread, NULL, crm_wait_dispatch, NULL); pwentry = getpwnam(HA_CCMUSER); AIS_CHECK(pwentry != NULL, ais_err("Cluster user %s does not exist", HA_CCMUSER); return TRUE); mkdir(HA_VARRUNDIR, 750); mkdir(HA_VARRUNDIR"/crm", 750); mkdir(HA_VARRUNHBDIR"/rsctmp", 755); /* Used by RAs - Leave owned by root */ chown(HA_VARRUNDIR"/crm", pwentry->pw_uid, pwentry->pw_gid); chown(HA_VARRUNDIR, pwentry->pw_uid, pwentry->pw_gid); for (start_seq = 1; start_seq < max; start_seq++) { /* dont start anything with start_seq < 1 */ for (lpc = 0; lpc < max; lpc++) { if(start_seq == crm_children[lpc].start_seq) { spawn_child(&(crm_children[lpc])); } } } } ais_info("CRM: Initialized"); return 0; } /* static void ais_print_node(const char *prefix, struct totem_ip_address *host) { int len = 0; char *buffer = NULL; ais_malloc0(buffer, INET6_ADDRSTRLEN+1); inet_ntop(host->family, host->addr, buffer, INET6_ADDRSTRLEN); len = strlen(buffer); ais_info("%s: %.*s", prefix, len, buffer); ais_free(buffer); } */ #if 0 /* copied here for reference from exec/totempg.c */ char *totempg_ifaces_print (unsigned int nodeid) { static char iface_string[256 * INTERFACE_MAX]; char one_iface[64]; struct totem_ip_address interfaces[INTERFACE_MAX]; char **status; unsigned int iface_count; unsigned int i; int res; iface_string[0] = '\0'; res = totempg_ifaces_get (nodeid, interfaces, &status, &iface_count); if (res == -1) { return ("no interface found for nodeid"); } for (i = 0; i < iface_count; i++) { sprintf (one_iface, "r(%d) ip(%s), ", i, totemip_print (&interfaces[i])); strcat (iface_string, one_iface); } return (iface_string); } #endif static void ais_mark_unseen_peer_dead( gpointer key, gpointer value, gpointer user_data) { int *changed = user_data; crm_node_t *node = value; if(node->last_seen != membership_seq && ais_str_eq(CRM_NODE_LOST, node->state) == FALSE) { ais_info("Node %s was not seen in the previous transition", node->uname); *changed += update_member(node->id, 0, membership_seq, node->votes, node->processes, node->uname, CRM_NODE_LOST, NULL); } } void global_confchg_fn ( enum totem_configuration_type configuration_type, unsigned int *member_list, int member_list_entries, unsigned int *left_list, int left_list_entries, unsigned int *joined_list, int joined_list_entries, struct memb_ring_id *ring_id) { int lpc = 0; int changed = 0; int do_update = 0; AIS_ASSERT(ring_id != NULL); switch(configuration_type) { case TOTEM_CONFIGURATION_REGULAR: do_update = 1; break; case TOTEM_CONFIGURATION_TRANSITIONAL: break; } membership_seq = ring_id->seq; ais_notice("%s membership event on ring %lld: memb=%d, new=%d, lost=%d", do_update?"Stable":"Transitional", ring_id->seq, member_list_entries, joined_list_entries, left_list_entries); if(do_update == 0) { for(lpc = 0; lpc < joined_list_entries; lpc++) { const char *prefix = "new: "; uint32_t nodeid = joined_list[lpc]; ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid); } for(lpc = 0; lpc < member_list_entries; lpc++) { const char *prefix = "memb:"; uint32_t nodeid = member_list[lpc]; ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid); } for(lpc = 0; lpc < left_list_entries; lpc++) { const char *prefix = "lost:"; uint32_t nodeid = left_list[lpc]; ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid); } return; } for(lpc = 0; lpc < joined_list_entries; lpc++) { const char *prefix = "NEW: "; uint32_t nodeid = joined_list[lpc]; crm_node_t *node = NULL; changed += update_member( nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_MEMBER, NULL); ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid); node = g_hash_table_lookup(membership_list, GUINT_TO_POINTER(nodeid)); if(node->addr == NULL) { const char *addr = totempg_ifaces_print(nodeid); node->addr = ais_strdup(addr); ais_debug("Node %u has address %s", nodeid, node->addr); } } plugin_has_votes = 0; for(lpc = 0; lpc < member_list_entries; lpc++) { const char *prefix = "MEMB:"; uint32_t nodeid = member_list[lpc]; plugin_has_votes++; changed += update_member( nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_MEMBER, NULL); ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid); } for(lpc = 0; lpc < left_list_entries; lpc++) { const char *prefix = "LOST:"; uint32_t nodeid = left_list[lpc]; changed += update_member( nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_LOST, NULL); ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid); } if(changed && joined_list_entries == 0 && left_list_entries == 0) { ais_err("Something strange happened: %d", changed); changed = 0; } ais_debug_2("Reaping unseen nodes..."); g_hash_table_foreach(membership_list, ais_mark_unseen_peer_dead, &changed); if(plugin_has_votes > plugin_expected_votes) { plugin_expected_votes = plugin_has_votes; changed = 1; } if(member_list_entries > 1) { /* Used to set born-on in send_cluster_id()) * We need to wait until we have at least one peer since first * membership id is based on the one before we stopped and isn't reliable */ have_reliable_membership_id = TRUE; } if(changed) { ais_debug("%d nodes changed", changed); send_member_notification(); } send_cluster_id(); } int ais_ipc_client_exit_callback (void *conn) { int lpc = 0; const char *client = NULL; void *async_conn = openais_conn_partner_get(conn); for (; lpc < SIZEOF(crm_children); lpc++) { if(crm_children[lpc].conn == conn) { if(wait_active == FALSE) { /* Make sure the shutdown loop exits */ crm_children[lpc].pid = 0; } crm_children[lpc].conn = NULL; crm_children[lpc].async_conn = NULL; client = crm_children[lpc].name; break; } } g_hash_table_remove(membership_notify_list, async_conn); do_ais_log(client?LOG_INFO:(LOG_DEBUG+1), "Client %s (conn=%p, async-conn=%p) left", client?client:"unknown-transient", conn, async_conn); return (0); } int ais_ipc_client_connect_callback (void *conn) { /* OpenAIS hasn't finished setting up the connection at this point * Sending messages now messes up the protocol! */ return (0); } /* * Executive message handlers */ void ais_cluster_message_swab(void *msg) { AIS_Message *ais_msg = msg; ais_debug_3("Performing endian conversion..."); ais_msg->id = swab32 (ais_msg->id); ais_msg->size = swab32 (ais_msg->size); ais_msg->is_compressed = swab32 (ais_msg->is_compressed); ais_msg->compressed_size = swab32 (ais_msg->compressed_size); ais_msg->host.id = swab32 (ais_msg->host.id); ais_msg->host.pid = swab32 (ais_msg->host.pid); ais_msg->host.type = swab32 (ais_msg->host.type); ais_msg->host.size = swab32 (ais_msg->host.size); ais_msg->host.local = swab32 (ais_msg->host.local); ais_msg->sender.id = swab32 (ais_msg->sender.id); ais_msg->sender.pid = swab32 (ais_msg->sender.pid); ais_msg->sender.type = swab32 (ais_msg->sender.type); ais_msg->sender.size = swab32 (ais_msg->sender.size); ais_msg->sender.local = swab32 (ais_msg->sender.local); } void ais_cluster_message_callback ( void *message, unsigned int nodeid) { AIS_Message *ais_msg = message; ais_debug_2("Message from node %u (%s)", nodeid, nodeid==local_nodeid?"local":"remote"); /* Shouldn't be required... update_member( ais_msg->sender.id, membership_seq, -1, 0, ais_msg->sender.uname, NULL); */ if(ais_msg->host.size == 0 || ais_str_eq(ais_msg->host.uname, local_uname)) { route_ais_message(ais_msg, FALSE); } else { ais_debug_3("Discarding Msg[%d] (dest=%s:%s, from=%s:%s)", ais_msg->id, ais_dest(&(ais_msg->host)), msg_type2text(ais_msg->host.type), ais_dest(&(ais_msg->sender)), msg_type2text(ais_msg->sender.type)); } } void ais_cluster_id_swab(void *msg) { struct crm_identify_msg_s *ais_msg = msg; ais_debug_3("Performing endian conversion..."); ais_msg->id = swab32 (ais_msg->id); ais_msg->pid = swab32 (ais_msg->pid); ais_msg->votes = swab32 (ais_msg->votes); ais_msg->processes = swab32 (ais_msg->processes); } void ais_cluster_id_callback (void *message, unsigned int nodeid) { int changed = 0; struct crm_identify_msg_s *msg = message; if(nodeid != msg->id) { ais_err("Invalid message: Node %u claimed to be node %d", nodeid, msg->id); return; } ais_debug("Node update: %s (%s)", msg->uname, msg->version); changed = update_member( nodeid, msg->born_on, membership_seq, msg->votes, msg->processes, msg->uname, NULL, msg->version); if(changed) { send_member_notification(); } } struct res_overlay { mar_res_header_t header __attribute((aligned(8))); char buf[4096]; }; struct res_overlay *res_overlay = NULL; static void send_ipc_ack(void *conn, int class) { if(res_overlay == NULL) { ais_malloc0(res_overlay, sizeof(struct res_overlay)); } res_overlay->header.size = crm_lib_service[class].response_size; res_overlay->header.id = crm_lib_service[class].response_id; res_overlay->header.error = SA_AIS_OK; #ifdef AIS_WHITETANK openais_response_send (conn, res_overlay, res_overlay->header.size); #endif #ifdef AIS_COROSYNC crm_api->ipc_conn_send_response (conn, res_overlay, res_overlay->header.size); #endif } /* local callbacks */ void ais_ipc_message_callback(void *conn, void *msg) { gboolean transient = TRUE; AIS_Message *ais_msg = msg; int type = ais_msg->sender.type; void *async_conn = openais_conn_partner_get(conn); ais_debug_2("Message from client %p", conn); - send_ipc_ack(conn, 0); + send_ipc_ack(conn, crm_class_cluster); ais_debug_3("type: %d local: %d conn: %p host type: %d ais: %d sender pid: %d child pid: %d size: %d", type, ais_msg->host.local, crm_children[type].conn, ais_msg->host.type, crm_msg_ais, ais_msg->sender.pid, crm_children[type].pid, ((int)SIZEOF(crm_children))); if(type > crm_msg_none && type < SIZEOF(crm_children)) { /* known child process */ transient = FALSE; } /* If this check fails, the order of crm_children probably * doesn't match that of the crm_ais_msg_types enum */ AIS_CHECK(transient || ais_msg->sender.pid == crm_children[type].pid, ais_err("Sender: %d, child[%d]: %d", ais_msg->sender.pid, type, crm_children[type].pid); return); if(transient == FALSE && type > crm_msg_none && ais_msg->host.local && crm_children[type].conn == NULL && ais_msg->host.type == crm_msg_ais) { ais_info("Recorded connection %p for %s/%d", conn, crm_children[type].name, crm_children[type].pid); crm_children[type].conn = conn; crm_children[type].async_conn = async_conn; /* Make sure they have the latest membership */ if(crm_children[type].flags & crm_flag_members) { char *update = ais_generate_membership_data(); g_hash_table_replace(membership_notify_list, async_conn, async_conn); ais_info("Sending membership update "U64T" to %s", membership_seq, crm_children[type].name); send_client_msg(async_conn, crm_class_members, crm_msg_none,update); } } ais_msg->sender.id = local_nodeid; ais_msg->sender.size = local_uname_len; memset(ais_msg->sender.uname, 0, MAX_NAME); memcpy(ais_msg->sender.uname, local_uname, ais_msg->sender.size); route_ais_message(msg, TRUE); } int crm_exec_exit_fn ( #ifdef AIS_WHITETANK struct objdb_iface_ver0 *objdb #endif #ifdef AIS_COROSYNC void #endif ) { int lpc = 0; int start_seq = 1; static int max = SIZEOF(crm_children); struct timespec waitsleep = { .tv_sec = 1, .tv_nsec = 0 }; ais_notice("Begining shutdown"); in_shutdown = TRUE; wait_active = FALSE; /* stop the wait loop */ for (start_seq = max; start_seq > 0; start_seq--) { /* dont stop anything with start_seq < 1 */ for (lpc = max - 1; lpc >= 0; lpc--) { int orig_pid = 0, iter = 0; if(start_seq != crm_children[lpc].start_seq) { continue; } orig_pid = crm_children[lpc].pid; crm_children[lpc].respawn = FALSE; stop_child(&(crm_children[lpc]), SIGTERM); while(crm_children[lpc].command && crm_children[lpc].pid) { int status; pid_t pid = 0; pid = wait4( crm_children[lpc].pid, &status, WNOHANG, NULL); if(pid == 0) { if((++iter % 30) == 0) { ais_notice("Still waiting for %s (pid=%d) to terminate...", crm_children[lpc].name, orig_pid); } sched_yield (); nanosleep (&waitsleep, 0); continue; } else if(pid < 0) { ais_perror("crm_exec_exit_fn: Call to wait4(%s) failed", crm_children[lpc].name); } /* cleanup */ crm_children[lpc].pid = 0; crm_children[lpc].conn = NULL; crm_children[lpc].async_conn = NULL; break; } ais_notice("%s (pid=%d) confirmed dead", crm_children[lpc].name, orig_pid); } } send_cluster_id(); ais_notice("Shutdown complete"); #ifndef AIS_WHITETANK logsys_flush (); #endif return 0; } struct member_loop_data { char *string; }; void member_loop_fn(gpointer key, gpointer value, gpointer user_data) { crm_node_t *node = value; struct member_loop_data *data = user_data; ais_debug_2("Dumping node %u", node->id); data->string = append_member(data->string, node); } char *ais_generate_membership_data(void) { int size = 0; struct member_loop_data data; size = 14 + 32; /* + int */ ais_malloc0(data.string, size); sprintf(data.string, "", membership_seq, plugin_has_quorum()); g_hash_table_foreach(membership_list, member_loop_fn, &data); size = strlen(data.string); data.string = realloc(data.string, size + 9) ;/* 9 = + nul */ sprintf(data.string + size, ""); return data.string; } void ais_node_list_query(void *conn, void *msg) { char *data = ais_generate_membership_data(); void *async_conn = openais_conn_partner_get(conn); /* send the ACK before we send any other messages */ - send_ipc_ack(conn, 1); + send_ipc_ack(conn, crm_class_members); if(async_conn) { send_client_msg(async_conn, crm_class_members, crm_msg_none, data); } ais_free(data); } void ais_plugin_remove_member(void *conn, void *msg) { AIS_Message *ais_msg = msg; char *data = get_ais_data(ais_msg); if(data != NULL) { char *bcast = ais_concat("remove-peer", data, ':'); send_cluster_msg(crm_msg_ais, NULL, bcast); ais_info("Sent: %s", bcast); ais_free(bcast); } - send_ipc_ack(conn, 2); + send_ipc_ack(conn, crm_class_rmpeer); ais_free(data); } -static void send_quorum_details(void *conn, int sync) +static void send_quorum_details(void *conn) { - struct crm_ais_quorum_resp_s resp; - - resp.header.size = crm_lib_service[crm_class_quorum].response_size; - resp.header.id = crm_lib_service[crm_class_quorum].response_id; - resp.header.error = SA_AIS_OK; - - resp.id = membership_seq; - resp.votes = plugin_has_votes; - resp.expected_votes = plugin_expected_votes; - resp.quorate = plugin_has_quorum(); + int size = 256; + char *data = NULL; + ais_malloc0(data, size); + + snprintf(data, size, "", + membership_seq, plugin_has_quorum()?"true":"false", + plugin_expected_votes, plugin_has_votes); - if(sync) { -#ifdef AIS_WHITETANK - openais_response_send (conn, &resp, resp.header.size); -#endif -#ifdef AIS_COROSYNC - crm_api->ipc_conn_send_response (conn, &resp, resp.header.size); -#endif - } else { -#ifdef AIS_WHITETANK - openais_dispatch_send (conn, &resp, resp.header.size); -#endif -#ifdef AIS_COROSYNC - crm_api->ipc_dispatch_send (conn, &resp, resp.header.size); -#endif - } + send_client_msg(conn, crm_class_quorum, crm_msg_none, data); + ais_free(data); } void ais_plugin_quorum(void *conn, void *msg) { AIS_Message *ais_msg = msg; char *data = get_ais_data(ais_msg); if(data != NULL) { - int value = ais_get_int(data, NULL); - if(value > 0) { + int value = 0; + + value = ais_get_int(data, NULL); + if(value > 0 && plugin_expected_votes != value) { ais_info("Expected quorum votes %d -> %d", plugin_expected_votes, value); plugin_expected_votes = value; } } - send_quorum_details(conn, TRUE); + send_ipc_ack(conn, crm_class_quorum); + + send_quorum_details(conn); ais_free(data); } void ais_manage_notification(void *conn, void *msg) { int enable = 0; AIS_Message *ais_msg = msg; char *data = get_ais_data(ais_msg); void *async_conn = openais_conn_partner_get(conn); if(ais_str_eq("true", data)) { enable = 1; } ais_info("%s node notifications for child %d (%p)", enable?"Enabling":"Disabling", ais_msg->sender.pid, async_conn); if(enable) { g_hash_table_replace(membership_notify_list, async_conn, async_conn); } else { g_hash_table_remove(membership_notify_list, async_conn); } - send_ipc_ack(conn, 2); + send_ipc_ack(conn, crm_class_notify); ais_free(data); } void ais_our_nodeid(void *conn, void *msg) { static int counter = 0; struct crm_ais_nodeid_resp_s resp; ais_debug_2("Sending local nodeid: %d to %p[%d]", local_nodeid, conn, counter); resp.header.size = crm_lib_service[crm_class_nodeid].response_size; resp.header.id = crm_lib_service[crm_class_nodeid].response_id; resp.header.error = SA_AIS_OK; resp.id = local_nodeid; resp.counter = counter++; memset(resp.uname, 0, 256); memcpy(resp.uname, local_uname, local_uname_len); #ifdef AIS_WHITETANK openais_response_send (conn, &resp, resp.header.size); #endif #ifdef AIS_COROSYNC crm_api->ipc_conn_send_response (conn, &resp, resp.header.size); #endif } static gboolean ghash_send_update(gpointer key, gpointer value, gpointer data) { - send_quorum_details(value, FALSE); + send_quorum_details(value); if(send_client_msg(value, crm_class_members, crm_msg_none, data) != 0) { /* remove it */ return TRUE; } return FALSE; } void send_member_notification(void) { char *update = ais_generate_membership_data(); ais_info("Sending membership update "U64T" to %d children", membership_seq, g_hash_table_size(membership_notify_list)); g_hash_table_foreach_remove(membership_notify_list, ghash_send_update, update); ais_free(update); } static gboolean check_message_sanity(AIS_Message *msg, char *data) { gboolean sane = TRUE; gboolean repaired = FALSE; int dest = msg->host.type; int tmp_size = msg->header.size - sizeof(AIS_Message); if(sane && msg->header.size == 0) { ais_warn("Message with no size"); sane = FALSE; } if(sane && msg->header.error != SA_AIS_OK) { ais_warn("Message header contains an error: %d", msg->header.error); sane = FALSE; } if(sane && tmp_size < 0) { /* not an AIS message */ return TRUE; } if(sane && ais_data_len(msg) != tmp_size) { int cur_size = ais_data_len(msg); repaired = TRUE; if(msg->is_compressed) { msg->compressed_size = tmp_size; } else { msg->size = tmp_size; } ais_warn("Repaired message payload size %d -> %d", cur_size, tmp_size); } if(sane && ais_data_len(msg) == 0) { ais_warn("Message with no payload"); sane = FALSE; } if(sane && data && msg->is_compressed == FALSE) { int str_size = strlen(data) + 1; if(ais_data_len(msg) != str_size) { int lpc = 0; ais_warn("Message payload is corrupted: expected %d bytes, got %d", ais_data_len(msg), str_size); sane = FALSE; for(lpc = (str_size - 10); lpc < msg->size; lpc++) { if(lpc < 0) { lpc = 0; } ais_debug_2("bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]); } } } if(sane == FALSE) { ais_err("Invalid message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } else if(repaired) { ais_err("Repaired message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } else { ais_debug_3("Verified message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } return sane; } gboolean route_ais_message(AIS_Message *msg, gboolean local_origin) { int rc = 0; int dest = msg->host.type; const char *reason = "unknown"; - + static int service_id = SERVICE_ID_MAKE(CRM_SERVICE, 0); + ais_debug_3("Msg[%d] (dest=%s:%s, from=%s:%s.%d, remote=%s, size=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, local_origin?"false":"true", ais_data_len(msg)); if(local_origin == FALSE) { if(msg->host.size == 0 || ais_str_eq(local_uname, msg->host.uname)) { msg->host.local = TRUE; } } if(check_message_sanity(msg, msg->data) == FALSE) { /* Dont send this message to anyone */ return FALSE; } if(msg->host.local) { void *conn = NULL; const char *lookup = NULL; if(dest == crm_msg_ais) { process_ais_message(msg); return TRUE; } else if(dest == crm_msg_lrmd) { /* lrmd messages are routed via the crm */ dest = crm_msg_crmd; } else if(dest == crm_msg_te) { /* te messages are routed via the crm */ dest = crm_msg_crmd; } AIS_CHECK(dest > 0 && dest < SIZEOF(crm_children), ais_err("Invalid destination: %d", dest); log_ais_message(LOG_ERR, msg); return FALSE; ); lookup = msg_type2text(dest); conn = crm_children[dest].async_conn; /* the cluster fails in weird and wonderfully obscure ways when this is not true */ AIS_ASSERT(ais_str_eq(lookup, crm_children[dest].name)); + if(msg->header.id == service_id) { + msg->header.id = 0; /* reset this back to zero for IPC messages */ + + } else if(msg->header.id != 0) { + ais_err("reset header id back to zero from %d", msg->header.id); + msg->header.id = 0; /* reset this back to zero for IPC messages */ + } + rc = send_client_ipc(conn, msg); } else if(local_origin) { /* forward to other hosts */ ais_debug_3("Forwarding to cluster"); reason = "cluster delivery failed"; rc = send_cluster_msg_raw(msg); - - } else { - ais_debug_3("Ignoring..."); } if(rc != 0) { ais_warn("Sending message to %s.%s failed: %s (rc=%d)", ais_dest(&(msg->host)), msg_type2text(dest), reason, rc); log_ais_message(LOG_DEBUG, msg); return FALSE; } return TRUE; } int send_cluster_msg_raw(AIS_Message *ais_msg) { int rc = 0; struct iovec iovec; static uint32_t msg_id = 0; AIS_Message *bz2_msg = NULL; AIS_ASSERT(local_nodeid != 0); if(ais_msg->header.size != (sizeof(AIS_Message) + ais_data_len(ais_msg))) { ais_err("Repairing size mismatch: %u + %d = %d", (unsigned int)sizeof(AIS_Message), ais_data_len(ais_msg), ais_msg->header.size); ais_msg->header.size = sizeof(AIS_Message) + ais_data_len(ais_msg); } if(ais_msg->id == 0) { msg_id++; AIS_CHECK(msg_id != 0 /* detect wrap-around */, msg_id++; ais_err("Message ID wrapped around")); ais_msg->id = msg_id; } ais_msg->header.error = SA_AIS_OK; ais_msg->header.id = SERVICE_ID_MAKE(CRM_SERVICE, 0); ais_msg->sender.id = local_nodeid; ais_msg->sender.size = local_uname_len; memset(ais_msg->sender.uname, 0, MAX_NAME); memcpy(ais_msg->sender.uname, local_uname, ais_msg->sender.size); iovec.iov_base = (char *)ais_msg; iovec.iov_len = ais_msg->header.size; ais_debug_3("Sending message (size=%u)", (unsigned int)iovec.iov_len); rc = totempg_groups_mcast_joined ( openais_group_handle, &iovec, 1, TOTEMPG_SAFE); if(rc == 0 && ais_msg->is_compressed == FALSE) { ais_debug_2("Message sent: %.80s", ais_msg->data); } AIS_CHECK(rc == 0, ais_err("Message not sent (%d): %.120s", rc, ais_msg->data)); ais_free(bz2_msg); return rc; } #define min(x,y) (x)<(y)?(x):(y) void send_cluster_id(void) { int rc = 0; int lpc = 0; int len = 0; struct iovec iovec; struct crm_identify_msg_s *msg = NULL; static uint64_t local_born_on = 0; AIS_ASSERT(local_nodeid != 0); if(local_born_on == 0 && have_reliable_membership_id) { local_born_on = membership_seq; } ais_malloc0(msg, sizeof(struct crm_identify_msg_s)); msg->header.size = sizeof(struct crm_identify_msg_s); msg->id = local_nodeid; /* msg->header.error = SA_AIS_OK; */ msg->header.id = SERVICE_ID_MAKE(CRM_SERVICE, 1); len = min(local_uname_len, MAX_NAME-1); memset(msg->uname, 0, MAX_NAME); memcpy(msg->uname, local_uname, len); len = min(strlen(VERSION), MAX_NAME-1); memset(msg->version, 0, MAX_NAME); memcpy(msg->version, VERSION, len); msg->votes = 1; msg->pid = getpid(); msg->processes = crm_proc_ais; msg->born_on = local_born_on; for (lpc = 0; lpc < SIZEOF(crm_children); lpc++) { if(crm_children[lpc].pid != 0) { msg->processes |= crm_children[lpc].flag; } } ais_debug("Local update: id=%u, born="U64T", seq="U64T"", local_nodeid, local_born_on, membership_seq); update_member( local_nodeid, local_born_on, membership_seq, msg->votes, msg->processes, NULL, NULL, VERSION); iovec.iov_base = (char *)msg; iovec.iov_len = msg->header.size; rc = totempg_groups_mcast_joined ( openais_group_handle, &iovec, 1, TOTEMPG_SAFE); AIS_CHECK(rc == 0, ais_err("Message not sent (%d)", rc)); ais_free(msg); } static gboolean ghash_send_removal(gpointer key, gpointer value, gpointer data) { - send_quorum_details(value, FALSE); + send_quorum_details(value); if(send_client_msg(value, crm_class_rmpeer, crm_msg_none, data) != 0) { /* remove it */ return TRUE; } return FALSE; } static void ais_remove_peer(char *node_id) { uint32_t id = ais_get_int(node_id, NULL); crm_node_t *node = g_hash_table_lookup(membership_list, GUINT_TO_POINTER(id)); if(node == NULL) { ais_info("Peer %u is unknown", id); } else if(ais_str_eq(CRM_NODE_MEMBER, node->state)) { ais_warn("Peer %u/%s is still active", id, node->uname); } else if(g_hash_table_remove(membership_list, GUINT_TO_POINTER(id))) { plugin_expected_votes--; ais_notice("Removed dead peer %u from the membership list", id); ais_info("Sending removal of %u to %d children", id, g_hash_table_size(membership_notify_list)); g_hash_table_foreach_remove(membership_notify_list, ghash_send_removal, node_id); } else { ais_warn("Peer %u/%s was not removed", id, node->uname); } } gboolean process_ais_message(AIS_Message *msg) { int len = ais_data_len(msg); char *data = get_ais_data(msg); do_ais_log(LOG_DEBUG, "Msg[%d] (dest=%s:%s, from=%s:%s.%d, remote=%s, size=%d): %.90s", msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->sender.uname==local_uname?"false":"true", ais_data_len(msg), data); if(data && len > 12 && strncmp("remove-peer:", data, 12) == 0) { char *node = data+12; ais_remove_peer(node); } ais_free(data); return TRUE; } diff --git a/lib/common/ais.c b/lib/common/ais.c index 292d31c6b2..e6611fe52e 100644 --- a/lib/common/ais.c +++ b/lib/common/ais.c @@ -1,770 +1,775 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include "stack.h" #include #include enum crm_ais_msg_types text2msg_type(const char *text) { int type = crm_msg_none; CRM_CHECK(text != NULL, return type); if(safe_str_eq(text, "ais")) { type = crm_msg_ais; } else if(safe_str_eq(text, "crm_plugin")) { type = crm_msg_ais; } else if(safe_str_eq(text, CRM_SYSTEM_CIB)) { type = crm_msg_cib; } else if(safe_str_eq(text, CRM_SYSTEM_CRMD)) { type = crm_msg_crmd; } else if(safe_str_eq(text, CRM_SYSTEM_DC)) { type = crm_msg_crmd; } else if(safe_str_eq(text, CRM_SYSTEM_TENGINE)) { type = crm_msg_te; } else if(safe_str_eq(text, CRM_SYSTEM_PENGINE)) { type = crm_msg_pe; } else if(safe_str_eq(text, CRM_SYSTEM_LRMD)) { type = crm_msg_lrmd; } else if(safe_str_eq(text, CRM_SYSTEM_STONITHD)) { type = crm_msg_stonithd; } else if(safe_str_eq(text, "attrd")) { type = crm_msg_attrd; } else { crm_debug_2("Unknown message type: %s", text); } return type; } char *get_ais_data(AIS_Message *msg) { int rc = BZ_OK; char *uncompressed = NULL; unsigned int new_size = msg->size + 1; if(msg->is_compressed == FALSE) { crm_debug_2("Returning uncompressed message data"); uncompressed = strdup(msg->data); } else { crm_debug_2("Decompressing message data"); crm_malloc0(uncompressed, new_size); rc = BZ2_bzBuffToBuffDecompress( uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0); CRM_ASSERT(rc = BZ_OK); CRM_ASSERT(new_size == msg->size); } return uncompressed; } #if SUPPORT_AIS int ais_fd_sync = -1; int ais_fd_async = -1; /* never send messages via this channel */ void *ais_ipc_ctx = NULL; GFDSource *ais_source = NULL; GFDSource *ais_source_sync = NULL; gboolean get_ais_nodeid(uint32_t *id, char **uname) { struct iovec iov; int retries = 0; int rc = SA_AIS_OK; mar_res_header_t header; struct crm_ais_nodeid_resp_s answer; header.error = SA_AIS_OK; header.id = crm_class_nodeid; header.size = sizeof(mar_res_header_t); CRM_CHECK(id != NULL, return FALSE); CRM_CHECK(uname != NULL, return FALSE); iov.iov_base = &header; iov.iov_len = header.size; retry: errno = 0; #ifdef TRADITIONAL_AIS_IPC rc = saSendReceiveReply(ais_fd_sync, &header, header.size, &answer, sizeof (struct crm_ais_nodeid_resp_s)); #else rc = openais_msg_send_reply_receive( ais_ipc_ctx, &iov, 1, &answer, sizeof (answer)); #endif if(rc == SA_AIS_OK) { CRM_CHECK(answer.header.size == sizeof (struct crm_ais_nodeid_resp_s), crm_err("Odd message: id=%d, size=%d, error=%d", answer.header.id, answer.header.size, answer.header.error)); CRM_CHECK(answer.header.id == crm_class_nodeid, crm_err("Bad response id: %d", answer.header.id)); } if(rc == SA_AIS_ERR_TRY_AGAIN && retries < 20) { retries++; crm_info("Peer overloaded: Re-sending message (Attempt %d of 20)", retries); mssleep(retries * 100); /* Proportional back off */ goto retry; } if(rc != SA_AIS_OK) { crm_err("Sending nodeid request: FAILED (rc=%d): %s", rc, ais_error2text(rc)); return FALSE; } else if(answer.header.error != SA_AIS_OK) { crm_err("Bad response from peer: (rc=%d): %s", rc, ais_error2text(rc)); return FALSE; } crm_info("Server details: id=%u uname=%s", answer.id, answer.uname); *id = answer.id; *uname = crm_strdup(answer.uname); return TRUE; } gboolean send_ais_text(int class, const char *data, gboolean local, const char *node, enum crm_ais_msg_types dest) { static int msg_id = 0; static int local_pid = 0; int retries = 0; int rc = SA_AIS_OK; int buf_len = sizeof(mar_res_header_t); char *buf = NULL; struct iovec iov; mar_res_header_t *header; AIS_Message *ais_msg = NULL; enum crm_ais_msg_types sender = text2msg_type(crm_system_name); /* There are only 6 handlers registered to crm_lib_service in plugin.c */ CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class); return FALSE); + if(data == NULL) { + data = ""; + } + if(local_pid == 0) { local_pid = getpid(); } crm_malloc0(ais_msg, sizeof(AIS_Message)); ais_msg->id = msg_id++; ais_msg->header.id = class; ais_msg->header.error = SA_AIS_OK; ais_msg->host.type = dest; ais_msg->host.local = local; if(node) { ais_msg->host.size = strlen(node); memset(ais_msg->host.uname, 0, MAX_NAME); memcpy(ais_msg->host.uname, node, ais_msg->host.size); ais_msg->host.id = 0; } else { ais_msg->host.size = 0; memset(ais_msg->host.uname, 0, MAX_NAME); ais_msg->host.id = 0; } ais_msg->sender.type = sender; ais_msg->sender.pid = local_pid; ais_msg->sender.size = 0; memset(ais_msg->sender.uname, 0, MAX_NAME); ais_msg->sender.id = 0; - if(data) { - ais_msg->size = 1 + strlen(data); - } + ais_msg->size = 1 + strlen(data); - if(data == NULL) { - /* nothing to do */ - } else if(ais_msg->size < CRM_BZ2_THRESHOLD) { + if(ais_msg->size < CRM_BZ2_THRESHOLD) { failback: crm_realloc(ais_msg, sizeof(AIS_Message) + ais_msg->size); memcpy(ais_msg->data, data, ais_msg->size); } else { char *compressed = NULL; char *uncompressed = crm_strdup(data); unsigned int len = (ais_msg->size * 1.1) + 600; /* recomended size */ crm_debug_5("Compressing message payload"); crm_malloc(compressed, len); rc = BZ2_bzBuffToBuffCompress( compressed, &len, uncompressed, ais_msg->size, CRM_BZ2_BLOCKS, 0, CRM_BZ2_WORK); crm_free(uncompressed); if(rc != BZ_OK) { crm_err("Compression failed: %d", rc); crm_free(compressed); goto failback; } crm_realloc(ais_msg, sizeof(AIS_Message) + len + 1); memcpy(ais_msg->data, compressed, len); ais_msg->data[len] = 0; crm_free(compressed); ais_msg->is_compressed = TRUE; ais_msg->compressed_size = len; crm_debug_2("Compression details: %d -> %d", ais_msg->size, ais_data_len(ais_msg)); } ais_msg->header.size = sizeof(AIS_Message) + ais_data_len(ais_msg); crm_debug_3("Sending%s message %d to %s.%s (data=%d, total=%d)", ais_msg->is_compressed?" compressed":"", ais_msg->id, ais_dest(&(ais_msg->host)), msg_type2text(dest), ais_data_len(ais_msg), ais_msg->header.size); iov.iov_base = ais_msg; iov.iov_len = ais_msg->header.size; retry: errno = 0; crm_realloc(buf, buf_len); #ifdef TRADITIONAL_AIS_IPC rc = saSendReceiveReply(ais_fd_sync, ais_msg, ais_msg->header.size, buf, buf_len); #else rc = openais_msg_send_reply_receive(ais_ipc_ctx, &iov, 1, buf, buf_len); #endif header = (mar_res_header_t *)buf; if(rc == SA_AIS_ERR_TRY_AGAIN && retries < 20) { retries++; crm_info("Peer overloaded: Re-sending message (Attempt %d of 20)", retries); mssleep(retries * 100); /* Proportional back off */ goto retry; } else if(rc == SA_AIS_OK) { CRM_CHECK_AND_STORE(header->size == sizeof (mar_res_header_t), crm_err("Odd message: id=%d, size=%d, class=%d, error=%d", header->id, header->size, class, header->error)); if(buf_len < header->size) { crm_err("Increasing buffer length to %d and retrying", header->size); buf_len = header->size + 1; goto retry; } else if(header->id == crm_class_nodeid && header->size == sizeof (struct crm_ais_nodeid_resp_s)){ struct crm_ais_nodeid_resp_s *answer = (struct crm_ais_nodeid_resp_s *)header; crm_err("Server details: id=%u uname=%s counter=%u", answer->id, answer->uname, answer->counter); } else { CRM_CHECK_AND_STORE(header->id == CRM_MESSAGE_IPC_ACK, crm_err("Bad response id (%d) for request (%d)", header->id, ais_msg->header.id)); CRM_CHECK(header->error == SA_AIS_OK, rc = header->error); } } if(rc != SA_AIS_OK) { crm_perror(LOG_ERR,"Sending message %d: FAILED (rc=%d): %s", ais_msg->id, rc, ais_error2text(rc)); ais_fd_async = -1; } else { crm_debug_4("Message %d: sent", ais_msg->id); } crm_free(buf); crm_free(ais_msg); return (rc == SA_AIS_OK); } gboolean send_ais_message(xmlNode *msg, gboolean local, const char *node, enum crm_ais_msg_types dest) { gboolean rc = TRUE; char *data = NULL; if(ais_fd_async < 0 || ais_source == NULL) { crm_err("Not connected to AIS"); return FALSE; } data = dump_xml_unformatted(msg); rc = send_ais_text(0, data, local, node, dest); crm_free(data); return rc; } void terminate_ais_connection(void) { #ifndef TRADITIONAL_AIS_IPC if(ais_ipc_ctx) { openais_service_disconnect(ais_ipc_ctx); } #else if(ais_fd_sync > 0) { close(ais_fd_sync); } if(ais_fd_async > 0) { close(ais_fd_async); } #endif crm_notice("Disconnected from AIS"); /* G_main_del_fd(ais_source); */ /* G_main_del_fd(ais_source_sync); */ } int ais_membership_timer = 0; gboolean ais_membership_force = FALSE; static gboolean ais_membership_dampen(gpointer data) { crm_debug_2("Requesting cluster membership after stabilization delay"); send_ais_text(crm_class_members, __FUNCTION__, TRUE, NULL, crm_msg_ais); ais_membership_force = TRUE; ais_membership_timer = 0; return FALSE; /* never repeat automatically */ } gboolean ais_dispatch(int sender, gpointer user_data) { char *data = NULL; char *uncompressed = NULL; int rc = SA_AIS_OK; AIS_Message *msg = NULL; gboolean (*dispatch)(AIS_Message*,char*,int) = user_data; #ifdef TRADITIONAL_AIS_IPC mar_res_header_t *header = NULL; static int header_len = sizeof(mar_res_header_t); crm_malloc0(header, header_len); errno = 0; rc = saRecvRetry(sender, header, header_len); if (rc != SA_AIS_OK) { crm_perror(LOG_ERR, "Receiving message header failed: (%d/%d) %s", rc, errno, ais_error2text(rc)); goto bail; } else if(header->size == header_len) { crm_err("Empty message: id=%d, size=%d, error=%d, header_len=%d", header->id, header->size, header->error, header_len); goto done; } else if(header->size == 0 || header->size < header_len) { crm_err("Mangled header: size=%d, header=%d, error=%d", header->size, header_len, header->error); goto done; } else if(header->error != SA_AIS_OK) { crm_err("Header contined error: %d", header->error); } crm_debug_2("Looking for %d (%d - %d) more bytes", header->size - header_len, header->size, header_len); crm_realloc(header, header->size); /* Use a char* so we can store the remainder into an offset */ data = (char*)header; errno = 0; rc = saRecvRetry(sender, data+header_len, header->size - header_len); #else crm_malloc0(data, 1000000); rc = openais_dispatch_recv (ais_ipc_ctx, data, 0); #endif if (rc != SA_AIS_OK) { crm_perror(LOG_ERR,"Receiving message body failed: (%d) %s", rc, ais_error2text(rc)); goto bail; } msg = (AIS_Message*)data; - - if(msg->id == crm_class_quorum) { - struct crm_ais_quorum_resp_s *resp = (struct crm_ais_quorum_resp_s *)data; - crm_debug("known: %u, available: %u: %s", - resp->expected_votes, resp->votes, resp->quorate?"true":"false"); - - if(resp->quorate != crm_have_quorum) { - crm_notice("Membership "U64T": quorum %s", - resp->id, resp->quorate?"attained":"lost"); - - } else { - crm_debug_2("Membership "U64T": quorum %s", - resp->id, resp->quorate?"retained":"lost"); - } - - crm_have_quorum = resp->quorate; - goto dispatch; - } - crm_debug_3("Got new%s message (size=%d, %d, %d)", msg->is_compressed?" compressed":"", ais_data_len(msg), msg->size, msg->compressed_size); data = msg->data; if(msg->is_compressed && msg->size > 0) { int rc = BZ_OK; unsigned int new_size = msg->size + 1; if(check_message_sanity(msg, NULL) == FALSE) { goto badmsg; } crm_debug_5("Decompressing message data"); crm_malloc0(uncompressed, new_size); rc = BZ2_bzBuffToBuffDecompress( uncompressed, &new_size, data, msg->compressed_size, 1, 0); if(rc != BZ_OK) { crm_err("Decompression failed: %d", rc); goto badmsg; } CRM_ASSERT(rc == BZ_OK); CRM_ASSERT(new_size == msg->size); data = uncompressed; } else if(check_message_sanity(msg, data) == FALSE) { goto badmsg; } else if(safe_str_eq("identify", data)) { int pid = getpid(); char *pid_s = crm_itoa(pid); send_ais_text(0, pid_s, TRUE, NULL, crm_msg_ais); crm_free(pid_s); goto done; } + if(msg->header.id != crm_class_members) { + crm_update_peer(msg->sender.id, 0,0,0,0, msg->sender.uname, msg->sender.uname, NULL, NULL); + } + if(msg->header.id == crm_class_rmpeer) { uint32_t id = crm_int_helper(data, NULL); crm_info("Removing peer %s/%u", data, id); reap_crm_member(id); goto done; - } - - if(msg->header.id == crm_class_members) { - xmlNode *xml = string2xml(data); - - if(xml != NULL) { - gboolean do_ask = FALSE; - gboolean do_process = TRUE; - - int new_size = 0; - unsigned long long seq = 0; - int current_size = crm_active_members(); - const char *reason = "unknown"; - const char *value = crm_element_value(xml, "id"); + } else if(msg->header.id == crm_class_quorum) { + xmlNode *xml = string2xml(data); + const char *value = NULL; + gboolean quorate = FALSE; + + if(xml == NULL) { + crm_err("Invalid quorate update: %s", data); + goto badmsg; + } - seq = crm_int_helper(value, NULL); - crm_debug_2("Received membership %llu", seq); + value = crm_element_value(xml, "quorate"); + if(crm_is_true(value)) { + quorate = TRUE; + } + + value = crm_element_value(xml, "id"); + if(quorate != crm_have_quorum) { + crm_notice("Membership %s: quorum %s", value, quorate?"attained":"lost"); - xml_child_iter(xml, node, - const char *state = crm_element_value(node, "state"); - if(safe_str_eq(state, CRM_NODE_MEMBER)) { - new_size++; - } - ); + } else { + crm_debug_2("Membership %s: quorum %s", value, quorate?"retained":"lost"); + } + + crm_have_quorum = quorate; + free_xml(xml); - if(ais_membership_force) { - /* always process */ - crm_debug_2("Processing delayed membership change"); + } else if(msg->header.id == crm_class_members) { + gboolean do_ask = FALSE; + gboolean do_process = TRUE; + + int new_size = 0; + unsigned long long seq = 0; + int current_size = crm_active_members(); + + const char *reason = "unknown"; + const char *value = NULL; + xmlNode *xml = string2xml(data); + if(xml == NULL) { + crm_err("Invalid peer update: %s", data); + goto badmsg; + } + + value = crm_element_value(xml, "id"); + seq = crm_int_helper(value, NULL); + crm_debug_2("Received membership %llu", seq); + + xml_child_iter(xml, node, + const char *state = crm_element_value(node, "state"); + if(safe_str_eq(state, CRM_NODE_MEMBER)) { + new_size++; + } + ); + + if(ais_membership_force) { + /* always process */ + crm_debug_2("Processing delayed membership change"); + #if 0 - } else if(current_size == 0 && new_size == 1) { - do_ask = TRUE; - do_process = FALSE; - reason = "We've come up alone"; + } else if(current_size == 0 && new_size == 1) { + do_ask = TRUE; + do_process = FALSE; + reason = "We've come up alone"; #endif - - } else if(new_size < (current_size/2)) { - do_process = FALSE; + + } else if(new_size < (current_size/2)) { + do_process = FALSE; + reason = "We've lost more than half our peers"; + + if(ais_membership_timer == 0) { reason = "We've lost more than half our peers"; - - if(ais_membership_timer == 0) { - reason = "We've lost more than half our peers"; - crm_log_xml_debug(xml, __PRETTY_FUNCTION__); - do_ask = TRUE; - } + crm_log_xml_debug(xml, __PRETTY_FUNCTION__); + do_ask = TRUE; + } + } + + if(do_process) { + static long long last = 0; + /* if there is a timer running - let it run + * there is no harm in getting an extra membership message + */ + crm_peer_seq = seq; + + /* Skip resends */ + if(last < seq) { + crm_info("Processing membership %llu", seq); } - if(do_process) { - static long long last = 0; - /* if there is a timer running - let it run - * there is no harm in getting an extra membership message - */ - crm_peer_seq = seq; - - /* Skip resends */ - if(last < seq) { - crm_info("Processing membership %llu", seq); - } - /* crm_log_xml_debug(xml, __PRETTY_FUNCTION__); */ - if(ais_membership_force) { - ais_membership_force = FALSE; - } - - xml_child_iter(xml, node, crm_update_ais_node(node, seq)); - last = seq; - - } else if(do_ask) { - dispatch = NULL; - crm_warn("Pausing to allow membership stability (size %d -> %d): %s", - current_size, new_size, reason); - ais_membership_timer = Gmain_timeout_add(4*1000, ais_membership_dampen, NULL); - - /* process node additions */ - xml_child_iter(xml, node, - const char *state = crm_element_value(node, "state"); - if(crm_str_eq(state, CRM_NODE_MEMBER, FALSE)) { - crm_update_ais_node(node, seq); - } - ); - - } else { - dispatch = NULL; - crm_warn("Membership is still unstable (size %d -> %d): %s", - current_size, new_size, reason); + if(ais_membership_force) { + ais_membership_force = FALSE; } + xml_child_iter(xml, node, crm_update_ais_node(node, seq)); + last = seq; + + } else if(do_ask) { + dispatch = NULL; + crm_warn("Pausing to allow membership stability (size %d -> %d): %s", + current_size, new_size, reason); + ais_membership_timer = Gmain_timeout_add(4*1000, ais_membership_dampen, NULL); + + /* process node additions */ + xml_child_iter(xml, node, + const char *state = crm_element_value(node, "state"); + if(crm_str_eq(state, CRM_NODE_MEMBER, FALSE)) { + crm_update_ais_node(node, seq); + } + ); + } else { - crm_warn("Invalid peer update: %s", data); + dispatch = NULL; + crm_warn("Membership is still unstable (size %d -> %d): %s", + current_size, new_size, reason); } - + free_xml(xml); - - } else { - const char *uuid = msg->sender.uname; - crm_update_peer(msg->sender.id, 0,0,0,0, uuid, msg->sender.uname, NULL, NULL); } - dispatch: if(dispatch != NULL) { dispatch(msg, data, sender); } done: crm_free(uncompressed); crm_free(msg); return TRUE; badmsg: crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):" " min=%d, total=%d, size=%d, bz2_size=%d", msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, (int)sizeof(AIS_Message), msg->header.size, msg->size, msg->compressed_size); goto done; bail: crm_err("AIS connection failed"); return FALSE; } static void ais_destroy(gpointer user_data) { crm_err("AIS connection terminated"); ais_fd_sync = -1; exit(1); } gboolean init_ais_connection( gboolean (*dispatch)(AIS_Message*,char*,int), void (*destroy)(gpointer), char **our_uuid, char **our_uname, int *nodeid) { int pid = 0; int retries = 0; int rc = SA_AIS_OK; char *pid_s = NULL; struct utsname name; uint32_t local_nodeid = 0; char *local_uname = NULL; retry: crm_info("Creating connection to our AIS plugin"); #ifdef TRADITIONAL_AIS_IPC rc = saServiceConnect (&ais_fd_sync, &ais_fd_async, CRM_SERVICE); #else rc = openais_service_connect(CRM_SERVICE, &ais_ipc_ctx); if(ais_ipc_ctx) { ais_fd_async = openais_fd_get(ais_ipc_ctx); } else if(rc == SA_AIS_OK) { crm_err("No context created, but connection reported 'ok'"); rc = SA_AIS_ERR_LIBRARY; } #endif if (rc != SA_AIS_OK) { crm_info("Connection to our AIS plugin (%d) failed: %s (%d)", CRM_SERVICE, ais_error2text(rc), rc); } switch(rc) { case SA_AIS_OK: break; case SA_AIS_ERR_TRY_AGAIN: if(retries < 30) { sleep(1); retries++; goto retry; } crm_err("Retry count exceeded"); return FALSE; default: return FALSE; } if(destroy == NULL) { crm_debug("Using the default destroy handler"); destroy = ais_destroy; } crm_info("AIS connection established"); pid = getpid(); pid_s = crm_itoa(pid); send_ais_text(0, pid_s, TRUE, NULL, crm_msg_ais); crm_free(pid_s); crm_peer_init(); get_ais_nodeid(&local_nodeid, &local_uname); if(uname(&name) < 0) { crm_perror(LOG_ERR,"uname(2) call failed"); exit(100); } if(safe_str_neq(name.nodename, local_uname)) { crm_crit("Node name mismatch! OpenAIS supplied %s, our lookup returned %s", local_uname, name.nodename); crm_notice("Node name mismatches usually occur when assigned automatically by DHCP servers"); crm_notice("If this node was part of the cluster with a different name," " you will need to remove the old entry with crm_node --remove"); } if(our_uuid != NULL) { *our_uuid = crm_strdup(local_uname); } if(our_uname != NULL) { *our_uname = local_uname; } if(nodeid != NULL) { *nodeid = local_nodeid; } if(local_nodeid != 0) { /* Ensure the local node always exists */ crm_update_peer(local_nodeid, 0, 0, 0, 0, local_uname, local_uname, NULL, NULL); } if(dispatch) { ais_source = G_main_add_fd( G_PRIORITY_HIGH, ais_fd_async, FALSE, ais_dispatch, dispatch, destroy); } return TRUE; } gboolean check_message_sanity(AIS_Message *msg, char *data) { gboolean sane = TRUE; gboolean repaired = FALSE; int dest = msg->host.type; int tmp_size = msg->header.size - sizeof(AIS_Message); if(sane && msg->header.size == 0) { crm_warn("Message with no size"); sane = FALSE; } if(sane && msg->header.error != SA_AIS_OK) { crm_warn("Message header contains an error: %d", msg->header.error); sane = FALSE; } if(sane && ais_data_len(msg) != tmp_size) { int cur_size = ais_data_len(msg); repaired = TRUE; if(msg->is_compressed) { msg->compressed_size = tmp_size; } else { msg->size = tmp_size; } crm_warn("Repaired message payload size %d -> %d", cur_size, tmp_size); } if(sane && ais_data_len(msg) == 0) { crm_warn("Message with no payload"); sane = FALSE; } if(sane && data && msg->is_compressed == FALSE) { int str_size = strlen(data) + 1; if(ais_data_len(msg) != str_size) { int lpc = 0; crm_warn("Message payload is corrupted: expected %d bytes, got %d", ais_data_len(msg), str_size); sane = FALSE; for(lpc = (str_size - 10); lpc < msg->size; lpc++) { if(lpc < 0) { lpc = 0; } crm_debug("bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]); } } } if(sane == FALSE) { crm_err("Invalid message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } else if(repaired) { crm_err("Repaired message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } else { crm_debug_3("Verfied message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } return sane; } #endif