diff --git a/lib/common/ais.c b/lib/common/ais.c index d5a9148553..a3c0cc9132 100644 --- a/lib/common/ais.c +++ b/lib/common/ais.c @@ -1,1360 +1,1360 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include "stack.h" #ifdef SUPPORT_COROSYNC # include # include #endif #ifdef SUPPORT_CMAN # include cman_handle_t pcmk_cman_handle = NULL; #endif #ifdef SUPPORT_CS_QUORUM # include # include # include # include # include quorum_handle_t pcmk_quorum_handle = 0; cpg_handle_t pcmk_cpg_handle = 0; struct cpg_name pcmk_cpg_group = { .length = 0, .value[0] = 0, }; #endif static char *pcmk_uname = NULL; static int pcmk_uname_len = 0; static uint32_t pcmk_nodeid = 0; #define cs_repeat(counter, max, code) do { \ code; \ if(rc == CS_ERR_TRY_AGAIN) { \ counter++; \ crm_debug("Retrying operation after %ds", counter); \ sleep(counter); \ } \ } while(rc == CS_ERR_TRY_AGAIN && counter < max) enum crm_ais_msg_types text2msg_type(const char *text) { int type = crm_msg_none; CRM_CHECK(text != NULL, return type); if(safe_str_eq(text, "ais")) { type = crm_msg_ais; } else if(safe_str_eq(text, "crm_plugin")) { type = crm_msg_ais; } else if(safe_str_eq(text, CRM_SYSTEM_CIB)) { type = crm_msg_cib; } else if(safe_str_eq(text, CRM_SYSTEM_CRMD)) { type = crm_msg_crmd; } else if(safe_str_eq(text, CRM_SYSTEM_DC)) { type = crm_msg_crmd; } else if(safe_str_eq(text, CRM_SYSTEM_TENGINE)) { type = crm_msg_te; } else if(safe_str_eq(text, CRM_SYSTEM_PENGINE)) { type = crm_msg_pe; } else if(safe_str_eq(text, CRM_SYSTEM_LRMD)) { type = crm_msg_lrmd; } else if(safe_str_eq(text, CRM_SYSTEM_STONITHD)) { type = crm_msg_stonithd; } else if(safe_str_eq(text, "stonith-ng")) { type = crm_msg_stonith_ng; } else if(safe_str_eq(text, "attrd")) { type = crm_msg_attrd; } else { /* This will normally be a transient client rather than * a cluster daemon. Set the type to the pid of the client */ int scan_rc = sscanf(text, "%d", &type); if(scan_rc != 1) { /* Ensure its sane */ type = crm_msg_none; } } return type; } char *get_ais_data(const AIS_Message *msg) { int rc = BZ_OK; char *uncompressed = NULL; unsigned int new_size = msg->size + 1; if(msg->is_compressed == FALSE) { crm_debug_2("Returning uncompressed message data"); uncompressed = strdup(msg->data); } else { crm_debug_2("Decompressing message data"); crm_malloc0(uncompressed, new_size); rc = BZ2_bzBuffToBuffDecompress( uncompressed, &new_size, (char*)msg->data, msg->compressed_size, 1, 0); CRM_ASSERT(rc == BZ_OK); CRM_ASSERT(new_size == msg->size); } return uncompressed; } #if SUPPORT_COROSYNC int ais_fd_sync = -1; int ais_fd_async = -1; /* never send messages via this channel */ void *ais_ipc_ctx = NULL; hdb_handle_t ais_ipc_handle = 0; GFDSource *ais_source = NULL; GFDSource *ais_source_sync = NULL; GFDSource *cman_source = NULL; GFDSource *cpg_source = NULL; GFDSource *quorumd_source = NULL; static char *ais_cluster_name = NULL; gboolean get_ais_nodeid(uint32_t *id, char **uname) { struct iovec iov; int retries = 0; int rc = CS_OK; coroipc_response_header_t header; struct crm_ais_nodeid_resp_s answer; header.error = CS_OK; header.id = crm_class_nodeid; header.size = sizeof(coroipc_response_header_t); CRM_CHECK(id != NULL, return FALSE); CRM_CHECK(uname != NULL, return FALSE); iov.iov_base = &header; iov.iov_len = header.size; retry: errno = 0; rc = coroipcc_msg_send_reply_receive( ais_ipc_handle, &iov, 1, &answer, sizeof (answer)); if(rc == CS_OK) { CRM_CHECK(answer.header.size == sizeof (struct crm_ais_nodeid_resp_s), crm_err("Odd message: id=%d, size=%d, error=%d", answer.header.id, answer.header.size, answer.header.error)); CRM_CHECK(answer.header.id == crm_class_nodeid, crm_err("Bad response id: %d", answer.header.id)); } if(rc == CS_ERR_TRY_AGAIN && retries < 20) { retries++; crm_info("Peer overloaded: Re-sending message (Attempt %d of 20)", retries); sleep(retries); /* Proportional back off */ goto retry; } if(rc != CS_OK) { crm_err("Sending nodeid request: FAILED (rc=%d): %s", rc, ais_error2text(rc)); return FALSE; } else if(answer.header.error != CS_OK) { crm_err("Bad response from peer: (rc=%d): %s", rc, ais_error2text(rc)); return FALSE; } crm_info("Server details: id=%u uname=%s cname=%s", answer.id, answer.uname, answer.cname); *id = answer.id; *uname = crm_strdup(answer.uname); ais_cluster_name = crm_strdup(answer.cname); return TRUE; } gboolean crm_get_cluster_name(char **cname) { CRM_CHECK(cname != NULL, return FALSE); if(ais_cluster_name) { *cname = crm_strdup(ais_cluster_name); return TRUE; } return FALSE; } gboolean send_ais_text(int class, const char *data, gboolean local, const char *node, enum crm_ais_msg_types dest) { static int msg_id = 0; static int local_pid = 0; enum cluster_type_e cluster_type = get_cluster_type(); int retries = 0; int rc = CS_OK; int buf_len = sizeof(coroipc_response_header_t); char *buf = NULL; struct iovec iov; const char *transport = "pcmk"; coroipc_response_header_t *header = NULL; AIS_Message *ais_msg = NULL; enum crm_ais_msg_types sender = text2msg_type(crm_system_name); /* There are only 6 handlers registered to crm_lib_service in plugin.c */ CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class); return FALSE); if(data == NULL) { data = ""; } if(local_pid == 0) { local_pid = getpid(); } if(sender == crm_msg_none) { sender = local_pid; } crm_malloc0(ais_msg, sizeof(AIS_Message)); ais_msg->id = msg_id++; ais_msg->header.id = class; ais_msg->header.error = CS_OK; ais_msg->host.type = dest; ais_msg->host.local = local; if(node) { ais_msg->host.size = strlen(node); memset(ais_msg->host.uname, 0, MAX_NAME); memcpy(ais_msg->host.uname, node, ais_msg->host.size); ais_msg->host.id = 0; } else { ais_msg->host.size = 0; memset(ais_msg->host.uname, 0, MAX_NAME); ais_msg->host.id = 0; } ais_msg->sender.id = 0; ais_msg->sender.type = sender; ais_msg->sender.pid = local_pid; ais_msg->sender.size = pcmk_uname_len; memset(ais_msg->sender.uname, 0, MAX_NAME); memcpy(ais_msg->sender.uname, pcmk_uname, ais_msg->sender.size); ais_msg->size = 1 + strlen(data); if(ais_msg->size < CRM_BZ2_THRESHOLD) { failback: crm_realloc(ais_msg, sizeof(AIS_Message) + ais_msg->size); memcpy(ais_msg->data, data, ais_msg->size); } else { char *compressed = NULL; char *uncompressed = crm_strdup(data); unsigned int len = (ais_msg->size * 1.1) + 600; /* recomended size */ crm_debug_5("Compressing message payload"); crm_malloc(compressed, len); rc = BZ2_bzBuffToBuffCompress( compressed, &len, uncompressed, ais_msg->size, CRM_BZ2_BLOCKS, 0, CRM_BZ2_WORK); crm_free(uncompressed); if(rc != BZ_OK) { crm_err("Compression failed: %d", rc); crm_free(compressed); goto failback; } crm_realloc(ais_msg, sizeof(AIS_Message) + len + 1); memcpy(ais_msg->data, compressed, len); ais_msg->data[len] = 0; crm_free(compressed); ais_msg->is_compressed = TRUE; ais_msg->compressed_size = len; crm_debug_2("Compression details: %d -> %d", ais_msg->size, ais_data_len(ais_msg)); } ais_msg->header.size = sizeof(AIS_Message) + ais_data_len(ais_msg); crm_debug_3("Sending%s message %d to %s.%s (data=%d, total=%d)", ais_msg->is_compressed?" compressed":"", ais_msg->id, ais_dest(&(ais_msg->host)), msg_type2text(dest), ais_data_len(ais_msg), ais_msg->header.size); iov.iov_base = ais_msg; iov.iov_len = ais_msg->header.size; crm_realloc(buf, buf_len); do { if(rc == CS_ERR_TRY_AGAIN) { retries++; crm_info("Peer overloaded or membership in flux:" " Re-sending message (Attempt %d of 20)", retries); sleep(retries); /* Proportional back off */ } errno = 0; switch(cluster_type) { case pcmk_cluster_classic_ais: rc = coroipcc_msg_send_reply_receive(ais_ipc_handle, &iov, 1, buf, buf_len); header = (coroipc_response_header_t *)buf; if(rc == CS_OK) { CRM_CHECK(header->size == sizeof (coroipc_response_header_t), crm_err("Odd message: id=%d, size=%d, class=%d, error=%d", header->id, header->size, class, header->error)); CRM_ASSERT(buf_len >= header->size); CRM_CHECK(header->id == CRM_MESSAGE_IPC_ACK, crm_err("Bad response id (%d) for request (%d)", header->id, ais_msg->header.id)); CRM_CHECK(header->error == CS_OK, rc = header->error); } break; case pcmk_cluster_corosync: case pcmk_cluster_cman: transport = "cpg"; CRM_CHECK(dest != crm_msg_ais, rc = CS_ERR_MESSAGE_ERROR; goto bail); rc = cpg_mcast_joined(pcmk_cpg_handle, CPG_TYPE_AGREED, &iov, 1); if(rc == CS_ERR_TRY_AGAIN) { cpg_flow_control_state_t fc_state = CPG_FLOW_CONTROL_DISABLED; int rc2 = cpg_flow_control_state_get (pcmk_cpg_handle, &fc_state); if (rc2 == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) { crm_warn("Connection overloaded, cannot send messages"); goto bail; } else if (rc2 != CS_OK) { crm_warn("Could not determin the connection state: %s (%d)", ais_error2text(rc2), rc2); goto bail; } } break; case pcmk_cluster_unknown: case pcmk_cluster_invalid: case pcmk_cluster_heartbeat: CRM_ASSERT(is_openais_cluster()); break; } } while (rc == CS_ERR_TRY_AGAIN && retries < 20); bail: if(rc != CS_OK) { crm_perror(LOG_ERR,"Sending message %d via %s: FAILED (rc=%d): %s", ais_msg->id, transport, rc, ais_error2text(rc)); } else { crm_debug_4("Message %d: sent", ais_msg->id); } crm_free(buf); crm_free(ais_msg); return (rc == CS_OK); } gboolean send_ais_message(xmlNode *msg, gboolean local, const char *node, enum crm_ais_msg_types dest) { gboolean rc = TRUE; char *data = NULL; if(is_classic_ais_cluster()) { if(ais_fd_async < 0 || ais_source == NULL) { crm_err("Not connected to AIS: %d %p", ais_fd_async, ais_source); return FALSE; } } data = dump_xml_unformatted(msg); rc = send_ais_text(0, data, local, node, dest); crm_free(data); return rc; } void terminate_ais_connection(void) { crm_notice("Disconnecting from AIS"); /* G_main_del_fd(ais_source); */ /* G_main_del_fd(ais_source_sync); */ if(is_classic_ais_cluster() == FALSE) { coroipcc_service_disconnect(ais_ipc_handle); } else { cpg_leave(pcmk_cpg_handle, &pcmk_cpg_group); } if(is_corosync_cluster()) { quorum_finalize(pcmk_quorum_handle); } #ifdef SUPPORT_CMAN if(is_cman_cluster()) { cman_stop_notification(pcmk_cman_handle); cman_finish(pcmk_cman_handle); } #endif } int ais_membership_timer = 0; gboolean ais_membership_force = FALSE; static gboolean ais_dispatch_message( AIS_Message *msg, gboolean (*dispatch)(AIS_Message*,char*,int)) { char *data = NULL; char *uncompressed = NULL; xmlNode *xml = NULL; CRM_ASSERT(msg != NULL); crm_debug_3("Got new%s message (size=%d, %d, %d)", msg->is_compressed?" compressed":"", ais_data_len(msg), msg->size, msg->compressed_size); data = msg->data; if(msg->is_compressed && msg->size > 0) { int rc = BZ_OK; unsigned int new_size = msg->size + 1; if(check_message_sanity(msg, NULL) == FALSE) { goto badmsg; } crm_debug_5("Decompressing message data"); crm_malloc0(uncompressed, new_size); rc = BZ2_bzBuffToBuffDecompress( uncompressed, &new_size, data, msg->compressed_size, 1, 0); if(rc != BZ_OK) { crm_err("Decompression failed: %d", rc); goto badmsg; } CRM_ASSERT(rc == BZ_OK); CRM_ASSERT(new_size == msg->size); data = uncompressed; } else if(check_message_sanity(msg, data) == FALSE) { goto badmsg; } else if(safe_str_eq("identify", data)) { int pid = getpid(); char *pid_s = crm_itoa(pid); send_ais_text(0, pid_s, TRUE, NULL, crm_msg_ais); crm_free(pid_s); goto done; } if(msg->header.id != crm_class_members) { crm_update_peer(msg->sender.id, 0,0,0,0, msg->sender.uname, msg->sender.uname, NULL, NULL); } if(msg->header.id == crm_class_rmpeer) { uint32_t id = crm_int_helper(data, NULL); crm_info("Removing peer %s/%u", data, id); reap_crm_member(id); goto done; } else if(msg->header.id == crm_class_members || msg->header.id == crm_class_quorum) { xml = string2xml(data); if(xml == NULL) { crm_err("Invalid membership update: %s", data); goto badmsg; } if(is_classic_ais_cluster() == FALSE) { xmlNode *node = NULL; for(node = __xml_first_child(xml); node != NULL; node = __xml_next(node)) { crm_update_cman_node(node, crm_peer_seq); } } else { xmlNode *node = NULL; const char *value = NULL; gboolean quorate = FALSE; value = crm_element_value(xml, "quorate"); CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No quorum value:"); goto badmsg); if(crm_is_true(value)) { quorate = TRUE; } value = crm_element_value(xml, "id"); CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No membership id"); goto badmsg); crm_peer_seq = crm_int_helper(value, NULL); if(quorate != crm_have_quorum) { crm_notice("Membership %s: quorum %s", value, quorate?"acquired":"lost"); crm_have_quorum = quorate; } else { crm_info("Membership %s: quorum %s", value, quorate?"retained":"still lost"); } for(node = __xml_first_child(xml); node != NULL; node = __xml_next(node)) { crm_update_ais_node(node, crm_peer_seq); } } } if(dispatch != NULL) { dispatch(msg, data, 0); } done: crm_free(uncompressed); free_xml(xml); return TRUE; badmsg: crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):" " min=%d, total=%d, size=%d, bz2_size=%d", msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, (int)sizeof(AIS_Message), msg->header.size, msg->size, msg->compressed_size); goto done; } gboolean ais_dispatch(int sender, gpointer user_data) { int rc = CS_OK; char *buffer = NULL; gboolean good = TRUE; gboolean (*dispatch)(AIS_Message*,char*,int) = user_data; do { rc = coroipcc_dispatch_get (ais_ipc_handle, (void**)&buffer, 0); if (rc == 0 || buffer == NULL) { /* Zero is a legal "no message afterall" value */ return TRUE; } else if (rc != CS_OK) { crm_perror(LOG_ERR,"Receiving message body failed: (%d) %s", rc, ais_error2text(rc)); goto bail; } good = ais_dispatch_message((AIS_Message*)buffer, dispatch); coroipcc_dispatch_put (ais_ipc_handle); } while(good); return good; bail: crm_err("AIS connection failed"); return FALSE; } static void ais_destroy(gpointer user_data) { crm_err("AIS connection terminated"); ais_fd_sync = -1; exit(1); } static gboolean pcmk_proc_dispatch(IPC_Channel *ch, gpointer user_data) { xmlNode *msg = NULL; gboolean stay_connected = TRUE; while(IPC_ISRCONN(ch)) { if(ch->ops->is_message_pending(ch) == 0) { break; } msg = xmlfromIPC(ch, MAX_IPC_DELAY); if(msg) { xmlNode *node = NULL; for(node = __xml_first_child(msg); node != NULL; node = __xml_next(node)) { int id = 0; int children = 0; const char *uname = crm_element_value(node, "uname"); crm_element_value_int(node, "processes", &children); crm_update_peer(id, 0, 0, 0, children, NULL, uname, NULL, NULL); } free_xml(msg); } if(ch->ch_status != IPC_CONNECT) { break; } } if (ch->ch_status != IPC_CONNECT) { stay_connected = FALSE; } return stay_connected; } #ifdef SUPPORT_CMAN static gboolean pcmk_cman_dispatch(int sender, gpointer user_data) { int rc = cman_dispatch(pcmk_cman_handle, CMAN_DISPATCH_ALL); if(rc < 0) { crm_err("Connection to cman failed: %d", rc); return FALSE; } return TRUE; } #define MAX_NODES 256 static void cman_event_callback(cman_handle_t handle, void *privdata, int reason, int arg) { int rc = 0, lpc = 0, node_count = 0; cman_cluster_t cluster; static cman_node_t cman_nodes[MAX_NODES]; gboolean (*dispatch)(unsigned long long, gboolean) = privdata; switch (reason) { case CMAN_REASON_STATECHANGE: memset(&cluster, 0, sizeof(cluster)); rc = cman_get_cluster(pcmk_cman_handle, &cluster); if (rc < 0) { crm_err("Couldn't query cman cluster details: %d %d", rc, errno); return; } crm_peer_seq = cluster.ci_generation; if(arg != crm_have_quorum) { crm_notice("Membership %llu: quorum %s", crm_peer_seq, arg?"acquired":"lost"); crm_have_quorum = arg; } else { crm_info("Membership %llu: quorum %s", crm_peer_seq, arg?"retained":"still lost"); } rc = cman_get_nodes(pcmk_cman_handle, MAX_NODES, &node_count, cman_nodes); if (rc < 0) { crm_err("Couldn't query cman node list: %d %d", rc, errno); return; } for (lpc = 0; lpc < node_count; lpc++) { if (cman_nodes[lpc].cn_nodeid == 0) { /* Never allow node ID 0 to be considered a member #315711 */ cman_nodes[lpc].cn_member = 0; } crm_update_peer(cman_nodes[lpc].cn_nodeid, cman_nodes[lpc].cn_incarnation, cman_nodes[lpc].cn_member?crm_peer_seq:0, 0, 0, cman_nodes[lpc].cn_name, cman_nodes[lpc].cn_name, NULL, cman_nodes[lpc].cn_member?CRM_NODE_MEMBER:CRM_NODE_LOST); } if(dispatch) { dispatch(crm_peer_seq, crm_have_quorum); } break; case CMAN_REASON_TRY_SHUTDOWN: /* Always reply with a negative - pacemaker needs to be stopped first */ crm_info("CMAN wants to shut down: %s", arg?"forced":"optional"); cman_replyto_shutdown(pcmk_cman_handle, 0); break; case CMAN_REASON_CONFIG_UPDATE: /* Ignore */ break; } } #endif gboolean init_cman_connection( gboolean (*dispatch)(unsigned long long, gboolean), void (*destroy)(gpointer)) { #ifdef SUPPORT_CMAN int rc = -1, fd = -1; cman_cluster_t cluster; crm_info("Configuring Pacemaker to obtain quorum from cman"); memset(&cluster, 0, sizeof(cluster)); pcmk_cman_handle = cman_init(dispatch); if(pcmk_cman_handle == NULL || cman_is_active(pcmk_cman_handle) == FALSE) { crm_err("Couldn't connect to cman"); goto cman_bail; } rc = cman_get_cluster(pcmk_cman_handle, &cluster); if (rc < 0) { crm_err("Couldn't query cman cluster details: %d %d", rc, errno); goto cman_bail; } ais_cluster_name = crm_strdup(cluster.ci_name); rc = cman_start_notification(pcmk_cman_handle, cman_event_callback); if (rc < 0) { crm_err("Couldn't register for cman notifications: %d %d", rc, errno); goto cman_bail; } /* Get the current membership state */ cman_event_callback(pcmk_cman_handle, dispatch, CMAN_REASON_STATECHANGE, cman_is_quorate(pcmk_cman_handle)); fd = cman_get_fd(pcmk_cman_handle); crm_debug("Adding fd=%d to mainloop", fd); cman_source = G_main_add_fd( G_PRIORITY_HIGH, fd, FALSE, pcmk_cman_dispatch, dispatch, destroy); cman_bail: if (rc < 0) { cman_finish(pcmk_cman_handle); return FALSE; } #else crm_err("cman qorum is not supported in this build"); exit(100); #endif return TRUE; } #ifdef SUPPORT_CS_QUORUM gboolean (*pcmk_cpg_dispatch_fn)(AIS_Message*,char*,int) = NULL; static gboolean pcmk_cpg_dispatch(int sender, gpointer user_data) { int rc = 0; pcmk_cpg_dispatch_fn = user_data; rc = cpg_dispatch(pcmk_cpg_handle, CS_DISPATCH_ALL); if(rc != CS_OK) { crm_err("Connection to the CPG API failed: %d", rc); return FALSE; } return TRUE; } static void pcmk_cpg_deliver ( cpg_handle_t handle, const struct cpg_name *groupName, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { AIS_Message *ais_msg = (AIS_Message*)msg; if(ais_msg->sender.id > 0 && ais_msg->sender.id != nodeid) { crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, ais_msg->sender.id); return; } else if(ais_msg->host.size != 0 && safe_str_neq(ais_msg->host.uname, pcmk_uname)) { /* Not for us */ return; } ais_msg->sender.id = nodeid; if(ais_msg->sender.size == 0) { crm_node_t *peer = crm_get_peer(nodeid, NULL); if(peer == NULL) { crm_err("Peer with nodeid=%u is unknown", nodeid); } else if(peer->uname == NULL) { crm_err("No uname for peer with nodeid=%u", nodeid); } else { crm_notice("Fixing uname for peer with nodeid=%u", nodeid); ais_msg->sender.size = strlen(peer->uname); memset(ais_msg->sender.uname, 0, MAX_NAME); memcpy(ais_msg->sender.uname, peer->uname, ais_msg->sender.size); } } ais_dispatch_message(ais_msg, pcmk_cpg_dispatch_fn); } static void pcmk_cpg_membership( cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries) { /* Don't care about CPG membership */ } static gboolean pcmk_quorum_dispatch(int sender, gpointer user_data) { int rc = 0; rc = quorum_dispatch(pcmk_quorum_handle, CS_DISPATCH_ALL); if(rc < 0) { crm_err("Connection to the Quorum API failed: %d", rc); return FALSE; } return TRUE; } static void pcmk_quorum_notification( quorum_handle_t handle, uint32_t quorate, uint64_t ring_id, uint32_t view_list_entries, uint32_t *view_list) { int i; if(quorate != crm_have_quorum) { crm_notice("Membership "U64T": quorum %s (%lu)", ring_id, quorate?"acquired":"lost", (long unsigned int)view_list_entries); crm_have_quorum = quorate; } else { crm_info("Membership "U64T": quorum %s (%lu)", ring_id, quorate?"retained":"still lost", (long unsigned int)view_list_entries); } for (i=0; ihost.type; int tmp_size = msg->header.size - sizeof(AIS_Message); if(sane && msg->header.size == 0) { crm_warn("Message with no size"); sane = FALSE; } if(sane && msg->header.error != CS_OK) { crm_warn("Message header contains an error: %d", msg->header.error); sane = FALSE; } if(sane && ais_data_len(msg) != tmp_size) { crm_warn("Message payload size is incorrect: expected %d, got %d", ais_data_len(msg), tmp_size); sane = TRUE; } if(sane && ais_data_len(msg) == 0) { crm_warn("Message with no payload"); sane = FALSE; } if(sane && data && msg->is_compressed == FALSE) { int str_size = strlen(data) + 1; if(ais_data_len(msg) != str_size) { int lpc = 0; crm_warn("Message payload is corrupted: expected %d bytes, got %d", ais_data_len(msg), str_size); sane = FALSE; for(lpc = (str_size - 10); lpc < msg->size; lpc++) { if(lpc < 0) { lpc = 0; } crm_debug("bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]); } } } if(sane == FALSE) { crm_err("Invalid message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } else if(repaired) { crm_err("Repaired message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } else { crm_debug_3("Verfied message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } return sane; } #endif static int get_config_opt( confdb_handle_t config, hdb_handle_t object_handle, const char *key, char **value, const char *fallback) { size_t len = 0; char *env_key = NULL; const char *env_value = NULL; char buffer[256]; if(*value) { crm_free(*value); *value = NULL; } if(object_handle > 0) { if(CS_OK == confdb_key_get(config, object_handle, key, strlen(key), &buffer, &len)) { *value = crm_strdup(buffer); } } if (*value) { crm_info("Found '%s' for option: %s", *value, key); return 0; } env_key = crm_concat("HA", key, '_'); env_value = getenv(env_key); crm_free(env_key); if (*value) { crm_info("Found '%s' in ENV for option: %s", *value, key); *value = crm_strdup(env_value); return 0; } if(fallback) { crm_info("Defaulting to '%s' for option: %s", fallback, key); *value = crm_strdup(fallback); } else { crm_info("No default for option: %s", key); } return -1; } static confdb_handle_t config_find_init(confdb_handle_t config) { cs_error_t rc = CS_OK; confdb_handle_t local_handle = OBJECT_PARENT_HANDLE; rc = confdb_object_find_start(config, local_handle); if(rc == CS_OK) { return local_handle; } else { crm_err("Couldn't create search context: %d", rc); } return 0; } static hdb_handle_t config_find_next( confdb_handle_t config, const char *name, confdb_handle_t top_handle) { cs_error_t rc = CS_OK; hdb_handle_t local_handle = 0; if(top_handle == 0) { crm_err("Couldn't search for %s: no valid context", name); return 0; } crm_debug_2("Searching for %s in "HDB_X_FORMAT, name, top_handle); rc = confdb_object_find(config, top_handle, name, strlen(name), &local_handle); if(rc != CS_OK) { crm_info("No additional configuration supplied for: %s", name); local_handle = 0; } else { crm_info("Processing additional %s options...", name); } return local_handle; } enum cluster_type_e find_corosync_variant(void) { confdb_handle_t config; enum cluster_type_e found = pcmk_cluster_unknown; int rc; char *value = NULL; - gboolean have_quorum = FALSE; confdb_handle_t top_handle = 0; hdb_handle_t local_handle = 0; static confdb_callbacks_t callbacks = {}; rc = confdb_initialize (&config, &callbacks); if (rc != CS_OK) { crm_debug("Could not initialize Cluster Configuration Database API instance error %d", rc); return found; } top_handle = config_find_init(config); local_handle = config_find_next(config, "service", top_handle); - while(local_handle && have_quorum == FALSE) { + while(local_handle) { crm_free(value); get_config_opt(config, local_handle, "name", &value, NULL); if(safe_str_eq("pacemaker", value)) { found = pcmk_cluster_classic_ais; crm_free(value); get_config_opt(config, local_handle, "ver", &value, "0"); crm_trace("Found Pacemaker plugin version: %s", value); + break; } local_handle = config_find_next(config, "service", top_handle); } crm_free(value); if(found == pcmk_cluster_unknown) { top_handle = config_find_init(config); local_handle = config_find_next(config, "quorum", top_handle); get_config_opt(config, local_handle, "provider", &value, NULL); if(safe_str_eq("quorum_cman", value)) { crm_trace("Found CMAN quorum provider"); found = pcmk_cluster_cman; } } crm_free(value); if(found == pcmk_cluster_unknown) { - crm_trace("Defaulting to a 'standard' corosync cluster"); + crm_trace("Defaulting to a 'bare' corosync cluster"); found = pcmk_cluster_corosync; } confdb_finalize (config); return found; } diff --git a/lib/common/cluster.c b/lib/common/cluster.c index 17ac0a19f4..9094ca470d 100644 --- a/lib/common/cluster.c +++ b/lib/common/cluster.c @@ -1,417 +1,413 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include "stack.h" CRM_TRACE_INIT_DATA(cluster); xmlNode *create_common_message( xmlNode *original_request, xmlNode *xml_response_data); gboolean crm_cluster_connect( char **our_uname, char **our_uuid, void *dispatch, void *destroy, #if SUPPORT_HEARTBEAT ll_cluster_t **hb_conn #else void **hb_conn #endif ) { enum cluster_type_e type = get_cluster_type(); crm_info("Connecting to cluster infrastructure: %s", name_for_cluster_type(type)); if(hb_conn != NULL) { *hb_conn = NULL; } #if SUPPORT_COROSYNC if(is_openais_cluster()) { crm_peer_init(); return init_ais_connection(dispatch, destroy, our_uuid, our_uname, NULL); } #endif #if SUPPORT_HEARTBEAT if(is_heartbeat_cluster()) { int rv; CRM_ASSERT(hb_conn != NULL); if(*hb_conn == NULL) { /* No object passed in, create a new one. */ *hb_conn = ll_cluster_new("heartbeat"); } else { /* Object passed in. Disconnect first, then reconnect below. */ ll_cluster_t *conn = *hb_conn; conn->llc_ops->signoff(conn, FALSE); } /* make sure we are disconnected first with the old object, if any. */ if (heartbeat_cluster && heartbeat_cluster != *hb_conn) { heartbeat_cluster->llc_ops->signoff(heartbeat_cluster, FALSE); } CRM_ASSERT(*hb_conn != NULL); heartbeat_cluster = *hb_conn; rv = register_heartbeat_conn( heartbeat_cluster, our_uuid, our_uname, dispatch, destroy); if (rv) { /* we'll benefit from a bigger queue length on heartbeat side. * Otherwise, if peers send messages faster than we can consume * them right now, heartbeat messaging layer will kick us out once * it's (small) default queue fills up :( * If we fail to adjust the sendq length, that's not yet fatal, though. */ if (HA_OK != (*hb_conn)->llc_ops->set_sendq_len(*hb_conn, 1024)) { crm_warn("Cannot set sendq length: %s", (*hb_conn)->llc_ops->errmsg(*hb_conn)); } } return rv; } #endif crm_info("Unsupported cluster stack: %s", getenv("HA_cluster_type")); return FALSE; } gboolean send_cluster_message( const char *node, enum crm_ais_msg_types service, xmlNode *data, gboolean ordered) { #if SUPPORT_COROSYNC if(is_openais_cluster()) { return send_ais_message(data, FALSE, node, service); } #endif #if SUPPORT_HEARTBEAT if(is_heartbeat_cluster()) { return send_ha_message(heartbeat_cluster, data, node, ordered); } #endif return FALSE; } static GHashTable *crm_uuid_cache = NULL; static GHashTable *crm_uname_cache = NULL; void empty_uuid_cache(void) { if(crm_uuid_cache != NULL) { g_hash_table_destroy(crm_uuid_cache); crm_uuid_cache = NULL; } } void unget_uuid(const char *uname) { if(crm_uuid_cache == NULL) { return; } g_hash_table_remove(crm_uuid_cache, uname); } const char * get_uuid(const char *uname) { char *uuid_calc = NULL; CRM_CHECK(uname != NULL, return NULL); if(crm_uuid_cache == NULL) { crm_uuid_cache = g_hash_table_new_full( g_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); } CRM_CHECK(uname != NULL, return NULL); /* avoid blocking calls where possible */ uuid_calc = g_hash_table_lookup(crm_uuid_cache, uname); if(uuid_calc != NULL) { return uuid_calc; } #if SUPPORT_COROSYNC if(is_openais_cluster()) { uuid_calc = crm_strdup(uname); goto fallback; } #endif #if SUPPORT_HEARTBEAT if(is_heartbeat_cluster()) { cl_uuid_t uuid_raw; const char *unknown = "00000000-0000-0000-0000-000000000000"; if(heartbeat_cluster == NULL) { crm_warn("No connection to heartbeat, using uuid=uname"); uuid_calc = crm_strdup(uname); goto fallback; } if(heartbeat_cluster->llc_ops->get_uuid_by_name( heartbeat_cluster, uname, &uuid_raw) == HA_FAIL) { crm_err("get_uuid_by_name() call failed for host %s", uname); crm_free(uuid_calc); return NULL; } crm_malloc0(uuid_calc, 50); cl_uuid_unparse(&uuid_raw, uuid_calc); if(safe_str_eq(uuid_calc, unknown)) { crm_warn("Could not calculate UUID for %s", uname); crm_free(uuid_calc); return NULL; } } #endif fallback: g_hash_table_insert(crm_uuid_cache, crm_strdup(uname), uuid_calc); uuid_calc = g_hash_table_lookup(crm_uuid_cache, uname); return uuid_calc; } const char * get_uname(const char *uuid) { char *uname = NULL; if(crm_uname_cache == NULL) { crm_uname_cache = g_hash_table_new_full( g_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); } CRM_CHECK(uuid != NULL, return NULL); /* avoid blocking calls where possible */ uname = g_hash_table_lookup(crm_uname_cache, uuid); if(uname != NULL) { return uname; } #if SUPPORT_COROSYNC if(is_openais_cluster()) { g_hash_table_insert(crm_uname_cache, crm_strdup(uuid), crm_strdup(uuid)); } #endif #if SUPPORT_HEARTBEAT if(is_heartbeat_cluster()) { if(heartbeat_cluster != NULL && uuid != NULL) { cl_uuid_t uuid_raw; char *uuid_copy = crm_strdup(uuid); cl_uuid_parse(uuid_copy, &uuid_raw); crm_malloc(uname, MAX_NAME); if(heartbeat_cluster->llc_ops->get_name_by_uuid( heartbeat_cluster, &uuid_raw, uname, MAX_NAME) == HA_FAIL) { crm_err("Could not calculate uname for %s", uuid); crm_free(uuid_copy); crm_free(uname); } else { g_hash_table_insert(crm_uname_cache, uuid_copy, uname); } } } #endif return g_hash_table_lookup(crm_uname_cache, uuid); } void set_uuid(xmlNode *node,const char *attr,const char *uname) { const char *uuid_calc = get_uuid(uname); crm_xml_add(node, attr, uuid_calc); return; } xmlNode* createPingAnswerFragment(const char *from, const char *status) { xmlNode *ping = NULL; ping = create_xml_node(NULL, XML_CRM_TAG_PING); crm_xml_add(ping, XML_PING_ATTR_STATUS, status); crm_xml_add(ping, XML_PING_ATTR_SYSFROM, from); return ping; } const char * name_for_cluster_type(enum cluster_type_e type) { switch(type) { case pcmk_cluster_classic_ais: return "classic openais (with plugin)"; case pcmk_cluster_cman: return "cman"; case pcmk_cluster_corosync: return "corosync"; case pcmk_cluster_heartbeat: return "heartbeat"; case pcmk_cluster_unknown: return "unknown"; case pcmk_cluster_invalid: return "invalid"; } crm_err("Invalid cluster type: %d", type); return "invalid"; } /* Do not expose these two */ int set_cluster_type(enum cluster_type_e type); static enum cluster_type_e cluster_type = pcmk_cluster_unknown; int set_cluster_type(enum cluster_type_e type) { if(cluster_type == pcmk_cluster_unknown) { crm_info("Cluster type set to: %s", name_for_cluster_type(type)); cluster_type = type; return 0; } else if(cluster_type == type) { return 0; } else if(pcmk_cluster_unknown == type) { cluster_type = type; return 0; } crm_err("Cluster type already set to %s, ignoring %s", name_for_cluster_type(cluster_type), name_for_cluster_type(type)); return -1; } enum cluster_type_e get_cluster_type(void) { if(cluster_type == pcmk_cluster_unknown) { const char *cluster = getenv("HA_cluster_type"); cluster_type = pcmk_cluster_invalid; if(cluster) { crm_info("Cluster type is: '%s'", cluster); } else { #if SUPPORT_COROSYNC cluster_type = find_corosync_variant(); if(cluster_type == pcmk_cluster_unknown) { cluster = "heartbeat"; crm_info("Assuming a 'heartbeat' based cluster"); } else { cluster = name_for_cluster_type(cluster_type); crm_info("Detected an active '%s' cluster", cluster); } #else cluster = "heartbeat"; #endif } if(safe_str_eq(cluster, "heartbeat")) { #if SUPPORT_HEARTBEAT cluster_type = pcmk_cluster_heartbeat; #else - crm_crit("This installation of Pacemaker does not support the '%s' cluster infrastructure. Terminating.", - cluster); - exit(100); + cluster_type = pcmk_cluster_invalid; #endif } else if(safe_str_eq(cluster, "openais")) { #if SUPPORT_COROSYNC cluster_type = pcmk_cluster_classic_ais; #else - crm_crit("This installation of Pacemaker does not support the '%s' cluster infrastructure. Terminating.", - cluster); - exit(100); + cluster_type = pcmk_cluster_invalid; #endif } else if(safe_str_eq(cluster, "corosync")) { #if SUPPORT_COROSYNC cluster_type = pcmk_cluster_corosync; #else - crm_crit("This installation of Pacemaker does not support the '%s' cluster infrastructure. Terminating.", - cluster); - exit(100); + cluster_type = pcmk_cluster_invalid; #endif } else if(safe_str_eq(cluster, "cman")) { #if SUPPORT_CMAN cluster_type = pcmk_cluster_cman; #else - crm_crit("This installation of Pacemaker does not support the '%s' cluster infrastructure. Terminating.", - cluster); - exit(100); + cluster_type = pcmk_cluster_invalid; #endif } else { - crm_crit("Unknown cluster type: '%s'. Terminating.", cluster); + cluster_type = pcmk_cluster_invalid; + } + + if(cluster_type == pcmk_cluster_invalid) { + crm_crit("This installation of Pacemaker does not support the '%s' cluster infrastructure. Terminating.", cluster); exit(100); } } return cluster_type; } gboolean is_cman_cluster(void) { return get_cluster_type() == pcmk_cluster_cman; } gboolean is_corosync_cluster(void) { return get_cluster_type() == pcmk_cluster_corosync; } gboolean is_classic_ais_cluster(void) { return get_cluster_type() == pcmk_cluster_classic_ais; } gboolean is_openais_cluster(void) { enum cluster_type_e type = get_cluster_type(); if(type == pcmk_cluster_classic_ais) { return TRUE; } else if(type == pcmk_cluster_corosync) { return TRUE; } else if(type == pcmk_cluster_cman) { return TRUE; } return FALSE; } gboolean is_heartbeat_cluster(void) { return get_cluster_type() == pcmk_cluster_heartbeat; } diff --git a/mcp/Makefile.am b/mcp/Makefile.am index 535087e782..b6f3f2af26 100644 --- a/mcp/Makefile.am +++ b/mcp/Makefile.am @@ -1,52 +1,52 @@ # # Copyright (C) 2004-2009 Andrew Beekhof # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # MAINTAINERCLEANFILES = Makefile.in if BUILD_CS_SUPPORT INCLUDES = -I$(top_builddir)/include -I$(top_srcdir)/include \ -I$(top_builddir)/libltdl -I$(top_srcdir)/libltdl initdir = $(INITDIR) init_SCRIPTS = pacemaker sbin_PROGRAMS = pacemakerd if BUILD_HELP man8_MANS = $(sbin_PROGRAMS:%=%.8) endif ## SOURCES noinst_HEADERS = pacemakerd_SOURCES = pacemaker.c corosync.c -pacemakerd_LDADD = $(CLUSTERLIBS) $(top_builddir)/lib/common/libcrmcommon.la -lcfg -lconfdb +pacemakerd_LDADD = $(CLUSTERLIBS) $(top_builddir)/lib/common/libcrmcluster.la $(top_builddir)/lib/common/libcrmcommon.la -lcfg -lconfdb %.8: % echo Creating $@ chmod a+x $(top_builddir)/mcp/$< help2man --output $@ --no-info --section 8 --name "Part of the Pacemaker cluster resource manager" $(top_builddir)/mcp/$< clean-generic: rm -f *.log *.debug *.xml *~ install-exec-local: uninstall-local: .PHONY: install-exec-hook endif diff --git a/mcp/corosync.c b/mcp/corosync.c index a379b3af10..f7e0fa4436 100644 --- a/mcp/corosync.c +++ b/mcp/corosync.c @@ -1,657 +1,561 @@ /* * Copyright (C) 2010 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include +#include + #ifdef SUPPORT_CMAN #include #endif static struct cpg_name cpg_group = { .length = 0, .value[0] = 0, }; gboolean use_cman = FALSE; static cpg_handle_t cpg_handle; static corosync_cfg_handle_t cfg_handle; static corosync_cfg_state_notification_t cfg_buffer; -static inline const char *ais_error2text(int error) -{ - const char *text = "unknown"; - switch(error) { - case CS_OK: - text = "None"; - break; - case CS_ERR_LIBRARY: - text = "Library error"; - break; - case CS_ERR_VERSION: - text = "Version error"; - break; - case CS_ERR_INIT: - text = "Initialization error"; - break; - case CS_ERR_TIMEOUT: - text = "Timeout"; - break; - case CS_ERR_TRY_AGAIN: - text = "Try again"; - break; - case CS_ERR_INVALID_PARAM: - text = "Invalid parameter"; - break; - case CS_ERR_NO_MEMORY: - text = "No memory"; - break; - case CS_ERR_BAD_HANDLE: - text = "Bad handle"; - break; - case CS_ERR_BUSY: - text = "Busy"; - break; - case CS_ERR_ACCESS: - text = "Access error"; - break; - case CS_ERR_NOT_EXIST: - text = "Doesn't exist"; - break; - case CS_ERR_NAME_TOO_LONG: - text = "Name too long"; - break; - case CS_ERR_EXIST: - text = "Exists"; - break; - case CS_ERR_NO_SPACE: - text = "No space"; - break; - case CS_ERR_INTERRUPT: - text = "Interrupt"; - break; - case CS_ERR_NAME_NOT_FOUND: - text = "Name not found"; - break; - case CS_ERR_NO_RESOURCES: - text = "No resources"; - break; - case CS_ERR_NOT_SUPPORTED: - text = "Not supported"; - break; - case CS_ERR_BAD_OPERATION: - text = "Bad operation"; - break; - case CS_ERR_FAILED_OPERATION: - text = "Failed operation"; - break; - case CS_ERR_MESSAGE_ERROR: - text = "Message error"; - break; - case CS_ERR_QUEUE_FULL: - text = "Queue full"; - break; - case CS_ERR_QUEUE_NOT_AVAILABLE: - text = "Queue not available"; - break; - case CS_ERR_BAD_FLAGS: - text = "Bad flags"; - break; - case CS_ERR_TOO_BIG: - text = "To big"; - break; - case CS_ERR_NO_SECTIONS: - text = "No sections"; - break; - } - return text; -} - /* =::=::=::= CFG - Shutdown stuff =::=::=::= */ static void cfg_shutdown_callback(corosync_cfg_handle_t h, corosync_cfg_shutdown_flags_t flags) { crm_info("Corosync wants to shut down: %s", (flags==COROSYNC_CFG_SHUTDOWN_FLAG_IMMEDIATE)?"immediate": (flags==COROSYNC_CFG_SHUTDOWN_FLAG_REGARDLESS)?"forced":"optional" ); /* Never allow corosync to shut down while we're running */ corosync_cfg_replyto_shutdown(h, COROSYNC_CFG_SHUTDOWN_FLAG_NO); } static corosync_cfg_callbacks_t cfg_callbacks = { .corosync_cfg_shutdown_callback = cfg_shutdown_callback, .corosync_cfg_state_track_callback = NULL, }; static gboolean pcmk_cfg_dispatch(int sender, gpointer user_data) { corosync_cfg_handle_t *handle = (corosync_cfg_handle_t*)user_data; cs_error_t rc = corosync_cfg_dispatch(*handle, CS_DISPATCH_ALL); if (rc != CS_OK) { return FALSE; } return TRUE; } static void cfg_connection_destroy(gpointer user_data) { crm_err("Connection destroyed"); cfg_handle = 0; return; } gboolean cluster_disconnect_cfg(void) { if(cfg_handle) { corosync_cfg_finalize(cfg_handle); cfg_handle = 0; } return TRUE; } #define cs_repeat(counter, max, code) do { \ code; \ if(rc == CS_ERR_TRY_AGAIN) { \ counter++; \ crm_debug("Retrying operation after %ds", counter); \ sleep(counter); \ } \ } while(rc == CS_ERR_TRY_AGAIN && counter < max) gboolean cluster_connect_cfg(uint32_t *nodeid) { cs_error_t rc; int fd = 0, retries = 0; cs_repeat( retries, 30, rc = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)); if (rc != CS_OK) { crm_err("corosync cfg init error %d", rc); return FALSE; } rc = corosync_cfg_fd_get(cfg_handle, &fd); if (rc != CS_OK) { crm_err("corosync cfg fd_get error %d", rc); goto bail; } retries = 0; cs_repeat(retries, 30, rc = corosync_cfg_local_get(cfg_handle, nodeid)); if (rc != CS_OK) { crm_err("corosync cfg local_get error %d", rc); goto bail; } crm_debug("Our nodeid: %d", *nodeid); retries = 0; cs_repeat( retries, 30, rc = corosync_cfg_state_track (cfg_handle, 0, &cfg_buffer)); if (rc != CS_OK) { crm_err("corosync cfg stack_track error %d", rc); goto bail; } crm_debug("Adding fd=%d to mainloop", fd); G_main_add_fd( G_PRIORITY_HIGH, fd, FALSE, pcmk_cfg_dispatch, &cfg_handle, cfg_connection_destroy); return TRUE; bail: corosync_cfg_finalize(cfg_handle); return FALSE; } /* =::=::=::= CPG - Closed Process Group Messaging =::=::=::= */ static gboolean pcmk_cpg_dispatch(int sender, gpointer user_data) { cpg_handle_t *handle = (cpg_handle_t*)user_data; cs_error_t rc = cpg_dispatch(*handle, CS_DISPATCH_ALL); if (rc != CS_OK) { return FALSE; } return TRUE; } static void cpg_connection_destroy(gpointer user_data) { crm_err("Connection destroyed"); cpg_handle = 0; return; } static void pcmk_cpg_deliver ( cpg_handle_t handle, const struct cpg_name *groupName, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { if(nodeid != local_nodeid) { uint32_t procs = 0; xmlNode *xml = string2xml(msg); const char *uname = crm_element_value(xml, "uname"); crm_element_value_int(xml, "proclist", (int*)&procs); /* crm_debug("Got proclist %.32x from %s", procs, uname); */ if(update_node_processes(nodeid, uname, procs)) { update_process_clients(); } } } static void pcmk_cpg_membership( cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries) { /* Don't care about CPG membership */ update_process_peers(); } cpg_callbacks_t cpg_callbacks = { .cpg_deliver_fn = pcmk_cpg_deliver, .cpg_confchg_fn = pcmk_cpg_membership, }; gboolean cluster_disconnect_cpg(void) { if(cpg_handle) { cpg_finalize(cpg_handle); cpg_handle = 0; } return TRUE; } gboolean cluster_connect_cpg(void) { cs_error_t rc; unsigned int nodeid; int fd; int retries = 0; strcpy(cpg_group.value, "pcmk"); cpg_group.length = strlen(cpg_group.value)+1; retries = 0; cs_repeat(retries, 30, rc = cpg_initialize(&cpg_handle, &cpg_callbacks)); if (rc != CS_OK) { crm_err("corosync cpg init error %d", rc); return FALSE; } rc = cpg_fd_get(cpg_handle, &fd); if (rc != CS_OK) { crm_err("corosync cpg fd_get error %d", rc); goto bail; } retries = 0; cs_repeat(retries, 30, rc = cpg_local_get(cpg_handle, &nodeid)); if (rc != CS_OK) { crm_err("corosync cpg local_get error %d", rc); goto bail; } crm_debug("Our nodeid: %d", nodeid); retries = 0; cs_repeat(retries, 30, rc = cpg_join(cpg_handle, &cpg_group)); if (rc != CS_OK) { crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc); goto bail; } crm_debug("Adding fd=%d to mainloop", fd); G_main_add_fd( G_PRIORITY_HIGH, fd, FALSE, pcmk_cpg_dispatch, &cpg_handle, cpg_connection_destroy); return TRUE; bail: cpg_finalize(cpg_handle); return FALSE; } gboolean send_cpg_message(struct iovec *iov) { int rc = CS_OK; int retries = 0; errno = 0; do { rc = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, 1); if(rc == CS_ERR_TRY_AGAIN) { cpg_flow_control_state_t fc_state = CPG_FLOW_CONTROL_DISABLED; int rc2 = cpg_flow_control_state_get (cpg_handle, &fc_state); if (rc2 == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) { crm_debug("Attempting to clear cpg dispatch queue"); rc2 = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL); } if (rc2 != CS_OK) { crm_warn("Could not check/clear the cpg connection"); goto bail; } else { retries++; crm_debug("Retrying operation after %ds", retries); sleep(retries); } } /* 5 retires is plenty, we'll resend once the membership reforms anyway */ } while(rc == CS_ERR_TRY_AGAIN && retries < 5); bail: if(rc != CS_OK) { crm_err("Sending message via cpg FAILED: (rc=%d) %s", rc, ais_error2text(rc)); } return (rc == CS_OK); } /* =::=::=::= Configuration =::=::=::= */ static int get_config_opt( confdb_handle_t config, hdb_handle_t object_handle, const char *key, char **value, const char *fallback) { size_t len = 0; char *env_key = NULL; const char *env_value = NULL; char buffer[256]; if(*value) { crm_free(*value); *value = NULL; } if(object_handle > 0) { if(CS_OK == confdb_key_get(config, object_handle, key, strlen(key), &buffer, &len)) { *value = crm_strdup(buffer); } } if (*value) { crm_info("Found '%s' for option: %s", *value, key); return 0; } env_key = crm_concat("HA", key, '_'); env_value = getenv(env_key); crm_free(env_key); if (*value) { crm_info("Found '%s' in ENV for option: %s", *value, key); *value = crm_strdup(env_value); return 0; } if(fallback) { crm_info("Defaulting to '%s' for option: %s", fallback, key); *value = crm_strdup(fallback); } else { crm_info("No default for option: %s", key); } return -1; } static confdb_handle_t config_find_init(confdb_handle_t config) { cs_error_t rc = CS_OK; confdb_handle_t local_handle = OBJECT_PARENT_HANDLE; rc = confdb_object_find_start(config, local_handle); if(rc == CS_OK) { return local_handle; } else { crm_err("Couldn't create search context: %d", rc); } return 0; } static hdb_handle_t config_find_next( confdb_handle_t config, const char *name, confdb_handle_t top_handle) { cs_error_t rc = CS_OK; hdb_handle_t local_handle = 0; if(top_handle == 0) { crm_err("Couldn't search for %s: no valid context", name); return 0; } crm_debug_2("Searching for %s in "HDB_X_FORMAT, name, top_handle); rc = confdb_object_find(config, top_handle, name, strlen(name), &local_handle); if(rc != CS_OK) { crm_info("No additional configuration supplied for: %s", name); local_handle = 0; } else { crm_info("Processing additional %s options...", name); } return local_handle; } char *get_local_node_name(void) { char *name = NULL; struct utsname res; if(use_cman) { #ifdef SUPPORT_CMAN cman_node_t us; cman_handle_t cman; cman = cman_init(NULL); if(cman != NULL && cman_is_active(cman)) { us.cn_name[0] = 0; cman_get_node(cman, CMAN_NODEID_US, &us); name = crm_strdup(us.cn_name); crm_info("Using CMAN node name: %s", name); } else { crm_err("Couldn't determin node name from CMAN"); } cman_finish(cman); #endif } else if(uname(&res) < 0) { crm_perror(LOG_ERR,"Could not determin the current host"); exit(100); } else { name = crm_strdup(res.nodename); } return name; } gboolean read_config(void) { confdb_handle_t config; int rc; char *value = NULL; gboolean have_log = FALSE; - gboolean have_quorum = FALSE; confdb_handle_t top_handle = 0; hdb_handle_t local_handle = 0; static confdb_callbacks_t callbacks = {}; + enum cluster_type_e stack = get_cluster_type(); + crm_info("Reading configure for stack: %s", name_for_cluster_type(stack)); rc = confdb_initialize (&config, &callbacks); if (rc != CS_OK) { printf ("Could not initialize Cluster Configuration Database API instance error %d\n", rc); return FALSE; } - crm_info("Reading configure"); - /* =::=::= Should we be here =::=::= */ - - top_handle = config_find_init(config); - local_handle = config_find_next(config, "service", top_handle); - while(local_handle && have_quorum == FALSE) { - crm_free(value); - get_config_opt(config, local_handle, "name", &value, NULL); - if(safe_str_eq("pacemaker", value)) { - have_quorum = TRUE; - setenv("HA_cluster_type", "openais", 1); + if(stack == pcmk_cluster_corosync) { + setenv("HA_cluster_type", "corosync", 1); + + } else if(stack == pcmk_cluster_cman) { + setenv("HA_cluster_type", "cman", 1); + enable_crmd_as_root(TRUE); + use_cman = TRUE; + + } else if(stack == pcmk_cluster_classic_ais) { + setenv("HA_cluster_type", "openais", 1); + + /* Look for a service block to indicate our plugin is loaded */ + top_handle = config_find_init(config); + local_handle = config_find_next(config, "service", top_handle); + while(local_handle) { crm_free(value); - get_config_opt(config, local_handle, "ver", &value, "0"); - if(safe_str_eq(value, "1")) { - crm_free(value); - get_config_opt(config, local_handle, "use_logd", &value, "no"); - setenv("HA_use_logd", value, 1); - + get_config_opt(config, local_handle, "name", &value, NULL); + if(safe_str_eq("pacemaker", value)) { crm_free(value); - get_config_opt(config, local_handle, "use_mgmtd", &value, "no"); - enable_mgmtd(crm_is_true(value)); - - } else { - crm_err("We can only start Pacemaker from init if using version 1" - " of the Pacemaker plugin for Corosync. Terminating."); - exit(100); + get_config_opt(config, local_handle, "ver", &value, "0"); + if(safe_str_eq(value, "1")) { + crm_free(value); + get_config_opt(config, local_handle, "use_logd", &value, "no"); + setenv("HA_use_logd", value, 1); + + crm_free(value); + get_config_opt(config, local_handle, "use_mgmtd", &value, "no"); + enable_mgmtd(crm_is_true(value)); + + } else { + crm_err("We can only start Pacemaker from init if using version 1" + " of the Pacemaker plugin for Corosync. Terminating."); + exit(100); + } + break; } - + local_handle = config_find_next(config, "service", top_handle); } - local_handle = config_find_next(config, "service", top_handle); + crm_free(value); + + } else { + crm_err("Unsupported stack type: %s", name_for_cluster_type(stack)); + return FALSE; } /* =::=::= Logging =::=::= */ - crm_free(value); top_handle = config_find_init(config); local_handle = config_find_next(config, "logging", top_handle); get_config_opt(config, local_handle, "debug", &value, "on"); if(crm_is_true(value) && crm_log_level < LOG_DEBUG) { crm_log_level = LOG_DEBUG; } if(crm_log_level >= LOG_DEBUG) { char *level = crm_itoa(crm_log_level - LOG_INFO); setenv("HA_debug", level, 1); crm_free(level); } get_config_opt(config, local_handle, "to_logfile", &value, "off"); if(crm_is_true(value)) { get_config_opt(config, local_handle, "logfile", &value, NULL); if(value == NULL) { crm_err("Logging to a file requested but no log file specified"); } else { uid_t pcmk_uid = geteuid(); uid_t pcmk_gid = getegid(); FILE *logfile = fopen(value, "a"); if(logfile) { int logfd = fileno(logfile); setenv("HA_debugfile", value, 1); /* Ensure the file has the correct permissions */ fchown(logfd, pcmk_uid, pcmk_gid); fchmod(logfd, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP); fprintf(logfile, "Set r/w permissions for uid=%d, gid=%d on %s\n", pcmk_uid, pcmk_gid, value); fflush(logfile); fsync(logfd); fclose(logfile); have_log = TRUE; } else { crm_err("Couldn't create logfile: %s", value); } } } get_config_opt(config, local_handle, "to_syslog", &value, "on"); if(have_log && crm_is_true(value) == FALSE) { crm_info("User configured file based logging and explicitly disabled syslog."); value = NULL; } else { if(crm_is_true(value) == FALSE) { crm_err("Please enable some sort of logging, either 'to_file: on' or 'to_syslog: on'."); crm_err("If you use file logging, be sure to also define a value for 'logfile'"); } get_config_opt(config, local_handle, "syslog_facility", &value, "daemon"); } setenv("HA_logfacility", value?value:"none", 1); setenv("HA_LOGFACILITY", value?value:"none", 1); - - /* =::=::= Quorum =::=::= */ - - if(have_quorum == FALSE) { - top_handle = config_find_init(config); - local_handle = config_find_next(config, "quorum", top_handle); - get_config_opt(config, local_handle, "provider", &value, NULL); - - if(value) { - have_quorum = TRUE; - if(safe_str_eq("quorum_cman", value)) { -#ifdef SUPPORT_CMAN - setenv("HA_cluster_type", "cman", 1); - enable_crmd_as_root(TRUE); - use_cman = TRUE; -#else - crm_err("Corosync configured for CMAN but this build of Pacemaker doesn't support it"); - exit(100); -#endif - } - } - } confdb_finalize (config); crm_free(value); return TRUE; }