diff --git a/lib/cluster/corosync.c b/lib/cluster/corosync.c index c7d9276109..fe773107b7 100644 --- a/lib/cluster/corosync.c +++ b/lib/cluster/corosync.c @@ -1,1029 +1,1029 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include cpg_handle_t pcmk_cpg_handle = 0; struct cpg_name pcmk_cpg_group = { .length = 0, .value[0] = 0, }; quorum_handle_t pcmk_quorum_handle = 0; gboolean(*quorum_app_callback) (unsigned long long seq, gboolean quorate) = NULL; static char *pcmk_uname = NULL; static int pcmk_uname_len = 0; static uint32_t pcmk_nodeid = 0; #define cs_repeat(counter, max, code) do { \ code; \ if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \ counter++; \ crm_debug("Retrying operation after %ds", counter); \ sleep(counter); \ } else { \ break; \ } \ } while(counter < max) /* * CFG functionality stolen from node_name() in corosync-quorumtool.c * This resolves the first address assigned to a node and returns the name or IP address. */ char *corosync_node_name(uint64_t /*cmap_handle_t*/ cmap_handle, uint32_t nodeid) { int lpc = 0; int rc = CS_OK; int retries = 0; char *name = NULL; cmap_handle_t local_handle = 0; /* nodeid == 0 == CMAN_NODEID_US */ if(nodeid == 0 && pcmk_nodeid) { nodeid = pcmk_nodeid; } else if(nodeid == 0) { /* Look it up */ int rc = -1; int retries = 0; cpg_handle_t handle = 0; cpg_callbacks_t cb = {}; cs_repeat(retries, 5, rc = cpg_initialize(&handle, &cb)); if (rc == CS_OK) { retries = 0; cs_repeat(retries, 5, rc = cpg_local_get(handle, &pcmk_nodeid)); } if (rc != CS_OK) { crm_err("Could not get local node id from the CPG API: %d", rc); } cpg_finalize(handle); } if(cmap_handle == 0 && local_handle == 0) { retries = 0; crm_trace("Initializing CMAP connection"); do { rc = cmap_initialize(&local_handle); if(rc != CS_OK) { retries++; crm_debug("API connection setup failed: %s. Retrying in %ds", cs_strerror(rc), retries); sleep(retries); } } while(retries < 5 && rc != CS_OK); if (rc != CS_OK) { crm_warn("Could not connect to Cluster Configuration Database API, error %s", cs_strerror(rc)); local_handle = 0; } } if(cmap_handle == 0) { cmap_handle = local_handle; } while(name == NULL && cmap_handle != 0) { uint32_t id = 0; char *key = NULL; key = g_strdup_printf("nodelist.node.%d.nodeid", lpc); rc = cmap_get_uint32(cmap_handle, key, &id); crm_trace("Checking %u vs %u from %s", nodeid, id, key); g_free(key); if(rc != CS_OK) { break; } if(nodeid == id) { crm_trace("Searching for node name for %u in nodelist.node.%d %s", nodeid, lpc, name); if(name == NULL) { key = g_strdup_printf("nodelist.node.%d.ring0_addr", lpc); rc = cmap_get_string(cmap_handle, key, &name); crm_trace("%s = %s", key, name); if(node_name_is_valid(key, name) == FALSE) { free(name); name = NULL; } g_free(key); } if(name == NULL) { key = g_strdup_printf("nodelist.node.%d.name", lpc); rc = cmap_get_string(cmap_handle, key, &name); crm_trace("%s = %s %d", key, name, rc); g_free(key); } break; } lpc++; } if(name == NULL) { crm_notice("Unable to get node name for nodeid %u", nodeid); } return name; } enum crm_ais_msg_types text2msg_type(const char *text) { int type = crm_msg_none; CRM_CHECK(text != NULL, return type); if (safe_str_eq(text, "ais")) { type = crm_msg_ais; } else if (safe_str_eq(text, "crm_plugin")) { type = crm_msg_ais; } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) { type = crm_msg_cib; } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) { type = crm_msg_crmd; } else if (safe_str_eq(text, CRM_SYSTEM_DC)) { type = crm_msg_crmd; } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) { type = crm_msg_te; } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) { type = crm_msg_pe; } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) { type = crm_msg_lrmd; } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) { type = crm_msg_stonithd; } else if (safe_str_eq(text, "stonith-ng")) { type = crm_msg_stonith_ng; } else if (safe_str_eq(text, "attrd")) { type = crm_msg_attrd; } else { /* This will normally be a transient client rather than * a cluster daemon. Set the type to the pid of the client */ int scan_rc = sscanf(text, "%d", &type); if (scan_rc != 1) { /* Ensure its sane */ type = crm_msg_none; } } return type; } static char *ais_cluster_name = NULL; gboolean crm_get_cluster_name(char **cname) { CRM_CHECK(cname != NULL, return FALSE); if (ais_cluster_name) { *cname = strdup(ais_cluster_name); return TRUE; } return FALSE; } gboolean send_ais_text(int class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest) { static int msg_id = 0; static int local_pid = 0; int retries = 0; int rc = CS_OK; int buf_len = sizeof(cs_ipc_header_response_t); char *buf = NULL; struct iovec iov; const char *transport = "pcmk"; AIS_Message *ais_msg = NULL; enum crm_ais_msg_types sender = text2msg_type(crm_system_name); /* There are only 6 handlers registered to crm_lib_service in plugin.c */ CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class); return FALSE); if (data == NULL) { data = ""; } if (local_pid == 0) { local_pid = getpid(); } if (sender == crm_msg_none) { sender = local_pid; } ais_msg = calloc(1, sizeof(AIS_Message)); ais_msg->id = msg_id++; ais_msg->header.id = class; ais_msg->header.error = CS_OK; ais_msg->host.type = dest; ais_msg->host.local = local; if (node) { if (node->uname) { ais_msg->host.size = strlen(node->uname); memset(ais_msg->host.uname, 0, MAX_NAME); memcpy(ais_msg->host.uname, node->uname, ais_msg->host.size); } ais_msg->host.id = node->id; } ais_msg->sender.id = 0; ais_msg->sender.type = sender; ais_msg->sender.pid = local_pid; ais_msg->sender.size = pcmk_uname_len; memset(ais_msg->sender.uname, 0, MAX_NAME); memcpy(ais_msg->sender.uname, pcmk_uname, ais_msg->sender.size); ais_msg->size = 1 + strlen(data); if (ais_msg->size < CRM_BZ2_THRESHOLD) { failback: ais_msg = realloc(ais_msg, sizeof(AIS_Message) + ais_msg->size); memcpy(ais_msg->data, data, ais_msg->size); } else { char *compressed = NULL; char *uncompressed = strdup(data); unsigned int len = (ais_msg->size * 1.1) + 600; /* recomended size */ crm_trace("Compressing message payload"); /* coverity[returned_null] Ignore */ compressed = malloc( len); rc = BZ2_bzBuffToBuffCompress(compressed, &len, uncompressed, ais_msg->size, CRM_BZ2_BLOCKS, 0, CRM_BZ2_WORK); free(uncompressed); if (rc != BZ_OK) { crm_err("Compression failed: %d", rc); free(compressed); goto failback; } ais_msg = realloc(ais_msg, sizeof(AIS_Message) + len + 1); memcpy(ais_msg->data, compressed, len); ais_msg->data[len] = 0; free(compressed); ais_msg->is_compressed = TRUE; ais_msg->compressed_size = len; crm_trace("Compression details: %d -> %d", ais_msg->size, ais_data_len(ais_msg)); } ais_msg->header.size = sizeof(AIS_Message) + ais_data_len(ais_msg); crm_trace("Sending%s message %d to %s.%s (data=%d, total=%d)", ais_msg->is_compressed ? " compressed" : "", ais_msg->id, ais_dest(&(ais_msg->host)), msg_type2text(dest), ais_data_len(ais_msg), ais_msg->header.size); iov.iov_base = ais_msg; iov.iov_len = ais_msg->header.size; buf = realloc(buf, buf_len); do { if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { retries++; crm_info("Peer overloaded or membership in flux:" " Re-sending message (Attempt %d of 20)", retries); sleep(retries); /* Proportional back off */ } errno = 0; transport = "cpg"; CRM_CHECK(dest != crm_msg_ais, rc = CS_ERR_MESSAGE_ERROR; goto bail); rc = cpg_mcast_joined(pcmk_cpg_handle, CPG_TYPE_AGREED, &iov, 1); if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { cpg_flow_control_state_t fc_state = CPG_FLOW_CONTROL_DISABLED; int rc2 = cpg_flow_control_state_get(pcmk_cpg_handle, &fc_state); if (rc2 == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) { crm_warn("Connection overloaded, cannot send messages"); goto bail; } else if (rc2 != CS_OK) { crm_warn("Could not determin the connection state: %s (%d)", ais_error2text(rc2), rc2); goto bail; } } } while ((rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) && retries < 20); bail: if (rc != CS_OK) { crm_perror(LOG_ERR, "Sending message %d via %s: FAILED (rc=%d): %s", ais_msg->id, transport, rc, ais_error2text(rc)); } else { crm_trace("Message %d: sent", ais_msg->id); } free(buf); free(ais_msg); return (rc == CS_OK); } gboolean send_ais_message(xmlNode * msg, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest) { gboolean rc = TRUE; char *data = dump_xml_unformatted(msg); rc = send_ais_text(crm_class_cluster, data, local, node, dest); free(data); return rc; } void terminate_cs_connection(void) { crm_notice("Disconnecting from Corosync"); if(pcmk_cpg_handle) { crm_trace("Disconnecting CPG"); cpg_leave(pcmk_cpg_handle, &pcmk_cpg_group); cpg_finalize(pcmk_cpg_handle); pcmk_cpg_handle = 0; } else { crm_info("No CPG connection"); } if(pcmk_quorum_handle) { crm_trace("Disconnecting quorum"); quorum_finalize(pcmk_quorum_handle); pcmk_quorum_handle = 0; } else { crm_info("No Quorum connection"); } } int ais_membership_timer = 0; gboolean ais_membership_force = FALSE; static gboolean ais_dispatch_message(AIS_Message * msg, gboolean(*dispatch) (int kind, const char *from, const char *data)) { char *data = NULL; char *uncompressed = NULL; xmlNode *xml = NULL; CRM_ASSERT(msg != NULL); crm_trace("Got new%s message (size=%d, %d, %d)", msg->is_compressed ? " compressed" : "", ais_data_len(msg), msg->size, msg->compressed_size); data = msg->data; if (msg->is_compressed && msg->size > 0) { int rc = BZ_OK; unsigned int new_size = msg->size + 1; if (check_message_sanity(msg, NULL) == FALSE) { goto badmsg; } crm_trace("Decompressing message data"); uncompressed = calloc(1, new_size); rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, data, msg->compressed_size, 1, 0); if (rc != BZ_OK) { crm_err("Decompression failed: %d", rc); goto badmsg; } CRM_ASSERT(rc == BZ_OK); CRM_ASSERT(new_size == msg->size); data = uncompressed; } else if (check_message_sanity(msg, data) == FALSE) { goto badmsg; } else if (safe_str_eq("identify", data)) { int pid = getpid(); char *pid_s = crm_itoa(pid); send_ais_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais); free(pid_s); goto done; } if (msg->header.id != crm_class_members) { /* Is this even needed anymore? */ crm_get_peer(msg->sender.id, msg->sender.uname); } if (msg->header.id == crm_class_rmpeer) { uint32_t id = crm_int_helper(data, NULL); crm_info("Removing peer %s/%u", data, id); reap_crm_member(id, NULL); goto done; } - crm_trace("Payload: %s", data); + crm_trace("Payload: %.200s", data); if (dispatch != NULL) { dispatch(msg->header.id, msg->sender.uname, data); } done: free(uncompressed); free_xml(xml); return TRUE; badmsg: crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):" " min=%d, total=%d, size=%d, bz2_size=%d", msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, (int)sizeof(AIS_Message), msg->header.size, msg->size, msg->compressed_size); goto done; } gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL; static int pcmk_cpg_dispatch(gpointer user_data) { int rc = 0; pcmk_cpg_dispatch_fn = user_data; rc = cpg_dispatch(pcmk_cpg_handle, CS_DISPATCH_ALL); if (rc != CS_OK) { crm_err("Connection to the CPG API failed: %d", rc); pcmk_cpg_handle = 0; return -1; } return 0; } static void pcmk_cpg_deliver(cpg_handle_t handle, const struct cpg_name *groupName, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { AIS_Message *ais_msg = (AIS_Message *) msg; if (ais_msg->sender.id > 0 && ais_msg->sender.id != nodeid) { crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, ais_msg->sender.id); return; } else if (ais_msg->host.size != 0 && safe_str_neq(ais_msg->host.uname, pcmk_uname)) { /* Not for us */ return; } else if (ais_msg->host.id != 0 && (pcmk_nodeid != ais_msg->host.id)) { /* Not for us */ return; } ais_msg->sender.id = nodeid; if (ais_msg->sender.size == 0) { crm_node_t *peer = crm_get_peer(nodeid, NULL); if (peer == NULL) { crm_err("Peer with nodeid=%u is unknown", nodeid); } else if (peer->uname == NULL) { crm_err("No uname for peer with nodeid=%u", nodeid); } else { crm_notice("Fixing uname for peer with nodeid=%u", nodeid); ais_msg->sender.size = strlen(peer->uname); memset(ais_msg->sender.uname, 0, MAX_NAME); memcpy(ais_msg->sender.uname, peer->uname, ais_msg->sender.size); } } ais_dispatch_message(ais_msg, pcmk_cpg_dispatch_fn); } static void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries) { int i; gboolean found = FALSE; static int counter = 0; for (i = 0; i < left_list_entries; i++) { crm_node_t *peer = crm_get_peer(left_list[i].nodeid, NULL); crm_info("Left[%d.%d] %s.%u ", counter, i, groupName->value, left_list[i].nodeid); crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, OFFLINESTATUS); } for (i = 0; i < joined_list_entries; i++) { crm_info("Joined[%d.%d] %s.%u ", counter, i, groupName->value, joined_list[i].nodeid); } for (i = 0; i < member_list_entries; i++) { crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL); crm_info("Member[%d.%d] %s.%u ", counter, i, groupName->value, member_list[i].nodeid); crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS); if(pcmk_nodeid == member_list[i].nodeid) { found = TRUE; } } if(!found) { crm_err("We're not part of CPG group %s anymore!", groupName->value); /* Possibly re-call cpg_join() */ } counter++; } cpg_callbacks_t cpg_callbacks = { .cpg_deliver_fn = pcmk_cpg_deliver, .cpg_confchg_fn = pcmk_cpg_membership, }; static gboolean init_cpg_connection(gboolean(*dispatch) (int kind, const char *from, const char *data), void (*destroy) (gpointer), uint32_t * nodeid) { int rc = -1; int fd = 0; int retries = 0; crm_node_t *peer = NULL; struct mainloop_fd_callbacks cpg_fd_callbacks = { .dispatch = pcmk_cpg_dispatch, .destroy = destroy, }; strncpy(pcmk_cpg_group.value, crm_system_name, 128); pcmk_cpg_group.length = strlen(crm_system_name) + 1; cs_repeat(retries, 30, rc = cpg_initialize(&pcmk_cpg_handle, &cpg_callbacks)); if (rc != CS_OK) { crm_err("Could not connect to the Cluster Process Group API: %d\n", rc); goto bail; } retries = 0; cs_repeat(retries, 30, rc = cpg_local_get(pcmk_cpg_handle, (unsigned int *)nodeid)); if (rc != CS_OK) { crm_err("Could not get local node id from the CPG API"); goto bail; } retries = 0; cs_repeat(retries, 30, rc = cpg_join(pcmk_cpg_handle, &pcmk_cpg_group)); if (rc != CS_OK) { crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc); goto bail; } rc = cpg_fd_get(pcmk_cpg_handle, &fd); if (rc != CS_OK) { crm_err("Could not obtain the CPG API connection: %d\n", rc); goto bail; } mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, dispatch, &cpg_fd_callbacks); bail: if (rc != CS_OK) { cpg_finalize(pcmk_cpg_handle); return FALSE; } peer = crm_get_peer(pcmk_nodeid, pcmk_uname); crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS); return TRUE; } static int pcmk_quorum_dispatch(gpointer user_data) { int rc = 0; rc = quorum_dispatch(pcmk_quorum_handle, CS_DISPATCH_ALL); if (rc < 0) { crm_err("Connection to the Quorum API failed: %d", rc); pcmk_quorum_handle = 0; return -1; } return 0; } static void corosync_mark_unseen_peer_dead(gpointer key, gpointer value, gpointer user_data) { int *seq = user_data; crm_node_t *node = value; if (node->last_seen != *seq && node->state && crm_str_eq(CRM_NODE_LOST, node->state, TRUE) == FALSE) { crm_notice("Node %d/%s was not seen in the previous transition", node->id, node->uname); crm_update_peer_state(__FUNCTION__, node, CRM_NODE_LOST, 0); } } static void corosync_mark_node_unseen(gpointer key, gpointer value, gpointer user_data) { crm_node_t *node = value; node->last_seen = 0; } static void pcmk_quorum_notification(quorum_handle_t handle, uint32_t quorate, uint64_t ring_id, uint32_t view_list_entries, uint32_t * view_list) { int i; static gboolean init_phase = TRUE; if (quorate != crm_have_quorum) { crm_notice("Membership " U64T ": quorum %s (%lu)", ring_id, quorate ? "acquired" : "lost", (long unsigned int)view_list_entries); crm_have_quorum = quorate; } else { crm_info("Membership " U64T ": quorum %s (%lu)", ring_id, quorate ? "retained" : "still lost", (long unsigned int)view_list_entries); } if(view_list_entries == 0 && init_phase) { crm_info("Corosync membership is still forming, ignoring"); return; } init_phase = FALSE; g_hash_table_foreach(crm_peer_cache, corosync_mark_node_unseen, NULL); for (i = 0; i < view_list_entries; i++) { uint32_t id = view_list[i]; char *name = NULL; crm_node_t *node = NULL; crm_debug("Member[%d] %d ", i, id); node = crm_get_peer(id, NULL); if(node->uname == NULL) { crm_info("Obtaining name for new node %u", id); name = corosync_node_name(0, id); node = crm_get_peer(id, name); } crm_update_peer_state(__FUNCTION__, node, CRM_NODE_MEMBER, ring_id); free(name); } crm_trace("Reaping unseen nodes..."); g_hash_table_foreach(crm_peer_cache, corosync_mark_unseen_peer_dead, &ring_id); if (quorum_app_callback) { quorum_app_callback(ring_id, quorate); } } quorum_callbacks_t quorum_callbacks = { .quorum_notify_fn = pcmk_quorum_notification, }; gboolean init_quorum_connection(gboolean(*dispatch) (unsigned long long, gboolean), void (*destroy) (gpointer)) { int rc = -1; int fd = 0; int quorate = 0; uint32_t quorum_type = 0; struct mainloop_fd_callbacks quorum_fd_callbacks; quorum_fd_callbacks.dispatch = pcmk_quorum_dispatch; quorum_fd_callbacks.destroy = destroy; crm_debug("Configuring Pacemaker to obtain quorum from Corosync"); rc = quorum_initialize(&pcmk_quorum_handle, &quorum_callbacks, &quorum_type); if (rc != CS_OK) { crm_err("Could not connect to the Quorum API: %d\n", rc); goto bail; } else if (quorum_type != QUORUM_SET) { crm_err("Corosync quorum is not configured\n"); goto bail; } rc = quorum_getquorate(pcmk_quorum_handle, &quorate); if (rc != CS_OK) { crm_err("Could not obtain the current Quorum API state: %d\n", rc); goto bail; } crm_notice("Quorum %s", quorate ? "acquired" : "lost"); quorum_app_callback = dispatch; crm_have_quorum = quorate; rc = quorum_trackstart(pcmk_quorum_handle, CS_TRACK_CHANGES | CS_TRACK_CURRENT); if (rc != CS_OK) { crm_err("Could not setup Quorum API notifications: %d\n", rc); goto bail; } rc = quorum_fd_get(pcmk_quorum_handle, &fd); if (rc != CS_OK) { crm_err("Could not obtain the Quorum API connection: %d\n", rc); goto bail; } mainloop_add_fd("quorum", G_PRIORITY_HIGH, fd, dispatch, &quorum_fd_callbacks); corosync_initialize_nodelist(NULL, FALSE, NULL); bail: if (rc != CS_OK) { quorum_finalize(pcmk_quorum_handle); return FALSE; } return TRUE; } gboolean init_cs_connection(crm_cluster_t *cluster) { int retries = 0; while (retries < 5) { int rc = init_cs_connection_once(cluster); retries++; switch (rc) { case CS_OK: return TRUE; break; case CS_ERR_TRY_AGAIN: case CS_ERR_QUEUE_FULL: sleep(retries); break; default: return FALSE; } } crm_err("Could not connect to corosync after %d retries", retries); return FALSE; } gboolean init_cs_connection_once(crm_cluster_t *cluster) { enum cluster_type_e stack = get_cluster_type(); crm_peer_init(); /* Here we just initialize comms */ if(stack != pcmk_cluster_corosync) { crm_err("Invalid cluster type: %s (%d)", name_for_cluster_type(stack), stack); return FALSE; } if (init_cpg_connection(cluster->cs_dispatch, cluster->destroy, &pcmk_nodeid) == FALSE) { return FALSE; } pcmk_uname = get_local_node_name(); crm_info("Connection to '%s': established", name_for_cluster_type(stack)); CRM_ASSERT(pcmk_uname != NULL); pcmk_uname_len = strlen(pcmk_uname); if (pcmk_nodeid != 0) { /* Ensure the local node always exists */ crm_get_peer(pcmk_nodeid, pcmk_uname); } cluster->uuid = get_corosync_uuid(pcmk_nodeid, pcmk_uname); cluster->uname = strdup(pcmk_uname); cluster->nodeid = pcmk_nodeid; return TRUE; } gboolean check_message_sanity(const AIS_Message * msg, const char *data) { gboolean sane = TRUE; int dest = msg->host.type; int tmp_size = msg->header.size - sizeof(AIS_Message); if (sane && msg->header.size == 0) { crm_warn("Message with no size"); sane = FALSE; } if (sane && msg->header.error != CS_OK) { crm_warn("Message header contains an error: %d", msg->header.error); sane = FALSE; } if (sane && ais_data_len(msg) != tmp_size) { crm_warn("Message payload size is incorrect: expected %d, got %d", ais_data_len(msg), tmp_size); sane = TRUE; } if (sane && ais_data_len(msg) == 0) { crm_warn("Message with no payload"); sane = FALSE; } if (sane && data && msg->is_compressed == FALSE) { int str_size = strlen(data) + 1; if (ais_data_len(msg) != str_size) { int lpc = 0; crm_warn("Message payload is corrupted: expected %d bytes, got %d", ais_data_len(msg), str_size); sane = FALSE; for (lpc = (str_size - 10); lpc < msg->size; lpc++) { if (lpc < 0) { lpc = 0; } crm_debug("bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]); } } } if (sane == FALSE) { crm_err("Invalid message %d: (dest=%s:%s, from=%s:%s.%u, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } else { crm_trace ("Verfied message %d: (dest=%s:%s, from=%s:%s.%u, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } return sane; } enum cluster_type_e find_corosync_variant(void) { int rc = CS_OK; cmap_handle_t handle; /* There can be only one (possibility if confdb isn't around) */ rc = cmap_initialize(&handle); if (rc != CS_OK) { crm_info("Failed to initialize the cmap API. Error %d", rc); return pcmk_cluster_unknown; } cmap_finalize(handle); return pcmk_cluster_corosync; } gboolean crm_is_corosync_peer_active(const crm_node_t * node) { if (node == NULL) { crm_trace("NULL"); return FALSE; } else if(safe_str_neq(node->state, CRM_NODE_MEMBER)) { crm_trace("%s: state=%s", node->uname, node->state); return FALSE; } else if((node->processes & crm_proc_cpg) == 0) { crm_trace("%s: processes=%.16x", node->uname, node->processes); return FALSE; } return TRUE; } gboolean corosync_initialize_nodelist(void *cluster, gboolean force_member, xmlNode *xml_parent) { int lpc = 0; int rc = CS_OK; int retries = 0; gboolean any = FALSE; cmap_handle_t cmap_handle; do { rc = cmap_initialize(&cmap_handle); if(rc != CS_OK) { retries++; crm_debug("API connection setup failed: %s. Retrying in %ds", cs_strerror(rc), retries); sleep(retries); } } while(retries < 5 && rc != CS_OK); if (rc != CS_OK) { crm_warn("Could not connect to Cluster Configuration Database API, error %d", rc); return FALSE; } crm_peer_init(); crm_trace("Initializing corosync nodelist"); for(lpc = 0; ; lpc++) { uint32_t nodeid = 0; char *name = NULL; char *key = NULL; key = g_strdup_printf("nodelist.node.%d.nodeid", lpc); rc = cmap_get_uint32(cmap_handle, key, &nodeid); g_free(key); if(rc != CS_OK) { break; } name = corosync_node_name(cmap_handle, nodeid); if (name != NULL) { crm_node_t *node = g_hash_table_lookup(crm_peer_cache, name); if(node && node->id != nodeid) { crm_crit("Nodes %u and %u share the same name '%s': shutting down", node->id, nodeid, name); crm_exit(100); } } if(nodeid > 0 || name != NULL) { crm_trace("Initializing node[%d] %u = %s", lpc, nodeid, name); crm_get_peer(nodeid, name); } if(nodeid > 0 && name != NULL) { any = TRUE; if(xml_parent) { xmlNode *node = create_xml_node(xml_parent, XML_CIB_TAG_NODE); crm_xml_add_int(node, XML_ATTR_ID, nodeid); crm_xml_add(node, XML_ATTR_UNAME, name); if(force_member) { crm_xml_add(node, XML_ATTR_TYPE, CRM_NODE_MEMBER); } } } free(name); } cmap_finalize(cmap_handle); return any; } diff --git a/lib/common/ipc.c b/lib/common/ipc.c index 995b9db88c..6c047a546f 100644 --- a/lib/common/ipc.c +++ b/lib/common/ipc.c @@ -1,680 +1,680 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include struct crm_ipc_request_header { struct qb_ipc_request_header qb; uint32_t flags; }; static char * generateReference(const char *custom1, const char *custom2) { static uint ref_counter = 0; const char *local_cust1 = custom1; const char *local_cust2 = custom2; int reference_len = 4; char *since_epoch = NULL; reference_len += 20; /* too big */ reference_len += 40; /* too big */ if (local_cust1 == NULL) { local_cust1 = "_empty_"; } reference_len += strlen(local_cust1); if (local_cust2 == NULL) { local_cust2 = "_empty_"; } reference_len += strlen(local_cust2); since_epoch = calloc(1, reference_len); if (since_epoch != NULL) { sprintf(since_epoch, "%s-%s-%ld-%u", local_cust1, local_cust2, (unsigned long)time(NULL), ref_counter++); } return since_epoch; } xmlNode * create_request_adv(const char *task, xmlNode * msg_data, const char *host_to, const char *sys_to, const char *sys_from, const char *uuid_from, const char *origin) { char *true_from = NULL; xmlNode *request = NULL; char *reference = generateReference(task, sys_from); if (uuid_from != NULL) { true_from = generate_hash_key(sys_from, uuid_from); } else if (sys_from != NULL) { true_from = strdup(sys_from); } else { crm_err("No sys from specified"); } /* host_from will get set for us if necessary by CRMd when routed */ request = create_xml_node(NULL, __FUNCTION__); crm_xml_add(request, F_CRM_ORIGIN, origin); crm_xml_add(request, F_TYPE, T_CRM); crm_xml_add(request, F_CRM_VERSION, CRM_FEATURE_SET); crm_xml_add(request, F_CRM_MSG_TYPE, XML_ATTR_REQUEST); crm_xml_add(request, F_CRM_REFERENCE, reference); crm_xml_add(request, F_CRM_TASK, task); crm_xml_add(request, F_CRM_SYS_TO, sys_to); crm_xml_add(request, F_CRM_SYS_FROM, true_from); /* HOSTTO will be ignored if it is to the DC anyway. */ if (host_to != NULL && strlen(host_to) > 0) { crm_xml_add(request, F_CRM_HOST_TO, host_to); } if (msg_data != NULL) { add_message_xml(request, F_CRM_DATA, msg_data); } free(reference); free(true_from); return request; } /* * This method adds a copy of xml_response_data */ xmlNode * create_reply_adv(xmlNode * original_request, xmlNode * xml_response_data, const char *origin) { xmlNode *reply = NULL; const char *host_from = crm_element_value(original_request, F_CRM_HOST_FROM); const char *sys_from = crm_element_value(original_request, F_CRM_SYS_FROM); const char *sys_to = crm_element_value(original_request, F_CRM_SYS_TO); const char *type = crm_element_value(original_request, F_CRM_MSG_TYPE); const char *operation = crm_element_value(original_request, F_CRM_TASK); const char *crm_msg_reference = crm_element_value(original_request, F_CRM_REFERENCE); if (type == NULL) { crm_err("Cannot create new_message," " no message type in original message"); CRM_ASSERT(type != NULL); return NULL; #if 0 } else if (strcasecmp(XML_ATTR_REQUEST, type) != 0) { crm_err("Cannot create new_message," " original message was not a request"); return NULL; #endif } reply = create_xml_node(NULL, __FUNCTION__); crm_xml_add(reply, F_CRM_ORIGIN, origin); crm_xml_add(reply, F_TYPE, T_CRM); crm_xml_add(reply, F_CRM_VERSION, CRM_FEATURE_SET); crm_xml_add(reply, F_CRM_MSG_TYPE, XML_ATTR_RESPONSE); crm_xml_add(reply, F_CRM_REFERENCE, crm_msg_reference); crm_xml_add(reply, F_CRM_TASK, operation); /* since this is a reply, we reverse the from and to */ crm_xml_add(reply, F_CRM_SYS_TO, sys_from); crm_xml_add(reply, F_CRM_SYS_FROM, sys_to); /* HOSTTO will be ignored if it is to the DC anyway. */ if (host_from != NULL && strlen(host_from) > 0) { crm_xml_add(reply, F_CRM_HOST_TO, host_from); } if (xml_response_data != NULL) { add_message_xml(reply, F_CRM_DATA, xml_response_data); } return reply; } /* Libqb based IPC */ /* Server... */ int crm_ipcs_client_pid(qb_ipcs_connection_t *c) { struct qb_ipcs_connection_stats stats; stats.client_pid = 0; qb_ipcs_connection_stats_get(c, &stats, 0); return stats.client_pid; } xmlNode * crm_ipcs_recv(qb_ipcs_connection_t *c, void *data, size_t size, uint32_t *id, uint32_t *flags) { char *text = ((char*)data) + sizeof(struct crm_ipc_request_header); crm_trace("Received %.200s", text); if(id) { *id = ((struct qb_ipc_request_header*)data)->id; } if(flags) { *flags = ((struct crm_ipc_request_header*)data)->flags; } return string2xml(text); } ssize_t crm_ipcs_send(qb_ipcs_connection_t *c, uint32_t request, xmlNode *message, enum crm_ipc_server_flags flags) { int rc; int lpc = 0; int retries = 40; int level = LOG_CRIT; struct iovec iov[2]; static uint32_t id = 1; const char *type = "Response"; struct qb_ipc_response_header header; char *buffer = dump_xml_unformatted(message); struct timespec delay = { 0, 250000000 }; /* 250ms */ memset(&iov, 0, 2 * sizeof(struct iovec)); iov[0].iov_len = sizeof(struct qb_ipc_response_header); iov[0].iov_base = &header; iov[1].iov_len = 1 + strlen(buffer); iov[1].iov_base = buffer; if(flags & crm_ipc_server_event) { header.id = id++; /* We don't really use it, but doesn't hurt to set one */ } else { CRM_LOG_ASSERT (request != 0); header.id = request; /* Replying to a specific request */ } header.error = 0; /* unused */ header.size = iov[0].iov_len + iov[1].iov_len; if(flags & crm_ipc_server_error) { retries = 20; level = LOG_ERR; } else if(flags & crm_ipc_server_info) { retries = 10; level = LOG_INFO; } while(lpc < retries) { if(flags & crm_ipc_server_event) { type = "Event"; rc = qb_ipcs_event_sendv(c, iov, 2); if(rc == -EPIPE || rc == -ENOTCONN) { crm_trace("Client %p disconnected", c); level = LOG_INFO; } } else { rc = qb_ipcs_response_sendv(c, iov, 2); } if(rc != -EAGAIN) { break; } lpc++; crm_debug("Attempting resend %d of %s %d (%d bytes) to %p[%d]: %.120s", lpc, type, header.id, header.size, c, crm_ipcs_client_pid(c), buffer); nanosleep(&delay, NULL); } if(rc < header.size) { struct qb_ipcs_connection_stats_2 *stats = qb_ipcs_connection_stats_get_2(c, 0); do_crm_log(level, "%s %d failed, size=%d, to=%p[%d], queue=%d, rc=%d: %.120s", type, header.id, header.size, c, stats->client_pid, stats->event_q_length, rc, buffer); free(stats); } else { crm_trace("%s %d sent, %d bytes to %p[%d]: %.120s", type, header.id, rc, c, crm_ipcs_client_pid(c), buffer); } free(buffer); return rc; } void crm_ipcs_send_ack( qb_ipcs_connection_t *c, uint32_t request, const char *tag, const char *function, int line) { xmlNode *ack = create_xml_node(NULL, tag); crm_xml_add(ack, "function", function); crm_xml_add_int(ack, "line", line); crm_ipcs_send(c, request, ack, FALSE); free_xml(ack); } /* Client... */ #define MIN_MSG_SIZE 12336 /* sizeof(struct qb_ipc_connection_response) */ #define MAX_MSG_SIZE 20*1024 struct crm_ipc_s { struct pollfd pfd; int buf_size; int msg_size; int need_reply; char *buffer; char *name; qb_ipcc_connection_t *ipc; }; static int pick_ipc_buffer(int max) { const char *env = getenv("PCMK_ipc_buffer"); if(env) { max = crm_parse_int(env, "0"); } if(max <= 0) { max = MAX_MSG_SIZE; } if(max < MIN_MSG_SIZE) { max = MIN_MSG_SIZE; } crm_trace("Using max message size of %d", max); return max; } crm_ipc_t * crm_ipc_new(const char *name, size_t max_size) { crm_ipc_t *client = NULL; client = calloc(1, sizeof(crm_ipc_t)); client->name = strdup(name); client->buf_size = pick_ipc_buffer(max_size); client->buffer = malloc(client->buf_size); client->pfd.fd = -1; client->pfd.events = POLLIN; client->pfd.revents = 0; return client; } bool crm_ipc_connect(crm_ipc_t *client) { client->need_reply = FALSE; client->ipc = qb_ipcc_connect(client->name, client->buf_size); if (client->ipc == NULL) { crm_perror(LOG_INFO, "Could not establish %s connection", client->name); return FALSE; } client->pfd.fd = crm_ipc_get_fd(client); if(client->pfd.fd < 0) { crm_perror(LOG_INFO, "Could not obtain file descriptor for %s connection", client->name); return FALSE; } qb_ipcc_context_set(client->ipc, client); return TRUE; } void crm_ipc_close(crm_ipc_t *client) { if(client) { crm_trace("Disconnecting %s IPC connection %p (%p.%p)", client->name, client, client->ipc); if(client->ipc) { qb_ipcc_connection_t *ipc = client->ipc; client->ipc = NULL; qb_ipcc_disconnect(ipc); } } } void crm_ipc_destroy(crm_ipc_t *client) { if(client) { if(client->ipc && qb_ipcc_is_connected(client->ipc)) { crm_notice("Destroying an active IPC connection to %s", client->name); /* The next line is basically unsafe * * If this connection was attached to mainloop and mainloop is active, * the 'disconnected' callback will end up back here and we'll end * up free'ing the memory twice - something that can still happen * even without this if we destroy a connection and it closes before * we call exit */ /* crm_ipc_close(client); */ } crm_trace("Destroying IPC connection to %s: %p", client->name, client); free(client->buffer); free(client->name); free(client); } } int crm_ipc_get_fd(crm_ipc_t *client) { int fd = 0; CRM_ASSERT(client != NULL); if(client->ipc && qb_ipcc_fd_get(client->ipc, &fd) == 0) { return fd; } crm_perror(LOG_ERR, "Could not obtain file IPC descriptor for %s", client->name); return -EINVAL; } bool crm_ipc_connected(crm_ipc_t *client) { bool rc = FALSE; if(client == NULL) { crm_trace("No client"); return FALSE; } else if(client->ipc == NULL) { crm_trace("No connection"); return FALSE; } else if(client->pfd.fd < 0) { crm_trace("Bad descriptor"); return FALSE; } rc = qb_ipcc_is_connected(client->ipc); if(rc == FALSE) { client->pfd.fd = -EINVAL; } return rc; } int crm_ipc_ready(crm_ipc_t *client) { CRM_ASSERT(client != NULL); if(crm_ipc_connected(client) == FALSE) { return -ENOTCONN; } client->pfd.revents = 0; return poll(&(client->pfd), 1, 0); } long crm_ipc_read(crm_ipc_t *client) { CRM_ASSERT(client != NULL); CRM_ASSERT(client->ipc != NULL); CRM_ASSERT(client->buffer != NULL); client->buffer[0] = 0; client->msg_size = qb_ipcc_event_recv(client->ipc, client->buffer, client->buf_size-1, 0); if(client->msg_size >= 0) { struct qb_ipc_response_header *header = (struct qb_ipc_response_header *)client->buffer; client->buffer[client->msg_size] = 0; - crm_trace("Recieved %s event %d, size=%d, rc=%d, text: %.200s", + crm_trace("Recieved %s event %d, size=%d, rc=%d, text: %.100s", client->name, header->id, header->size, client->msg_size, client->buffer+sizeof(struct qb_ipc_response_header)); } else { crm_trace("No message from %s recieved: %s", client->name, pcmk_strerror(client->msg_size)); } if(crm_ipc_connected(client) == FALSE || client->msg_size == -ENOTCONN) { crm_err("Connection to %s failed", client->name); } return client->msg_size; } const char * crm_ipc_buffer(crm_ipc_t *client) { CRM_ASSERT(client != NULL); return client->buffer + sizeof(struct qb_ipc_response_header); } const char *crm_ipc_name(crm_ipc_t *client) { CRM_ASSERT(client != NULL); return client->name; } static int internal_ipc_send_recv(crm_ipc_t *client, const void *iov) { int rc = 0; do { rc = qb_ipcc_sendv_recv(client->ipc, iov, 2, client->buffer, client->buf_size, -1); } while(rc == -EAGAIN && crm_ipc_connected(client)); return rc; } static int internal_ipc_send_request(crm_ipc_t *client, const void *iov, int ms_timeout) { int rc = 0; time_t timeout = time(NULL) + 1 + (ms_timeout / 1000); do { rc = qb_ipcc_sendv(client->ipc, iov, 2); } while(rc == -EAGAIN && time(NULL) < timeout && crm_ipc_connected(client)); return rc; } static int internal_ipc_get_reply(crm_ipc_t *client, int request_id, int ms_timeout) { time_t timeout = time(NULL) + 1 + (ms_timeout / 1000); int rc = 0; /* get the reply */ crm_trace("client %s waiting on reply to msg id %d", client->name, request_id); do { rc = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, 1000); if(rc > 0) { struct qb_ipc_response_header *hdr = (struct qb_ipc_response_header *)client->buffer; if(hdr->id == request_id) { /* Got it */ break; } else if(hdr->id < request_id){ xmlNode *bad = string2xml(crm_ipc_buffer(client)); crm_err("Discarding old reply %d (need %d)", hdr->id, request_id); crm_log_xml_notice(bad, "OldIpcReply"); } else { xmlNode *bad = string2xml(crm_ipc_buffer(client)); crm_err("Discarding newer reply %d (need %d)", hdr->id, request_id); crm_log_xml_notice(bad, "ImpossibleReply"); CRM_ASSERT(hdr->id <= request_id); } } else if (crm_ipc_connected(client) == FALSE) { crm_err("Server disconnected client %s while waiting for msg id %d", client->name, request_id); break; } } while(time(NULL) < timeout); return rc; } int crm_ipc_send(crm_ipc_t *client, xmlNode *message, enum crm_ipc_flags flags, int32_t ms_timeout, xmlNode **reply) { long rc = 0; struct iovec iov[2]; static uint32_t id = 0; struct crm_ipc_request_header header; char *buffer = NULL; if(crm_ipc_connected(client) == FALSE) { /* Don't even bother */ crm_notice("Connection to %s closed", client->name); return -ENOTCONN; } if(client->need_reply) { crm_trace("Trying again to obtain pending reply from %s", client->name); rc = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, 300); if(rc < 0) { crm_warn("Sending to %s is disabled until pending reply is recieved", client->name); free(buffer); return -EREMOTEIO; } else { crm_notice("Lost reply from %s finally arrived, sending re-enabled", client->name); client->need_reply = FALSE; } } buffer = dump_xml_unformatted(message); iov[0].iov_len = sizeof(struct crm_ipc_request_header); iov[0].iov_base = &header; iov[1].iov_len = 1 + strlen(buffer); iov[1].iov_base = buffer; header.qb.id = ++id; header.qb.size = iov[0].iov_len + iov[1].iov_len; header.flags = flags; if(ms_timeout == 0) { ms_timeout = 5000; } crm_trace("Sending from client: %s request id: %d bytes: %u timeout:%d msg: %.200s...", client->name, header.qb.id, header.qb.size, ms_timeout, buffer); if(ms_timeout > 0) { rc = internal_ipc_send_request(client, iov, ms_timeout); if (rc <= 0) { crm_trace("Failed to send from client %s request %d with %u bytes: %.200s...", client->name, header.qb.id, header.qb.size, buffer); goto send_cleanup; } else if(is_not_set(flags, crm_ipc_client_response)) { crm_trace("Message sent, not waiting for reply to %d from %s to %u bytes: %.200s...", header.qb.id, client->name, header.qb.size, buffer); goto send_cleanup; } rc = internal_ipc_get_reply(client, header.qb.id, ms_timeout); if(rc < 0) { /* No reply, for now, disable sending * * The alternative is to close the connection since we don't know * how to detect and discard out-of-sequence replies * * TODO - implement the above */ client->need_reply = TRUE; } } else { rc = internal_ipc_send_recv(client, iov); } if(rc > 0) { struct qb_ipc_response_header *hdr = (struct qb_ipc_response_header *)client->buffer; crm_trace("Recieved response %d, size=%d, rc=%ld, text: %.200s", hdr->id, hdr->size, rc, crm_ipc_buffer(client)); if(reply) { *reply = string2xml(crm_ipc_buffer(client)); } } else { crm_trace("Response not recieved: rc=%ld, errno=%d", rc, errno); } send_cleanup: if(crm_ipc_connected(client) == FALSE) { crm_notice("Connection to %s closed: %s (%ld)", client->name, pcmk_strerror(rc), rc); } else if(rc <= 0) { crm_warn("Request %d to %s failed: %s (%ld)", header.qb.id, client->name, pcmk_strerror(rc), rc); crm_info("Request was %.120s", buffer); } free(buffer); return rc; } /* Utils */ xmlNode * create_hello_message(const char *uuid, const char *client_name, const char *major_version, const char *minor_version) { xmlNode *hello_node = NULL; xmlNode *hello = NULL; if (uuid == NULL || strlen(uuid) == 0 || client_name == NULL || strlen(client_name) == 0 || major_version == NULL || strlen(major_version) == 0 || minor_version == NULL || strlen(minor_version) == 0) { crm_err("Missing fields, Hello message will not be valid."); return NULL; } hello_node = create_xml_node(NULL, XML_TAG_OPTIONS); crm_xml_add(hello_node, "major_version", major_version); crm_xml_add(hello_node, "minor_version", minor_version); crm_xml_add(hello_node, "client_name", client_name); crm_xml_add(hello_node, "client_uuid", uuid); crm_trace("creating hello message"); hello = create_request(CRM_OP_HELLO, hello_node, NULL, NULL, client_name, uuid); free_xml(hello_node); return hello; }