diff --git a/lib/common/ais.c b/lib/common/ais.c index e7f49c17ce..f67777bbb7 100644 --- a/lib/common/ais.c +++ b/lib/common/ais.c @@ -1,668 +1,664 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include "stack.h" #include #include enum crm_ais_msg_types text2msg_type(const char *text) { int type = crm_msg_none; CRM_CHECK(text != NULL, return type); if(safe_str_eq(text, "ais")) { type = crm_msg_ais; } else if(safe_str_eq(text, "crm_plugin")) { type = crm_msg_ais; } else if(safe_str_eq(text, CRM_SYSTEM_CIB)) { type = crm_msg_cib; } else if(safe_str_eq(text, CRM_SYSTEM_CRMD)) { type = crm_msg_crmd; } else if(safe_str_eq(text, CRM_SYSTEM_DC)) { type = crm_msg_crmd; } else if(safe_str_eq(text, CRM_SYSTEM_TENGINE)) { type = crm_msg_te; } else if(safe_str_eq(text, CRM_SYSTEM_PENGINE)) { type = crm_msg_pe; } else if(safe_str_eq(text, CRM_SYSTEM_LRMD)) { type = crm_msg_lrmd; } else if(safe_str_eq(text, CRM_SYSTEM_STONITHD)) { type = crm_msg_stonithd; } else if(safe_str_eq(text, "attrd")) { type = crm_msg_attrd; } else { crm_debug_2("Unknown message type: %s", text); } return type; } char *get_ais_data(AIS_Message *msg) { int rc = BZ_OK; char *uncompressed = NULL; unsigned int new_size = msg->size; if(msg->is_compressed == FALSE) { crm_debug_2("Returning uncompressed message data"); uncompressed = strdup(msg->data); } else { crm_debug_2("Decompressing message data"); crm_malloc0(uncompressed, new_size); rc = BZ2_bzBuffToBuffDecompress( uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0); CRM_ASSERT(rc = BZ_OK); CRM_ASSERT(new_size == msg->size); } return uncompressed; } #if SUPPORT_AIS int ais_fd_sync = -1; int ais_fd_async = -1; /* never send messages via this channel */ GFDSource *ais_source = NULL; GFDSource *ais_source_sync = NULL; int32_t get_ais_nodeid(void) { int retries = 0; int rc = SA_AIS_OK; mar_res_header_t header; struct crm_ais_nodeid_resp_s answer; header.id = crm_class_nodeid; header.size = sizeof(mar_res_header_t); retry: errno = 0; rc = saSendReceiveReply(ais_fd_sync, &header, header.size, &answer, sizeof (struct crm_ais_nodeid_resp_s)); if(rc == SA_AIS_OK) { CRM_CHECK(answer.header.size == sizeof (struct crm_ais_nodeid_resp_s), crm_err("Odd message: id=%d, size=%d, error=%d", answer.header.id, answer.header.size, answer.header.error)); CRM_CHECK(answer.header.id == CRM_MESSAGE_NODEID_RESP, crm_err("Bad response id")); } if(rc == SA_AIS_ERR_TRY_AGAIN && retries < 20) { retries++; crm_info("Peer overloaded: Re-sending message (Attempt %d of 20)", retries); mssleep(retries * 100); /* Proportional back off */ goto retry; } if(rc != SA_AIS_OK) { crm_err("Sending nodeid request: FAILED (rc=%d): %s", rc, ais_error2text(rc)); return 0; } else if(answer.header.error != SA_AIS_OK) { crm_err("Bad response from peer: (rc=%d): %s", rc, ais_error2text(rc)); return 0; } return answer.id; } gboolean send_ais_text(int class, const char *data, gboolean local, const char *node, enum crm_ais_msg_types dest) { int retries = 0; static int msg_id = 0; static int local_pid = 0; int rc = SA_AIS_OK; mar_res_header_t header; AIS_Message *ais_msg = NULL; enum crm_ais_msg_types sender = text2msg_type(crm_system_name); if(local_pid == 0) { local_pid = getpid(); } CRM_CHECK(data != NULL, return FALSE); crm_malloc0(ais_msg, sizeof(AIS_Message)); ais_msg->id = msg_id++; ais_msg->header.id = class; ais_msg->host.type = dest; ais_msg->host.local = local; if(node) { ais_msg->host.size = strlen(node); memset(ais_msg->host.uname, 0, MAX_NAME); memcpy(ais_msg->host.uname, node, ais_msg->host.size); ais_msg->host.id = 0; } else { ais_msg->host.size = 0; memset(ais_msg->host.uname, 0, MAX_NAME); ais_msg->host.id = 0; } ais_msg->sender.type = sender; ais_msg->sender.pid = local_pid; ais_msg->sender.size = 0; memset(ais_msg->sender.uname, 0, MAX_NAME); ais_msg->sender.id = 0; ais_msg->size = 1 + strlen(data); if(ais_msg->size < CRM_BZ2_THRESHOLD) { failback: crm_realloc(ais_msg, sizeof(AIS_Message) + ais_msg->size); memcpy(ais_msg->data, data, ais_msg->size); } else { char *compressed = NULL; char *uncompressed = crm_strdup(data); unsigned int len = (ais_msg->size * 1.1) + 600; /* recomended size */ crm_debug_5("Compressing message payload"); crm_malloc(compressed, len); rc = BZ2_bzBuffToBuffCompress( compressed, &len, uncompressed, ais_msg->size, CRM_BZ2_BLOCKS, 0, CRM_BZ2_WORK); crm_free(uncompressed); if(rc != BZ_OK) { crm_err("Compression failed: %d", rc); crm_free(compressed); goto failback; } crm_realloc(ais_msg, sizeof(AIS_Message) + len + 1); memcpy(ais_msg->data, compressed, len); ais_msg->data[len] = 0; crm_free(compressed); ais_msg->is_compressed = TRUE; ais_msg->compressed_size = len; crm_debug_2("Compression details: %d -> %d", ais_msg->size, ais_data_len(ais_msg)); } ais_msg->header.size = sizeof(AIS_Message) + ais_data_len(ais_msg); crm_debug_3("Sending%s message %d to %s.%s (data=%d, total=%d)", ais_msg->is_compressed?" compressed":"", ais_msg->id, ais_dest(&(ais_msg->host)), msg_type2text(dest), ais_data_len(ais_msg), ais_msg->header.size); retry: errno = 0; rc = saSendReceiveReply(ais_fd_sync, ais_msg, ais_msg->header.size, &header, sizeof (mar_res_header_t)); if(rc == SA_AIS_OK) { CRM_CHECK(header.size == sizeof (mar_res_header_t), crm_err("Odd message: id=%d, size=%d, error=%d", header.id, header.size, header.error)); CRM_CHECK(header.id == CRM_MESSAGE_IPC_ACK, crm_err("Bad response id")); CRM_CHECK(header.error == SA_AIS_OK, rc = header.error); } if(rc == SA_AIS_ERR_TRY_AGAIN && retries < 20) { retries++; crm_info("Peer overloaded: Re-sending message (Attempt %d of 20)", retries); mssleep(retries * 100); /* Proportional back off */ goto retry; } if(rc != SA_AIS_OK) { crm_perror(LOG_ERR,"Sending message %d: FAILED (rc=%d): %s", ais_msg->id, rc, ais_error2text(rc)); ais_fd_async = -1; } else { crm_debug_4("Message %d: sent", ais_msg->id); } crm_free(ais_msg); return (rc == SA_AIS_OK); } gboolean send_ais_message(xmlNode *msg, gboolean local, const char *node, enum crm_ais_msg_types dest) { gboolean rc = TRUE; char *data = NULL; if(ais_fd_async < 0 || ais_source == NULL) { crm_err("Not connected to AIS"); return FALSE; } data = dump_xml_unformatted(msg); rc = send_ais_text(0, data, local, node, dest); crm_free(data); return rc; } void terminate_ais_connection(void) { close(ais_fd_sync); close(ais_fd_async); crm_notice("Disconnected from AIS"); /* G_main_del_fd(ais_source); */ /* G_main_del_fd(ais_source_sync); */ } int ais_membership_timer = 0; gboolean ais_membership_force = FALSE; static gboolean ais_membership_dampen(gpointer data) { crm_debug_2("Requesting cluster membership after stabilization delay"); send_ais_text(crm_class_members, __FUNCTION__, TRUE, NULL, crm_msg_ais); ais_membership_force = TRUE; ais_membership_timer = 0; return FALSE; /* never repeat automatically */ } static gboolean ais_dispatch(int sender, gpointer user_data) { char *data = NULL; char *uncompressed = NULL; int rc = SA_AIS_OK; AIS_Message *msg = NULL; mar_res_header_t *header = NULL; static int header_len = sizeof(mar_res_header_t); gboolean (*dispatch)(AIS_Message*,char*,int) = user_data; crm_malloc0(header, header_len); errno = 0; rc = saRecvRetry(sender, header, header_len); if (rc != SA_AIS_OK) { crm_perror(LOG_ERR, "Receiving message header failed: (%d/%d) %s", rc, errno, ais_error2text(rc)); goto bail; } else if(header->size == header_len) { crm_err("Empty message: id=%d, size=%d, error=%d, header_len=%d", header->id, header->size, header->error, header_len); goto done; } else if(header->size == 0 || header->size < header_len) { crm_err("Mangled header: size=%d, header=%d, error=%d", header->size, header_len, header->error); goto done; } else if(header->error != 0) { crm_err("Header contined error: %d", header->error); } crm_debug_2("Looking for %d (%d - %d) more bytes", header->size - header_len, header->size, header_len); crm_realloc(header, header->size); /* Use a char* so we can store the remainder into an offset */ data = (char*)header; errno = 0; rc = saRecvRetry(sender, data+header_len, header->size - header_len); msg = (AIS_Message*)data; if (rc != SA_AIS_OK) { crm_perror(LOG_ERR,"Receiving message body failed: (%d) %s", rc, ais_error2text(rc)); goto bail; } - if(msg->header.id != crm_class_members) { - crm_node_t *node = crm_get_peer(msg->sender.id, msg->sender.uname); - if(node == NULL) { - crm_info("Creating node entry: %u/%s", msg->sender.id, msg->sender.uname); - crm_update_peer(msg->sender.id, 0,0,0,0, msg->sender.uname, NULL, NULL, NULL); - } - } - crm_debug_3("Got new%s message (size=%d, %d, %d)", msg->is_compressed?" compressed":"", ais_data_len(msg), msg->size, msg->compressed_size); data = msg->data; if(msg->is_compressed && msg->size > 0) { int rc = BZ_OK; unsigned int new_size = msg->size; if(check_message_sanity(msg, NULL) == FALSE) { goto badmsg; } crm_debug_5("Decompressing message data"); crm_malloc0(uncompressed, new_size); rc = BZ2_bzBuffToBuffDecompress( uncompressed, &new_size, data, msg->compressed_size, 1, 0); if(rc != BZ_OK) { crm_err("Decompression failed: %d", rc); goto badmsg; } CRM_ASSERT(rc == BZ_OK); CRM_ASSERT(new_size == msg->size); data = uncompressed; } else if(check_message_sanity(msg, data) == FALSE) { goto badmsg; } else if(safe_str_eq("identify", data)) { int pid = getpid(); char *pid_s = crm_itoa(pid); send_ais_text(0, pid_s, TRUE, NULL, crm_msg_ais); crm_free(pid_s); goto done; } if(msg->header.id == crm_class_rmpeer) { uint32_t id = crm_int_helper(data, NULL); crm_info("Removing peer %s/%u", data, id); reap_crm_member(id); crm_calculate_quorum(); goto done; } if(msg->header.id == crm_class_members) { xmlNode *xml = string2xml(data); if(xml != NULL) { gboolean do_ask = FALSE; gboolean do_process = TRUE; unsigned long long seq = 0; int new_size = 0; int current_size = crm_active_members(); const char *reason = "unknown"; const char *value = crm_element_value(xml, "id"); seq = crm_int_helper(value, NULL); crm_debug_2("Received membership %llu", seq); xml_child_iter(xml, node, const char *state = crm_element_value(node, "state"); if(safe_str_eq(state, CRM_NODE_MEMBER)) { new_size++; } ); if(ais_membership_force) { /* always process */ crm_debug_2("Processing delayed membership change"); } else if(current_size == 0 && new_size == 1) { do_ask = TRUE; do_process = FALSE; reason = "We've come up alone"; } else if(new_size < (current_size/2)) { do_process = FALSE; reason = "We've lost more than half our peers"; if(ais_membership_timer == 0) { reason = "We've lost more than half our peers"; crm_log_xml_debug(xml, __PRETTY_FUNCTION__); do_ask = TRUE; } } if(do_process) { static long long last = 0; /* if there is a timer running - let it run * there is no harm in getting an extra membership message */ /* Skip resends */ if(last < seq) { crm_info("Processing membership %llu", seq); } /* crm_log_xml_debug(xml, __PRETTY_FUNCTION__); */ if(ais_membership_force) { ais_membership_force = FALSE; } xml_child_iter(xml, node, crm_update_ais_node(node, seq)); crm_calculate_quorum(); last = seq; } else if(do_ask) { dispatch = NULL; crm_warn("Pausing to allow membership stability (size %d -> %d): %s", current_size, new_size, reason); ais_membership_timer = Gmain_timeout_add(4*1000, ais_membership_dampen, NULL); /* process node additions */ xml_child_iter(xml, node, const char *state = crm_element_value(node, "state"); if(crm_str_eq(state, CRM_NODE_MEMBER, FALSE)) { crm_update_ais_node(node, seq); } ); } else { dispatch = NULL; crm_warn("Membership is still unstable (size %d -> %d): %s", current_size, new_size, reason); } } else { crm_warn("Invalid peer update: %s", data); } free_xml(xml); + + } else { + const char *uuid = msg->sender.uname; + crm_update_peer(msg->sender.id, 0,0,0,0, uuid, msg->sender.uname, NULL, NULL); } if(dispatch != NULL) { dispatch(msg, data, sender); } done: crm_free(uncompressed); crm_free(msg); return TRUE; badmsg: crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):" " min=%d, total=%d, size=%d, bz2_size=%d", msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, (int)sizeof(AIS_Message), msg->header.size, msg->size, msg->compressed_size); goto done; bail: crm_err("AIS connection failed"); return FALSE; } static void ais_destroy(gpointer user_data) { crm_err("AIS connection terminated"); ais_fd_sync = -1; exit(1); } gboolean init_ais_connection( gboolean (*dispatch)(AIS_Message*,char*,int), void (*destroy)(gpointer), char **our_uuid, char **our_uname) { int pid = 0; int retries = 0; int rc = SA_AIS_OK; char *pid_s = NULL; struct utsname name; uint32_t local_nodeid = 0; const char *local_uname = NULL; if(uname(&name) < 0) { crm_perror(LOG_ERR,"uname(2) call failed"); exit(100); } local_uname = name.nodename; crm_notice("Local node name: %s", *our_uname); if(our_uuid != NULL) { *our_uuid = crm_strdup(local_uname); } if(our_uname != NULL) { *our_uname = crm_strdup(local_uname); } retry: crm_info("Creating connection to our AIS plugin"); rc = saServiceConnect (&ais_fd_sync, &ais_fd_async, CRM_SERVICE); if (rc != SA_AIS_OK) { crm_info("Connection to our AIS plugin (%d) failed: %s (%d)", CRM_SERVICE, ais_error2text(rc), rc); } switch(rc) { case SA_AIS_OK: break; case SA_AIS_ERR_TRY_AGAIN: if(retries < 30) { sleep(1); retries++; goto retry; } crm_err("Retry count exceeded"); return FALSE; default: return FALSE; } if(destroy == NULL) { crm_debug("Using the default destroy handler"); destroy = ais_destroy; } crm_info("AIS connection established"); #if 0 ais_source_sync = G_main_add_fd( G_PRIORITY_HIGH, ais_fd_sync, FALSE, ais_dispatch, dispatch, destroy); #endif pid = getpid(); pid_s = crm_itoa(pid); send_ais_text(0, pid_s, TRUE, NULL, crm_msg_ais); crm_free(pid_s); crm_peer_init(); local_nodeid = get_ais_nodeid(); crm_info("Local node id: %u", local_nodeid); if(local_nodeid != 0) { /* Ensure the local node always exists */ crm_update_peer(local_nodeid, 0, 0, 0, 0, local_uname, local_uname, NULL, NULL); } ais_source = G_main_add_fd( G_PRIORITY_HIGH, ais_fd_async, FALSE, ais_dispatch, dispatch, destroy); return TRUE; } gboolean check_message_sanity(AIS_Message *msg, char *data) { gboolean sane = TRUE; gboolean repaired = FALSE; int dest = msg->host.type; int tmp_size = msg->header.size - sizeof(AIS_Message); if(sane && msg->header.size == 0) { crm_warn("Message with no size"); sane = FALSE; } if(sane && msg->header.error != 0) { crm_warn("Message header contains an error: %d", msg->header.error); sane = FALSE; } if(sane && ais_data_len(msg) != tmp_size) { int cur_size = ais_data_len(msg); repaired = TRUE; if(msg->is_compressed) { msg->compressed_size = tmp_size; } else { msg->size = tmp_size; } crm_warn("Repaired message payload size %d -> %d", cur_size, tmp_size); } if(sane && ais_data_len(msg) == 0) { crm_warn("Message with no payload"); sane = FALSE; } if(sane && data && msg->is_compressed == FALSE) { int str_size = strlen(data) + 1; if(ais_data_len(msg) != str_size) { int lpc = 0; crm_warn("Message payload is corrupted: expected %d bytes, got %d", ais_data_len(msg), str_size); sane = FALSE; for(lpc = (str_size - 10); lpc < msg->size; lpc++) { if(lpc < 0) { lpc = 0; } crm_debug("bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]); } } } if(sane == FALSE) { crm_err("Invalid message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } else if(repaired) { crm_err("Repaired message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } else { crm_debug_3("Verfied message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } return sane; } #endif diff --git a/lib/common/membership.c b/lib/common/membership.c index 448b09d9c8..1a4e29d371 100644 --- a/lib/common/membership.c +++ b/lib/common/membership.c @@ -1,557 +1,573 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #ifndef _GNU_SOURCE # define _GNU_SOURCE #endif #include #include #include #include #include #include #include #include #include struct quorum_count_s { guint votes_max; guint votes_active; guint votes_total; guint nodes_max; guint nodes_total; }; GHashTable *crm_peer_id_cache = NULL; GHashTable *crm_peer_cache = NULL; unsigned long long crm_peer_seq = 0; unsigned long long crm_max_peers = 0; struct quorum_count_s quorum_stats; gboolean crm_have_quorum = FALSE; gboolean crm_is_member_active(const crm_node_t *node) { if(node && safe_str_eq(node->state, CRM_NODE_MEMBER)) { return TRUE; } return FALSE; } static gboolean crm_reap_dead_member( gpointer key, gpointer value, gpointer user_data) { crm_node_t *node = value; crm_node_t *search = user_data; if(search != NULL && node->id != search->id) { return FALSE; } else if(crm_is_member_active(value) == FALSE) { quorum_stats.nodes_total -= 1; quorum_stats.votes_total -= node->votes; crm_notice("Removing %s/%u from the membership list (votes=%u, nodes=%u)", node->uname, node->id, quorum_stats.votes_total, quorum_stats.nodes_total); return TRUE; } return FALSE; } guint reap_crm_member(uint32_t id) { int matches = 0; crm_node_t *node = g_hash_table_lookup(crm_peer_id_cache, GUINT_TO_POINTER(id)); if(node == NULL) { crm_info("Peer %u is unknown", id); } else if(crm_is_member_active(node)) { crm_warn("Peer %u/%s is still active", id, node->uname); } else { if(g_hash_table_remove(crm_peer_id_cache, GUINT_TO_POINTER(id))) { crm_notice("Removed dead peer %u from the uuid cache", id); } else { crm_warn("Peer %u/%s was not removed", id, node->uname); } matches = g_hash_table_foreach_remove( crm_peer_cache, crm_reap_dead_member, node); crm_notice("Removed %d dead peers with id=%u from the membership list", matches, id); } return matches; } static void crm_count_member( gpointer key, gpointer value, gpointer user_data) { guint *count = user_data; if(crm_is_member_active(value)) { *count = *count + 1; } } guint crm_active_members(void) { guint count = 0; g_hash_table_foreach(crm_peer_cache, crm_count_member, &count); return count; } struct peer_count_s { uint32_t peer; guint count; }; static void crm_count_peer( gpointer key, gpointer value, gpointer user_data) { crm_node_t *node = value; struct peer_count_s *search = user_data; if(crm_is_member_active(node) && (node->processes & search->peer)) { search->count = search->count + 1; } } guint crm_active_peers(uint32_t peer) { struct peer_count_s search; search.count = 0; search.peer = peer; g_hash_table_foreach(crm_peer_cache, crm_count_peer, &search); return search.count; } void destroy_crm_node(gpointer data) { crm_node_t *node = data; crm_debug_2("Destroying entry for node %u", node->id); crm_free(node->addr); crm_free(node->uname); crm_free(node->state); crm_free(node); } void crm_peer_init(void) { const char *value = NULL; static gboolean initialized = FALSE; if(initialized) { return; } initialized = TRUE; quorum_stats.votes_max = 2; quorum_stats.votes_active = 0; quorum_stats.votes_total = 0; quorum_stats.nodes_max = 1; quorum_stats.nodes_total = 0; value = getenv("HA_expected_votes"); if(value) { crm_notice("%s expected quorum votes", value); quorum_stats.votes_max = crm_int_helper(value, NULL); } value = getenv("HA_expected_nodes"); if(value) { crm_notice("%s expected nodes", value); quorum_stats.nodes_max = crm_int_helper(value, NULL); } if(quorum_stats.votes_max < 1) { quorum_stats.votes_max = 1; } if(quorum_stats.nodes_max < 1) { quorum_stats.nodes_max = 1; } crm_peer_destroy(); if(crm_peer_cache == NULL) { crm_peer_cache = g_hash_table_new_full( g_str_hash, g_str_equal, NULL, destroy_crm_node); } if(crm_peer_id_cache == NULL) { crm_peer_id_cache = g_hash_table_new_full( g_direct_hash, g_direct_equal, NULL, NULL); } } void crm_peer_destroy(void) { if(crm_peer_cache != NULL) { g_hash_table_destroy(crm_peer_cache); crm_peer_cache = NULL; } if(crm_peer_id_cache != NULL) { g_hash_table_destroy(crm_peer_id_cache); crm_peer_id_cache = NULL; } } +void (*crm_status_callback)(enum crm_status_type, crm_node_t*, const void*) = NULL; + +void crm_set_status_callback( + void (*dispatch)(enum crm_status_type,crm_node_t*, const void*)) +{ + crm_status_callback = dispatch; +} + static crm_node_t *crm_new_peer(unsigned int id, const char *uname) { crm_node_t *node = NULL; CRM_CHECK(uname != NULL || id > 0, return NULL); crm_debug("Creating entry for node %s/%u", uname, id); crm_malloc0(node, sizeof(crm_node_t)); node->state = crm_strdup("unknown"); + if(id > 0) { + node->id = id; + crm_info("Node %s now has id: %u", crm_str(uname), id); + g_hash_table_remove(crm_peer_id_cache, GUINT_TO_POINTER(id)); + g_hash_table_insert(crm_peer_id_cache, GUINT_TO_POINTER(id), node); + } + + if(uname) { + node->uname = crm_strdup(uname); + crm_info("Node %u is now known as %s", id, uname); + g_hash_table_insert(crm_peer_cache, node->uname, node); + + if(is_openais_cluster()) { + node->uuid = crm_strdup(node->uname); + } + + if(crm_status_callback) { + crm_status_callback(crm_status_uname, node, NULL); + } + } + return node; } crm_node_t *crm_get_peer(unsigned int id, const char *uname) { crm_node_t *node = NULL; if(uname != NULL) { node = g_hash_table_lookup(crm_peer_cache, uname); } + if(node == NULL && id > 0) { node = g_hash_table_lookup(crm_peer_id_cache, GUINT_TO_POINTER(id)); - if(node && uname) { - CRM_CHECK(node->uname == NULL, - crm_err("Node %u was renamed from %s to %s", id, node->uname, uname)); + if(node && node->uname && uname) { + crm_err("Node %u is was renamed from %s to %s", id, node->uname, uname); + /* NOTE: Calling crm_new_peer() means the entry in + * crm_peer_id_cache will point to the new entity + */ + + /* TODO: Replace the old uname instead? */ node = crm_new_peer(id, uname); g_hash_table_insert(crm_peer_cache, node->uname, node); + CRM_ASSERT(node->uname != NULL); } } - return node; -} -void (*crm_status_callback)(enum crm_status_type, crm_node_t*, const void*) = NULL; + if(node && uname && node->uname == NULL) { + node->uname = crm_strdup(uname); + crm_info("Node %u is now known as %s", id, uname); + g_hash_table_insert(crm_peer_cache, node->uname, node); + if(crm_status_callback) { + crm_status_callback(crm_status_uname, node, NULL); + } + + } + + if(node && id > 0 && id != node->id) { + g_hash_table_remove(crm_peer_id_cache, GUINT_TO_POINTER(node->id)); + g_hash_table_insert(crm_peer_id_cache, GUINT_TO_POINTER(id), node); + node->id = id; + crm_info("Node %s now has id: %u", crm_str(uname), id); + } -void crm_set_status_callback( - void (*dispatch)(enum crm_status_type,crm_node_t*, const void*)) -{ - crm_status_callback = dispatch; + return node; } - crm_node_t *crm_update_peer( unsigned int id, uint64_t born, uint64_t seen, int32_t votes, uint32_t children, const char *uuid, const char *uname, const char *addr, const char *state) { - gboolean id_changed = FALSE; - gboolean uname_changed = FALSE; gboolean state_changed = FALSE; gboolean addr_changed = FALSE; gboolean procs_changed = FALSE; gboolean votes_changed = FALSE; crm_node_t *node = NULL; CRM_CHECK(uname != NULL || id > 0, return NULL); CRM_ASSERT(crm_peer_cache != NULL); CRM_ASSERT(crm_peer_id_cache != NULL); - - if(uname != NULL) { - node = g_hash_table_lookup(crm_peer_cache, uname); - } - if(node == NULL && id > 0) { - node = g_hash_table_lookup(crm_peer_id_cache, GUINT_TO_POINTER(id)); - } - + node = crm_get_peer(id, uname); if(node == NULL) { node = crm_new_peer(id, uname); /* do it now so we don't get '(new)' everywhere */ node->votes = votes; node->processes = children; if(addr) { node->addr = crm_strdup(addr); } } if(votes > 0 && node->votes != votes) { votes_changed = TRUE; node->votes = votes; } - - if(uname != NULL && node->uname == NULL) { - uname_changed = TRUE; - node->uname = crm_strdup(uname); - crm_info("Node %u is now known as %s", id, uname); - g_hash_table_insert(crm_peer_cache, node->uname, node); - if(crm_status_callback) { - crm_status_callback(crm_status_uname, node, NULL); - } - } if(node->uuid == NULL) { if(uuid != NULL) { node->uuid = crm_strdup(uuid); } else if(node->uname != NULL && is_openais_cluster()) { node->uuid = crm_strdup(node->uname); } } - if(id > 0 && id != node->id) { - id_changed = TRUE; - g_hash_table_remove(crm_peer_id_cache, GUINT_TO_POINTER(node->id)); - g_hash_table_insert(crm_peer_id_cache, GUINT_TO_POINTER(id), node); - node->id = id; - crm_info("Node %s now has id: %u", crm_str(uname), id); - } - if(children > 0 && children != node->processes) { uint32_t last = node->processes; node->processes = children; procs_changed = TRUE; if(crm_status_callback) { crm_status_callback(crm_status_processes, node, &last); } } if(born != 0) { node->born = born; } if(state != NULL && safe_str_neq(node->state, state)) { char *last = node->state; node->state = crm_strdup(state); state_changed = TRUE; if(crm_status_callback) { crm_status_callback(crm_status_nstate, node, last); } crm_free(last); } if(seen != 0 && crm_is_member_active(node)) { node->last_seen = seen; } if(addr != NULL) { if(node->addr == NULL || crm_str_eq(node->addr, addr, FALSE) == FALSE) { addr_changed = TRUE; crm_free(node->addr); node->addr = crm_strdup(addr); } } - if(id_changed || uname_changed || state_changed || addr_changed || votes_changed || procs_changed) { - crm_info("%sNode %s: id=%u%s state=%s%s addr=%s%s votes=%d%s born="U64T" seen="U64T" proc=%.32x%s", - uname_changed?"New ":"", node->uname, - node->id, id_changed?" (new)":"", + if(state_changed || addr_changed || votes_changed || procs_changed) { + crm_info("Node %s: id=%u state=%s%s addr=%s%s votes=%d%s born="U64T" seen="U64T" proc=%.32x%s", + node->uname, node->id, node->state, state_changed?" (new)":"", node->addr, addr_changed?" (new)":"", node->votes, votes_changed?" (new)":"", node->born, node->last_seen, node->processes, procs_changed?" (new)":"" ); } return node; } crm_node_t *crm_update_ais_node(xmlNode *member, long long seq) { const char *id_s = crm_element_value(member, "id"); const char *addr = crm_element_value(member, "addr"); const char *uname = crm_element_value(member, "uname"); const char *state = crm_element_value(member, "state"); const char *born_s = crm_element_value(member, "born"); const char *seen_s = crm_element_value(member, "seen"); const char *votes_s = crm_element_value(member, "votes"); const char *procs_s = crm_element_value(member, "processes"); int votes = crm_int_helper(votes_s, NULL); unsigned int id = crm_int_helper(id_s, NULL); unsigned int procs = crm_int_helper(procs_s, NULL); /* TODO: These values will contain garbage if version < 0.7.1 */ uint64_t born = crm_int_helper(born_s, NULL); uint64_t seen = crm_int_helper(seen_s, NULL); return crm_update_peer(id, born, seen, votes, procs, uname, uname, addr, state); } #if SUPPORT_HEARTBEAT crm_node_t *crm_update_ccm_node( const oc_ev_membership_t *oc, int offset, const char *state, uint64_t seq) { crm_node_t *node = NULL; const char *uuid = NULL; CRM_CHECK(oc->m_array[offset].node_uname != NULL, return NULL); uuid = get_uuid(oc->m_array[offset].node_uname); node = crm_update_peer(oc->m_array[offset].node_id, oc->m_array[offset].node_born_on, seq, -1, 0, uuid, oc->m_array[offset].node_uname, NULL, state); if(safe_str_eq(CRM_NODE_ACTIVE, state)) { /* Heartbeat doesn't send status notifications for nodes that were already part of the cluster */ crm_update_peer_proc( oc->m_array[offset].node_uname, crm_proc_ais, ONLINESTATUS); /* Nor does it send status notifications for processes that were already active */ crm_update_peer_proc( oc->m_array[offset].node_uname, crm_proc_crmd, ONLINESTATUS); } return node; } #endif void crm_update_peer_proc(const char *uname, uint32_t flag, const char *status) { crm_node_t *node = NULL; gboolean changed = FALSE; CRM_ASSERT(crm_peer_cache != NULL); CRM_CHECK(uname != NULL, return); node = g_hash_table_lookup(crm_peer_cache, uname); CRM_CHECK(node != NULL, crm_err("Could not set %s.%s to %s", uname, peer2text(flag), status); return); if(safe_str_eq(status, ONLINESTATUS)) { if((node->processes & flag) == 0) { set_bit_inplace(node->processes, flag); changed = TRUE; } } else if(node->processes & flag) { clear_bit_inplace(node->processes, flag); changed = TRUE; } if(changed) { crm_info("%s.%s is now %s", uname, peer2text(flag), status); } } static void crm_count_quorum( gpointer key, gpointer value, gpointer user_data) { crm_node_t *node = value; quorum_stats.nodes_total += 1; quorum_stats.votes_total += node->votes; if(crm_is_member_active(node)) { quorum_stats.votes_active = quorum_stats.votes_active + node->votes; } } gboolean crm_calculate_quorum(void) { unsigned int limit = 0; gboolean quorate = TRUE; quorum_stats.votes_total = 0; quorum_stats.nodes_total = 0; quorum_stats.votes_active = 0; g_hash_table_foreach(crm_peer_cache, crm_count_quorum, NULL); if(quorum_stats.votes_total > quorum_stats.votes_max) { crm_info("Known quorum votes: %u -> %u", quorum_stats.votes_max, quorum_stats.votes_total); quorum_stats.votes_max = quorum_stats.votes_total; } if(quorum_stats.nodes_total > quorum_stats.nodes_max) { crm_debug("Known quorum nodes: %u -> %u", quorum_stats.nodes_max, quorum_stats.nodes_total); quorum_stats.nodes_max = quorum_stats.nodes_total; } limit = (quorum_stats.votes_max + 2) / 2; if(quorum_stats.votes_active < limit) { quorate = FALSE; } crm_debug("known: %u, available: %u, limit: %u, active: %u: %s", quorum_stats.votes_max, quorum_stats.votes_total, limit, quorum_stats.votes_active, quorate?"true":"false"); if(quorate != crm_have_quorum) { crm_notice("Membership %llu: quorum %s", crm_peer_seq, quorate?"attained":"lost"); } else { crm_debug("Membership %llu: quorum %s", crm_peer_seq, quorate?"retained":"lost"); } crm_have_quorum = quorate; return quorate; } /* Code appropriated (with permission) from cman/daemon/commands.c under GPLv2 */ #if 0 static int calculate_quorum(int allow_decrease, int max_expected, unsigned int *ret_total_votes) { struct list *nodelist; struct cluster_node *node; unsigned int total_votes = 0; unsigned int highest_expected = 0; unsigned int newquorum, q1, q2; unsigned int total_nodes = 0; list_iterate(nodelist, &cluster_members_list) { node = list_item(nodelist, struct cluster_node); if (node->state == NODESTATE_MEMBER) { highest_expected = max(highest_expected, node->expected_votes); total_votes += node->votes; total_nodes++; } } if (quorum_device && quorum_device->state == NODESTATE_MEMBER) total_votes += quorum_device->votes; if (max_expected > 0) highest_expected = max_expected; /* This quorum calculation is taken from the OpenVMS Cluster Systems * manual, but, then, you guessed that didn't you */ q1 = (highest_expected + 2) / 2; q2 = (total_votes + 2) / 2; newquorum = max(q1, q2); /* Normally quorum never decreases but the system administrator can * force it down by setting expected votes to a maximum value */ if (!allow_decrease) newquorum = max(quorum, newquorum); /* The special two_node mode allows each of the two nodes to retain * quorum if the other fails. Only one of the two should live past * fencing (as both nodes try to fence each other in split-brain.) * Also: if there are more than two nodes, force us inquorate to avoid * any damage or confusion. */ if (two_node && total_nodes <= 2) newquorum = 1; if (ret_total_votes) *ret_total_votes = total_votes; return newquorum; } #endif