diff --git a/crm/ais/plugin.c b/crm/ais/plugin.c index 7c15df9c2d..a44c08f9e0 100644 --- a/crm/ais/plugin.c +++ b/crm/ais/plugin.c @@ -1,1138 +1,1162 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "plugin.h" #include "utils.h" #include #define OPENAIS_EXTERNAL_SERVICE insane_ais_header_hack_in__totem_h #include #include #include #include #include #include #include #include #include #include #include #include int plugin_log_level = LOG_DEBUG; char *local_uname = NULL; int local_uname_len = 0; unsigned int local_nodeid = 0; char *ipc_channel_name = NULL; unsigned long long membership_seq = 0; pthread_t crm_wait_thread; gboolean wait_active = TRUE; GHashTable *membership_list = NULL; #define MAX_RESPAWN 100 #define crm_flag_none 0x00000000 #define crm_flag_members 0x00000001 struct crm_identify_msg_s { mar_req_header_t header __attribute__((aligned(8))); uint32_t id; uint32_t pid; int32_t votes; uint32_t processes; char uname[256]; char version[256]; } __attribute__((packed)); static crm_child_t crm_children[] = { { 0, crm_proc_none, crm_flag_none, 0, FALSE, "none", 0, NULL, NULL }, { 0, crm_proc_ais, crm_flag_none, 0, FALSE, "ais", 0, NULL, NULL }, { 0, crm_proc_lrmd, crm_flag_none, 0, TRUE, "lrmd", 0, HA_LIBHBDIR"/lrmd", NULL }, { 0, crm_proc_cib, crm_flag_members, 0, TRUE, "cib", HA_CCMUID, HA_LIBHBDIR"/cib", NULL }, { 0, crm_proc_crmd, crm_flag_members, 0, TRUE, "crmd", HA_CCMUID, HA_LIBHBDIR"/crmd", NULL }, { 0, crm_proc_attrd,crm_flag_none, 0, TRUE, "attrd", HA_CCMUID, HA_LIBHBDIR"/attrd", NULL }, }; void send_cluster_id(void); int send_cluster_msg_raw(AIS_Message *ais_msg); char *ais_generate_membership_data(void); extern totempg_groups_handle openais_group_handle; void global_confchg_fn ( enum totem_configuration_type configuration_type, unsigned int *member_list, int member_list_entries, unsigned int *left_list, int left_list_entries, unsigned int *joined_list, int joined_list_entries, struct memb_ring_id *ring_id); int crm_exec_exit_fn (struct objdb_iface_ver0 *objdb); int crm_exec_init_fn (struct objdb_iface_ver0 *objdb); int crm_config_init_fn(struct objdb_iface_ver0 *objdb); int ais_ipc_client_connect_callback (void *conn); int ais_ipc_client_exit_callback (void *conn); void ais_cluster_message_swab(void *msg); void ais_cluster_message_callback(void *message, unsigned int nodeid); void ais_ipc_message_callback(void *conn, void *msg); void ais_quorum_query(void *conn, void *msg); void ais_node_list_query(void *conn, void *msg); void ais_manage_notification(void *conn, void *msg); void ais_cluster_id_swab(void *msg); void ais_cluster_id_callback(void *message, unsigned int nodeid); static struct openais_lib_handler crm_lib_service[] = { { /* 0 */ .lib_handler_fn = ais_ipc_message_callback, .response_size = sizeof (mar_res_header_t), .response_id = CRM_MESSAGE_IPC_ACK, .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED }, { /* 1 */ .lib_handler_fn = ais_node_list_query, .response_size = sizeof (mar_res_header_t), .response_id = CRM_MESSAGE_IPC_ACK, .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED }, { /* 2 */ .lib_handler_fn = ais_manage_notification, .response_size = sizeof (mar_res_header_t), .response_id = CRM_MESSAGE_IPC_ACK, .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED }, }; static struct openais_exec_handler crm_exec_service[] = { { /* 0 */ .exec_handler_fn = ais_cluster_message_callback, .exec_endian_convert_fn = ais_cluster_message_swab }, { /* 1 */ .exec_handler_fn = ais_cluster_id_callback, .exec_endian_convert_fn = ais_cluster_id_swab } }; static void crm_exec_dump_fn(void) { ENTER(""); ais_err("Called after SIG_USR2"); LEAVE(""); } /* * Exports the interface for the service */ struct openais_service_handler crm_service_handler = { .name = "LHA Cluster Manager", .id = CRM_SERVICE, .private_data_size = 0, .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED, .lib_init_fn = ais_ipc_client_connect_callback, .lib_exit_fn = ais_ipc_client_exit_callback, .lib_service = crm_lib_service, .lib_service_count = sizeof (crm_lib_service) / sizeof (struct openais_lib_handler), .exec_init_fn = crm_exec_init_fn, .exec_exit_fn = crm_exec_exit_fn, .exec_service = crm_exec_service, .exec_service_count = sizeof (crm_exec_service) / sizeof (struct openais_exec_handler), .config_init_fn = crm_config_init_fn, .confchg_fn = global_confchg_fn, .exec_dump_fn = crm_exec_dump_fn, /* void (*sync_init) (void); */ /* int (*sync_process) (void); */ /* void (*sync_activate) (void); */ /* void (*sync_abort) (void); */ }; /* * Dynamic Loader definition */ struct openais_service_handler *crm_get_handler_ver0 (void); static struct openais_service_handler_iface_ver0 crm_service_handler_iface = { .openais_get_service_handler_ver0 = crm_get_handler_ver0 }; static struct lcr_iface openais_crm_ver0[1] = { { .name = "lha_crm", .version = 0, .versions_replace = 0, .versions_replace_count = 0, .dependencies = 0, .dependency_count = 0, .constructor = NULL, .destructor = NULL, .interfaces = NULL } }; static struct lcr_comp crm_comp_ver0 = { .iface_count = 1, .ifaces = openais_crm_ver0 }; struct openais_service_handler *crm_get_handler_ver0 (void) { return (&crm_service_handler); } __attribute__ ((constructor)) static void register_this_component (void) { lcr_interfaces_set (&openais_crm_ver0[0], &crm_service_handler_iface); lcr_component_register (&crm_comp_ver0); } /* IMPL */ int crm_config_init_fn(struct objdb_iface_ver0 *objdb) { int rc = 0; struct utsname us; + char *value = NULL; + unsigned int object_service_handle; membership_list = g_hash_table_new_full( g_direct_hash, g_direct_equal, NULL, destroy_ais_node); - setenv("HA_debug", "1", 1); - setenv("HA_logfacility", "daemon", 1); - setenv("HA_cluster_type", "openais", 1); setenv("HA_COMPRESSION", "bz2", 1); + setenv("HA_cluster_type", "openais", 1); + + objdb->object_find_reset (OBJECT_PARENT_HANDLE); + + if (objdb->object_find ( + OBJECT_PARENT_HANDLE, "pacemaker", strlen ("pacemaker"), + &object_service_handle) != 0) { + object_service_handle = 0; + ais_info("No configuration suplpied for pacemaker"); + } - plugin_log_level = LOG_DEBUG; + objdb_get_string( + objdb, object_service_handle, "logfacility", &value, "daemon"); + setenv("HA_logfacility", value, 1); + + objdb_get_string(objdb, object_service_handle, "initdead", &value, "20"); + setenv("HA_initdead", value, 1); + + objdb_get_string(objdb, object_service_handle, "debug", &value, "1"); + setenv("HA_debug", value, 1); + + rc = atoi(value); + plugin_log_level = LOG_INFO+rc; + + if(system("echo 1 > /proc/sys/kernel/core_uses_pid") != 0) { + ais_perror("Could not enable /proc/sys/kernel/core_uses_pid"); + } ais_info("CRM: Initialized"); log_printf(LOG_INFO, "Logging: Initialized %s\n", __PRETTY_FUNCTION__); rc = uname(&us); AIS_ASSERT(rc == 0); local_uname = ais_strdup(us.nodename); local_uname_len = strlen(local_uname); ais_info("Local hostname: %s", local_uname); local_nodeid = totempg_my_nodeid_get(); update_member(local_nodeid, 0, 1, 0, local_uname, CRM_NODE_LOST); LEAVE(""); return 0; } static void *crm_wait_dispatch (void *arg) { struct timespec waitsleep = { .tv_sec = 0, .tv_nsec = 100000 /* 100 msec */ }; while(wait_active) { int lpc = 0; for (; lpc < SIZEOF(crm_children); lpc++) { if(crm_children[lpc].pid > 0) { int status; pid_t pid = wait4( crm_children[lpc].pid, &status, WNOHANG, NULL); if(pid == 0) { continue; } else if(pid < 0) { ais_perror("crm_wait_dispatch: Call to wait4(%s) failed", crm_children[lpc].name); continue; } /* cleanup */ crm_children[lpc].pid = 0; crm_children[lpc].conn = NULL; crm_children[lpc].async_conn = NULL; if(WIFSIGNALED(status)) { int sig = WTERMSIG(status); ais_warn("Child process %s terminated with signal %d" " (pid=%d, core=%s)", crm_children[lpc].name, sig, pid, WCOREDUMP(status)?"true":"false"); } else if (WIFEXITED(status)) { int rc = WEXITSTATUS(status); ais_notice("Child process %s exited (pid=%d, rc=%d)", crm_children[lpc].name, pid, rc); if(rc == 100) { ais_notice("Child process %s no longer wishes" " to be respawned", crm_children[lpc].name); crm_children[lpc].respawn = FALSE; } } crm_children[lpc].respawn_count += 1; if(crm_children[lpc].respawn_count > MAX_RESPAWN) { ais_notice("Child respawn count exceeded by %s", crm_children[lpc].name); crm_children[lpc].respawn = FALSE; } if(crm_children[lpc].respawn) { ais_info("Respawning failed child process: %s", crm_children[lpc].name); spawn_child(&(crm_children[lpc])); } else { send_cluster_id(); } } } sched_yield (); nanosleep (&waitsleep, 0); } return 0; } int crm_exec_init_fn (struct objdb_iface_ver0 *objdb) { int lpc = 0; ENTER(""); pthread_create (&crm_wait_thread, NULL, crm_wait_dispatch, NULL); for (; lpc < SIZEOF(crm_children); lpc++) { spawn_child(&(crm_children[lpc])); } ais_info("CRM: Initialized"); LEAVE(""); return 0; } /* static void ais_print_node(const char *prefix, struct totem_ip_address *host) { int len = 0; char *buffer = NULL; ais_malloc0(buffer, INET6_ADDRSTRLEN+1); inet_ntop(host->family, host->addr, buffer, INET6_ADDRSTRLEN); len = strlen(buffer); ais_info("%s: %.*s", prefix, len, buffer); ais_free(buffer); } */ #if 0 /* copied here for reference from exec/totempg.c */ char *totempg_ifaces_print (unsigned int nodeid) { static char iface_string[256 * INTERFACE_MAX]; char one_iface[64]; struct totem_ip_address interfaces[INTERFACE_MAX]; char **status; unsigned int iface_count; unsigned int i; int res; iface_string[0] = '\0'; res = totempg_ifaces_get (nodeid, interfaces, &status, &iface_count); if (res == -1) { return ("no interface found for nodeid"); } for (i = 0; i < iface_count; i++) { sprintf (one_iface, "r(%d) ip(%s) ", i, totemip_print (&interfaces[i])); strcat (iface_string, one_iface); } return (iface_string); } #endif static void ais_mark_unseen_peer_dead( gpointer key, gpointer value, gpointer user_data) { int *changed = user_data; crm_node_t *node = value; if(node->last_seen != membership_seq && ais_str_eq(CRM_NODE_LOST, node->state) == FALSE) { ais_info("Node %s was not seen in the previous transition", node->uname); *changed += update_member(node->id, membership_seq, node->votes, node->processes, node->uname, CRM_NODE_LOST); ais_info("Node %s marked dead", node->uname); } } void global_confchg_fn ( enum totem_configuration_type configuration_type, unsigned int *member_list, int member_list_entries, unsigned int *left_list, int left_list_entries, unsigned int *joined_list, int joined_list_entries, struct memb_ring_id *ring_id) { int lpc = 0; int changed = 0; int do_update = 0; ENTER(""); AIS_ASSERT(ring_id != NULL); switch(configuration_type) { case TOTEM_CONFIGURATION_REGULAR: do_update = 1; break; case TOTEM_CONFIGURATION_TRANSITIONAL: break; } membership_seq = ring_id->seq; ais_notice("%s membership event on ring %lld: memb=%d, new=%d, lost=%d", do_update?"Stable":"Transitional", ring_id->seq, member_list_entries, joined_list_entries, left_list_entries); if(do_update == 0) { for(lpc = 0; lpc < joined_list_entries; lpc++) { const char *prefix = "new: "; uint32_t nodeid = joined_list[lpc]; ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid); } for(lpc = 0; lpc < member_list_entries; lpc++) { const char *prefix = "memb:"; uint32_t nodeid = member_list[lpc]; ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid); } for(lpc = 0; lpc < left_list_entries; lpc++) { const char *prefix = "lost:"; uint32_t nodeid = left_list[lpc]; ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid); } return; } for(lpc = 0; lpc < joined_list_entries; lpc++) { const char *prefix = "NEW: "; uint32_t nodeid = joined_list[lpc]; crm_node_t *node = NULL; changed += update_member( nodeid, membership_seq, -1, 0, NULL, CRM_NODE_MEMBER); ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid); node = g_hash_table_lookup(membership_list, GUINT_TO_POINTER(nodeid)); if(node->addr == NULL) { const char *addr = totempg_ifaces_print(nodeid); node->addr = ais_strdup(addr); ais_debug("Node %u has address %s", nodeid, node->addr); } } for(lpc = 0; lpc < member_list_entries; lpc++) { const char *prefix = "MEMB:"; uint32_t nodeid = member_list[lpc]; changed += update_member( nodeid, membership_seq, -1, 0, NULL, CRM_NODE_MEMBER); ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid); } for(lpc = 0; lpc < left_list_entries; lpc++) { const char *prefix = "LOST:"; uint32_t nodeid = left_list[lpc]; changed += update_member( nodeid, membership_seq, -1, 0, NULL, CRM_NODE_LOST); ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid); } if(do_update) { ais_debug_2("Reaping unseen nodes..."); g_hash_table_foreach( membership_list, ais_mark_unseen_peer_dead, &changed); } if(changed) { ais_debug("%d nodes changed", changed); send_member_notification(); } send_cluster_id(); LEAVE(""); } int ais_ipc_client_exit_callback (void *conn) { int lpc = 0; const char *client = NULL; ENTER("Client=%p", conn); for (; lpc < SIZEOF(crm_children); lpc++) { if(crm_children[lpc].conn == conn) { crm_children[lpc].conn = NULL; crm_children[lpc].async_conn = NULL; client = crm_children[lpc].name; break; } } ais_info("Client %p/%s left", conn, client?client:"unknown-transient"); LEAVE(""); return (0); } int ais_ipc_client_connect_callback (void *conn) { void *async_conn = openais_conn_partner_get(conn); ENTER("Client=%p", conn); ais_debug("Client %p/%p joined", conn, async_conn); if(async_conn) { send_client_msg(async_conn, crm_class_cluster, crm_msg_none, "identify"); } else { ais_err("No async connection"); } LEAVE(""); return (0); } /* * Executive message handlers */ void ais_cluster_message_swab(void *msg) { AIS_Message *ais_msg = msg; ENTER(""); ais_debug_3("Performing endian conversion..."); ais_msg->id = swab32 (ais_msg->id); ais_msg->is_compressed = swab32 (ais_msg->is_compressed); ais_msg->compressed_size = swab32 (ais_msg->compressed_size); ais_msg->size = swab32 (ais_msg->size); ais_msg->host.id = swab32 (ais_msg->host.id); ais_msg->host.pid = swab32 (ais_msg->host.pid); ais_msg->host.type = swab32 (ais_msg->host.type); ais_msg->host.size = swab32 (ais_msg->host.size); ais_msg->host.local = swab32 (ais_msg->host.local); ais_msg->sender.id = swab32 (ais_msg->sender.id); ais_msg->sender.pid = swab32 (ais_msg->sender.pid); ais_msg->sender.type = swab32 (ais_msg->sender.type); ais_msg->sender.size = swab32 (ais_msg->sender.size); ais_msg->sender.local = swab32 (ais_msg->sender.local); LEAVE(""); } void ais_cluster_message_callback ( void *message, unsigned int nodeid) { AIS_Message *ais_msg = message; ENTER("Node=%u (%s)", nodeid, nodeid==local_nodeid?"local":"remote"); ais_debug_2("Message from node %u (%s)", nodeid, nodeid==local_nodeid?"local":"remote"); /* Shouldn't be required... update_member( ais_msg->sender.id, membership_seq, -1, 0, ais_msg->sender.uname, NULL); */ if(ais_msg->host.size == 0 || ais_str_eq(ais_msg->host.uname, local_uname)) { route_ais_message(ais_msg, FALSE); } else { ais_debug_3("Discarding Msg[%d] (dest=%s:%s, from=%s:%s)", ais_msg->id, ais_dest(&(ais_msg->host)), msg_type2text(ais_msg->host.type), ais_dest(&(ais_msg->sender)), msg_type2text(ais_msg->sender.type)); } LEAVE(""); } void ais_cluster_id_swab(void *msg) { struct crm_identify_msg_s *ais_msg = msg; ENTER(""); ais_debug_3("Performing endian conversion..."); ais_msg->id = swab32 (ais_msg->id); ais_msg->pid = swab32 (ais_msg->pid); ais_msg->votes = swab32 (ais_msg->votes); ais_msg->processes = swab32 (ais_msg->processes); LEAVE(""); } void ais_cluster_id_callback (void *message, unsigned int nodeid) { int changed = 0; struct crm_identify_msg_s *msg = message; if(nodeid != msg->id) { ais_err("Invalid message: Node %u claimed to be node %d", nodeid, msg->id); return; } ais_debug("Node update: %s (%s)", msg->uname, msg->version); changed = update_member( nodeid, membership_seq, msg->votes, msg->processes, msg->uname, NULL); if(changed) { send_member_notification(); } } struct res_overlay { mar_res_header_t header __attribute((aligned(8))); char buf[4096]; }; struct res_overlay *res_overlay = NULL; static void send_ipc_ack(void *conn, int class) { if(res_overlay == NULL) { ais_malloc0(res_overlay, sizeof(struct res_overlay)); } res_overlay->header.size = crm_lib_service[class].response_size; res_overlay->header.id = crm_lib_service[class].response_id; res_overlay->header.error = 0; openais_conn_send_response (conn, res_overlay, res_overlay->header.size); } /* local callbacks */ void ais_ipc_message_callback(void *conn, void *msg) { AIS_Message *ais_msg = msg; int type = ais_msg->sender.type; void *async_conn = openais_conn_partner_get(conn); ENTER("Client=%p", conn); ais_debug_2("Message from client %p", conn); if(type > 0 && ais_msg->host.local && crm_children[type].conn == NULL && ais_msg->host.type == crm_msg_ais && ais_msg->sender.pid == crm_children[type].pid && type < SIZEOF(crm_children)) { ais_info("Recorded connection %p for %s/%d", conn, crm_children[type].name, crm_children[type].pid); crm_children[type].conn = conn; crm_children[type].async_conn = async_conn; /* Make sure they have the latest membership */ if(crm_children[type].flags & crm_flag_members) { char *update = ais_generate_membership_data(); ais_info("Sending membership update %llu to %s", membership_seq, crm_children[type].name); send_client_msg(async_conn, crm_class_members, crm_msg_none,update); } } ais_msg->sender.id = local_nodeid; ais_msg->sender.size = local_uname_len; memset(ais_msg->sender.uname, 0, MAX_NAME); memcpy(ais_msg->sender.uname, local_uname, ais_msg->sender.size); route_ais_message(msg, TRUE); send_ipc_ack(conn, 0); LEAVE(""); } int crm_exec_exit_fn (struct objdb_iface_ver0 *objdb) { int lpc = 0; struct timespec waitsleep = { .tv_sec = 1, .tv_nsec = 0 }; ENTER(""); ais_notice("Begining shutdown"); in_shutdown = TRUE; wait_active = FALSE; /* stop the wait loop */ for (lpc = SIZEOF(crm_children) - 1; lpc > 0; lpc--) { crm_children[lpc].respawn = FALSE; stop_child(&(crm_children[lpc]), SIGTERM); while(crm_children[lpc].command && crm_children[lpc].pid) { int status; pid_t pid = 0; pid = wait4( crm_children[lpc].pid, &status, WNOHANG, NULL); if(pid == 0) { sched_yield (); nanosleep (&waitsleep, 0); continue; } else if(pid < 0) { ais_perror("crm_wait_dispatch: Call to wait4(%s) failed", crm_children[lpc].name); } ais_notice("%s (pid=%d) confirmed dead", crm_children[lpc].name, crm_children[lpc].pid); /* cleanup */ crm_children[lpc].pid = 0; crm_children[lpc].conn = NULL; crm_children[lpc].async_conn = NULL; break; } } send_cluster_id(); ais_notice("Shutdown complete"); LEAVE(""); logsys_flush (); return 0; } struct member_loop_data { char *string; }; void member_loop_fn(gpointer key, gpointer value, gpointer user_data) { crm_node_t *node = value; struct member_loop_data *data = user_data; ais_debug_2("Dumping node %u", node->id); data->string = append_member(data->string, node); } char *ais_generate_membership_data(void) { int size = 0; struct member_loop_data data; size = 14 + 32; /* + int */ ais_malloc0(data.string, size); sprintf(data.string, "", membership_seq); g_hash_table_foreach(membership_list, member_loop_fn, &data); size = strlen(data.string); data.string = realloc(data.string, size + 9) ;/* 9 = + nul */ sprintf(data.string + size, ""); return data.string; } void ais_node_list_query(void *conn, void *msg) { char *data = ais_generate_membership_data(); void *async_conn = openais_conn_partner_get(conn); ais_debug_4("members: %s", data); if(async_conn) { send_client_msg(async_conn, crm_class_members, crm_msg_none, data); } ais_free(data); send_ipc_ack(conn, 1); } void ais_manage_notification(void *conn, void *msg) { int lpc = 0; int enable = 0; AIS_Message *ais_msg = msg; char *data = get_ais_data(ais_msg); if(ais_str_eq("true", data)) { enable = 1; } for (; lpc < SIZEOF(crm_children); lpc++) { if(crm_children[lpc].conn == conn) { ais_info("%s node notifications for %s", enable?"Enabling":"Disabling", crm_children[lpc].name); if(enable) { crm_children[lpc].flags |= crm_flag_members; } else { crm_children[lpc].flags |= crm_flag_members; crm_children[lpc].flags ^= crm_flag_members; } break; } } send_ipc_ack(conn, 2); } void send_member_notification(void) { int lpc = 0; char *update = ais_generate_membership_data(); for (; lpc < SIZEOF(crm_children); lpc++) { if(crm_children[lpc].flags & crm_flag_members) { if(crm_children[lpc].async_conn == NULL) { continue; } ais_info("Sending membership update %llu to %s", membership_seq, crm_children[lpc].name); send_client_msg(crm_children[lpc].async_conn, crm_class_members, crm_msg_none, update); } } ais_free(update); } static gboolean check_message_sanity(AIS_Message *msg, char *data) { gboolean sane = TRUE; gboolean repaired = FALSE; int dest = msg->host.type; int tmp_size = msg->header.size - sizeof(AIS_Message); if(sane && msg->header.size == 0) { ais_err("Message with no size"); sane = FALSE; } if(sane && msg->header.error != 0) { ais_err("Message header contains an error: %d", msg->header.error); sane = FALSE; } if(sane && ais_data_len(msg) != tmp_size) { int cur_size = ais_data_len(msg); repaired = TRUE; if(msg->is_compressed) { msg->compressed_size = tmp_size; } else { msg->size = tmp_size; } ais_err("Repaired message payload size %d -> %d", cur_size, tmp_size); } if(sane && ais_data_len(msg) == 0) { ais_err("Message with no payload"); sane = FALSE; } if(sane && data && msg->is_compressed == FALSE) { int str_size = strlen(data) + 1; if(ais_data_len(msg) != str_size) { int lpc = 0; ais_err("Message payload is corrupted: expected %d bytes, got %d", ais_data_len(msg), str_size); sane = FALSE; for(lpc = (str_size - 10); lpc < msg->size; lpc++) { if(lpc < 0) { lpc = 0; } ais_warn("bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]); } } } if(sane == FALSE) { ais_err("Invalid message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } else if(repaired) { ais_err("Repaired message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } else { ais_debug("Verified message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } return sane; } gboolean route_ais_message(AIS_Message *msg, gboolean local_origin) { int rc = 0; int level = LOG_WARNING; int dest = msg->host.type; ais_debug_3("Msg[%d] (dest=%s:%s, from=%s:%s.%d, remote=%s, size=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, local_origin?"false":"true", ais_data_len(msg)); if(local_origin == FALSE) { if(msg->host.size == 0 || ais_str_eq(local_uname, msg->host.uname)) { msg->host.local = TRUE; } } if(check_message_sanity(msg, msg->data) == FALSE) { /* Dont send this message to anyone */ return FALSE; } if(msg->host.local) { void *conn = NULL; const char *lookup = NULL; if(dest == crm_msg_ais) { process_ais_message(msg); return TRUE; } else if(dest == crm_msg_lrmd) { /* lrmd messages are routed via the crm */ dest = crm_msg_crmd; } else if(dest == crm_msg_te) { /* te messages are routed via the crm - for now */ dest = crm_msg_crmd; } if(in_shutdown) { level = LOG_INFO; } AIS_CHECK(dest > 0 && dest < SIZEOF(crm_children), ais_err("Invalid destination: %d", dest); log_ais_message(LOG_ERR, msg); return FALSE; ); rc = 1; lookup = msg_type2text(dest); conn = crm_children[dest].async_conn; /* the cluster fails in weird and wonderfully obscure ways when this is not true */ AIS_ASSERT(ais_str_eq(lookup, crm_children[dest].name)); if (conn == NULL) { do_ais_log(level, "No connection to %s", crm_children[dest].name); } else if (!libais_connection_active(conn)) { do_ais_log(level, "Connection to %s is no longer active", crm_children[dest].name); crm_children[dest].async_conn = NULL; /* } else if ((queue->size - 1) == queue->used) { */ /* ais_err("Connection is throttled: %d", queue->size); */ } else { level = LOG_ERR; ais_debug_3("Delivering locally to %s (size=%d)", crm_children[dest].name, msg->header.size); rc = openais_conn_send_response(conn, msg, msg->header.size); } } else if(local_origin) { /* forward to other hosts */ ais_debug_3("Forwarding to cluster"); rc = send_cluster_msg_raw(msg); } else { ais_debug_3("Ignoring..."); } if(rc != 0) { do_ais_log(level, "Sending message to %s.%s failed (rc=%d)", ais_dest(&(msg->host)), msg_type2text(dest), rc); log_ais_message(level, msg); return FALSE; } return TRUE; } int send_cluster_msg_raw(AIS_Message *ais_msg) { int rc = 0; struct iovec iovec; static uint32_t msg_id = 0; AIS_Message *bz2_msg = NULL; ENTER(""); AIS_ASSERT(local_nodeid != 0); if(ais_msg->header.size != (sizeof(AIS_Message) + ais_data_len(ais_msg))) { ais_err("Repairing size mismatch: %u + %d = %d", (unsigned int)sizeof(AIS_Message), ais_data_len(ais_msg), ais_msg->header.size); ais_msg->header.size = sizeof(AIS_Message) + ais_data_len(ais_msg); } if(ais_msg->id == 0) { msg_id++; AIS_CHECK(msg_id != 0 /* detect wrap-around */, msg_id++; ais_err("Message ID wrapped around")); ais_msg->id = msg_id; } ais_msg->header.id = SERVICE_ID_MAKE(CRM_SERVICE, 0); ais_msg->sender.id = local_nodeid; ais_msg->sender.size = local_uname_len; memset(ais_msg->sender.uname, 0, MAX_NAME); memcpy(ais_msg->sender.uname, local_uname, ais_msg->sender.size); iovec.iov_base = (char *)ais_msg; iovec.iov_len = ais_msg->header.size; #if 0 if(ais_msg->is_compressed == FALSE && ais_msg->size > 1024) { char *compressed = NULL; unsigned int len = (ais_msg->size * 1.1) + 600; /* recomended size */ ais_debug_2("Creating compressed message"); ais_malloc0(compressed, len); rc = BZ2_bzBuffToBuffCompress( compressed, &len, ais_msg->data, ais_msg->size, 3, 0, 30); if(rc != BZ_OK) { ais_err("Compression failed: %d", rc); ais_free(compressed); goto send; } ais_malloc0(bz2_msg, sizeof(AIS_Message) + len + 1); memcpy(bz2_msg, ais_msg, sizeof(AIS_Message)); memcpy(bz2_msg->data, compressed, len); ais_free(compressed); bz2_msg->is_compressed = TRUE; bz2_msg->compressed_size = len; bz2_msg->header.size = sizeof(AIS_Message) + ais_data_len(bz2_msg); ais_debug("Compression details: %d -> %d", bz2_msg->size, ais_data_len(bz2_msg)); iovec.iov_base = (char *)bz2_msg; iovec.iov_len = bz2_msg->header.size; } send: #endif ais_debug_3("Sending message (size=%u)", (unsigned int)iovec.iov_len); rc = totempg_groups_mcast_joined ( openais_group_handle, &iovec, 1, TOTEMPG_SAFE); if(rc == 0 && ais_msg->is_compressed == FALSE) { ais_debug_2("Message sent: %.80s", ais_msg->data); } AIS_CHECK(rc == 0, ais_err("Message not sent (%d)", rc)); ais_free(bz2_msg); LEAVE(""); return rc; } #define min(x,y) (x)<(y)?(x):(y) void send_cluster_id(void) { int rc = 0; int lpc = 0; int len = 0; struct iovec iovec; struct crm_identify_msg_s *msg = NULL; ENTER(""); AIS_ASSERT(local_nodeid != 0); ais_malloc0(msg, sizeof(struct crm_identify_msg_s)); msg->header.size = sizeof(struct crm_identify_msg_s); msg->id = local_nodeid; msg->header.id = SERVICE_ID_MAKE(CRM_SERVICE, 1); len = min(local_uname_len, MAX_NAME-1); memset(msg->uname, 0, MAX_NAME); memcpy(msg->uname, local_uname, len); len = min(strlen(VERSION), MAX_NAME-1); memset(msg->version, 0, MAX_NAME); memcpy(msg->version, VERSION, len); msg->votes = 1; msg->pid = getpid(); msg->processes = crm_proc_ais; for (lpc = 0; lpc < SIZEOF(crm_children); lpc++) { if(crm_children[lpc].pid != 0) { msg->processes |= crm_children[lpc].flag; } } ais_debug("Local update: %u", local_nodeid); update_member( local_nodeid, membership_seq, msg->votes, msg->processes, NULL, NULL); iovec.iov_base = (char *)msg; iovec.iov_len = msg->header.size; rc = totempg_groups_mcast_joined ( openais_group_handle, &iovec, 1, TOTEMPG_SAFE); AIS_CHECK(rc == 0, ais_err("Message not sent (%d)", rc)); ais_free(msg); LEAVE(""); }