diff --git a/cib/callbacks.c b/cib/callbacks.c index 8615e88ec3..681602843f 100644 --- a/cib/callbacks.c +++ b/cib/callbacks.c @@ -1,1613 +1,1621 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "common.h" typedef struct cib_local_notify_s { xmlNode *notify_src; char *client_id; gboolean from_peer; gboolean sync_reply; } cib_local_notify_t; int next_client_id = 0; gboolean legacy_mode = FALSE; qb_ipcs_service_t *ipcs_ro = NULL; qb_ipcs_service_t *ipcs_rw = NULL; qb_ipcs_service_t *ipcs_shm = NULL; gint cib_GCompareFunc(gconstpointer a, gconstpointer b); gboolean can_write(int flags); void send_cib_replace(const xmlNode * sync_request, const char *host); void cib_process_request(xmlNode * request, gboolean privileged, gboolean force_synchronous, gboolean from_peer, crm_client_t * cib_client); int cib_process_command(xmlNode * request, xmlNode ** reply, xmlNode ** cib_diff, gboolean privileged); gboolean cib_common_callback(qb_ipcs_connection_t * c, void *data, size_t size, gboolean privileged); static gboolean cib_read_legacy_mode(void) { static gboolean init = TRUE; static gboolean legacy = FALSE; if(init) { init = FALSE; legacy = daemon_option_enabled("cib", "legacy"); if(legacy) { crm_notice("Enabled legacy mode"); } } return legacy; } static gboolean cib_legacy_mode(void) { if(cib_read_legacy_mode()) { return TRUE; } return legacy_mode; } static int32_t cib_ipc_accept(qb_ipcs_connection_t * c, uid_t uid, gid_t gid) { crm_trace("Connection %p", c); if (cib_shutdown_flag) { crm_info("Ignoring new client [%d] during shutdown", crm_ipcs_client_pid(c)); return -EPERM; } if (crm_client_new(c, uid, gid) == NULL) { return -EIO; } return 0; } static void cib_ipc_created(qb_ipcs_connection_t * c) { crm_trace("Connection %p", c); } static int32_t cib_ipc_dispatch_rw(qb_ipcs_connection_t * c, void *data, size_t size) { crm_client_t *client = crm_client_get(c); crm_trace("%p message from %s", c, client->id); return cib_common_callback(c, data, size, TRUE); } static int32_t cib_ipc_dispatch_ro(qb_ipcs_connection_t * c, void *data, size_t size) { crm_client_t *client = crm_client_get(c); crm_trace("%p message from %s", c, client->id); return cib_common_callback(c, data, size, FALSE); } /* Error code means? */ static int32_t cib_ipc_closed(qb_ipcs_connection_t * c) { crm_client_t *client = crm_client_get(c); if (client == NULL) { return 0; } crm_trace("Connection %p", c); crm_client_destroy(client); return 0; } static void cib_ipc_destroy(qb_ipcs_connection_t * c) { crm_trace("Connection %p", c); cib_ipc_closed(c); if (cib_shutdown_flag) { cib_shutdown(0); } } struct qb_ipcs_service_handlers ipc_ro_callbacks = { .connection_accept = cib_ipc_accept, .connection_created = cib_ipc_created, .msg_process = cib_ipc_dispatch_ro, .connection_closed = cib_ipc_closed, .connection_destroyed = cib_ipc_destroy }; struct qb_ipcs_service_handlers ipc_rw_callbacks = { .connection_accept = cib_ipc_accept, .connection_created = cib_ipc_created, .msg_process = cib_ipc_dispatch_rw, .connection_closed = cib_ipc_closed, .connection_destroyed = cib_ipc_destroy }; void cib_common_callback_worker(uint32_t id, uint32_t flags, xmlNode * op_request, crm_client_t * cib_client, gboolean privileged) { const char *op = crm_element_value(op_request, F_CIB_OPERATION); if (crm_str_eq(op, CRM_OP_REGISTER, TRUE)) { if (flags & crm_ipc_client_response) { xmlNode *ack = create_xml_node(NULL, __FUNCTION__); crm_xml_add(ack, F_CIB_OPERATION, CRM_OP_REGISTER); crm_xml_add(ack, F_CIB_CLIENTID, cib_client->id); crm_ipcs_send(cib_client, id, ack, flags); cib_client->request_id = 0; free_xml(ack); } return; } else if (crm_str_eq(op, T_CIB_NOTIFY, TRUE)) { /* Update the notify filters for this client */ int on_off = 0; long long bit = 0; const char *type = crm_element_value(op_request, F_CIB_NOTIFY_TYPE); crm_element_value_int(op_request, F_CIB_NOTIFY_ACTIVATE, &on_off); crm_debug("Setting %s callbacks for %s (%s): %s", type, cib_client->name, cib_client->id, on_off ? "on" : "off"); if (safe_str_eq(type, T_CIB_POST_NOTIFY)) { bit = cib_notify_post; } else if (safe_str_eq(type, T_CIB_PRE_NOTIFY)) { bit = cib_notify_pre; } else if (safe_str_eq(type, T_CIB_UPDATE_CONFIRM)) { bit = cib_notify_confirm; } else if (safe_str_eq(type, T_CIB_DIFF_NOTIFY)) { bit = cib_notify_diff; } else if (safe_str_eq(type, T_CIB_REPLACE_NOTIFY)) { bit = cib_notify_replace; } if (on_off) { set_bit(cib_client->options, bit); } else { clear_bit(cib_client->options, bit); } if (flags & crm_ipc_client_response) { /* TODO - include rc */ crm_ipcs_send_ack(cib_client, id, flags, "ack", __FUNCTION__, __LINE__); } return; } cib_process_request(op_request, FALSE, privileged, FALSE, cib_client); } int32_t cib_common_callback(qb_ipcs_connection_t * c, void *data, size_t size, gboolean privileged) { uint32_t id = 0; uint32_t flags = 0; int call_options = 0; crm_client_t *cib_client = crm_client_get(c); xmlNode *op_request = crm_ipcs_recv(cib_client, data, size, &id, &flags); if (op_request) { crm_element_value_int(op_request, F_CIB_CALLOPTS, &call_options); } if (op_request == NULL) { crm_trace("Invalid message from %p", c); crm_ipcs_send_ack(cib_client, id, flags, "nack", __FUNCTION__, __LINE__); return 0; } else if(cib_client == NULL) { crm_trace("Invalid client %p", c); return 0; } if (is_set(call_options, cib_sync_call)) { CRM_ASSERT(flags & crm_ipc_client_response); CRM_LOG_ASSERT(cib_client->request_id == 0); /* This means the client has two synchronous events in-flight */ cib_client->request_id = id; /* Reply only to the last one */ } if (cib_client->name == NULL) { const char *value = crm_element_value(op_request, F_CIB_CLIENTNAME); if (value == NULL) { cib_client->name = crm_itoa(cib_client->pid); } else { cib_client->name = strdup(value); } } crm_xml_add(op_request, F_CIB_CLIENTID, cib_client->id); crm_xml_add(op_request, F_CIB_CLIENTNAME, cib_client->name); #if ENABLE_ACL CRM_ASSERT(cib_client->user != NULL); crm_acl_get_set_user(op_request, F_CIB_USER, cib_client->user); #endif crm_log_xml_trace(op_request, "Client[inbound]"); cib_common_callback_worker(id, flags, op_request, cib_client, privileged); free_xml(op_request); return 0; } static uint64_t ping_seq = 0; static char *ping_digest = NULL; static bool ping_modified_since = FALSE; int sync_our_cib(xmlNode * request, gboolean all); static gboolean cib_digester_cb(gpointer data) { if (cib_is_master) { char buffer[32]; xmlNode *ping = create_xml_node(NULL, "ping"); ping_seq++; free(ping_digest); ping_digest = NULL; ping_modified_since = FALSE; snprintf(buffer, 32, U64T, ping_seq); crm_trace("Requesting peer digests (%s)", buffer); crm_xml_add(ping, F_TYPE, "cib"); crm_xml_add(ping, F_CIB_OPERATION, CRM_OP_PING); crm_xml_add(ping, F_CIB_PING_ID, buffer); crm_xml_add(ping, XML_ATTR_CRM_VERSION, CRM_FEATURE_SET); send_cluster_message(NULL, crm_msg_cib, ping, TRUE); free_xml(ping); } return FALSE; } static void process_ping_reply(xmlNode *reply) { uint64_t seq = 0; const char *host = crm_element_value(reply, F_ORIG); xmlNode *pong = get_message_xml(reply, F_CIB_CALLDATA); const char *seq_s = crm_element_value(pong, F_CIB_PING_ID); const char *digest = crm_element_value(pong, XML_ATTR_DIGEST); if (seq_s) { seq = crm_int_helper(seq_s, NULL); } if(digest == NULL) { crm_trace("Ignoring ping reply %s from %s with no digest", seq_s, host); } else if(seq != ping_seq) { crm_trace("Ignoring out of sequence ping reply %s from %s", seq_s, host); } else if(ping_modified_since) { crm_trace("Ignoring ping reply %s from %s: cib updated since", seq_s, host); } else { const char *version = crm_element_value(pong, XML_ATTR_CRM_VERSION); if(ping_digest == NULL) { crm_trace("Calculating new digest"); ping_digest = calculate_xml_versioned_digest(the_cib, FALSE, TRUE, version); } crm_trace("Processing ping reply %s from %s (%s)", seq_s, host, digest); if(safe_str_eq(ping_digest, digest) == FALSE) { xmlNode *remote_cib = get_message_xml(pong, F_CIB_CALLDATA); crm_notice("Local CIB %s.%s.%s.%s differs from %s: %s.%s.%s.%s %p", crm_element_value(the_cib, XML_ATTR_GENERATION_ADMIN), crm_element_value(the_cib, XML_ATTR_GENERATION), crm_element_value(the_cib, XML_ATTR_NUMUPDATES), ping_digest, host, remote_cib?crm_element_value(remote_cib, XML_ATTR_GENERATION_ADMIN):"_", remote_cib?crm_element_value(remote_cib, XML_ATTR_GENERATION):"_", remote_cib?crm_element_value(remote_cib, XML_ATTR_NUMUPDATES):"_", digest, remote_cib); if(remote_cib && remote_cib->children) { /* Additional debug */ xml_calculate_changes(the_cib, remote_cib); xml_log_changes(LOG_INFO, __FUNCTION__, remote_cib); crm_trace("End of differences"); } free_xml(remote_cib); sync_our_cib(reply, FALSE); } } } static void do_local_notify(xmlNode * notify_src, const char *client_id, gboolean sync_reply, gboolean from_peer) { /* send callback to originating child */ crm_client_t *client_obj = NULL; int local_rc = pcmk_ok; int call_id = 0; crm_element_value_int(notify_src, F_CIB_CALLID, &call_id); if (client_id != NULL) { client_obj = crm_client_get_by_id(client_id); } if (client_obj == NULL) { local_rc = -ECONNRESET; crm_trace("No client to sent response %d to, F_CIB_CLIENTID not set.", call_id); } else { int rid = 0; if (sync_reply) { if (client_obj->ipcs) { CRM_LOG_ASSERT(client_obj->request_id); rid = client_obj->request_id; client_obj->request_id = 0; crm_trace("Sending response %d to %s %s", rid, client_obj->name, from_peer ? "(originator of delegated request)" : ""); } else { crm_trace("Sending response [call %d] to %s %s", call_id, client_obj->name, from_peer ? "(originator of delegated request)" : ""); } } else { crm_trace("Sending event %d to %s %s", call_id, client_obj->name, from_peer ? "(originator of delegated request)" : ""); } switch (client_obj->kind) { case CRM_CLIENT_IPC: if (crm_ipcs_send(client_obj, rid, notify_src, sync_reply?crm_ipc_flags_none:crm_ipc_server_event) < 0) { local_rc = -ENOMSG; } break; #ifdef HAVE_GNUTLS_GNUTLS_H case CRM_CLIENT_TLS: #endif case CRM_CLIENT_TCP: crm_remote_send(client_obj->remote, notify_src); break; default: crm_err("Unknown transport %d for %s", client_obj->kind, client_obj->name); } } if (local_rc != pcmk_ok && client_obj != NULL) { crm_warn("%sSync reply to %s failed: %s", sync_reply ? "" : "A-", client_obj ? client_obj->name : "", pcmk_strerror(local_rc)); } } static void parse_local_options_v1(crm_client_t * cib_client, int call_type, int call_options, const char *host, const char *op, gboolean * local_notify, gboolean * needs_reply, gboolean * process, gboolean * needs_forward) { if (cib_op_modifies(call_type) && !(call_options & cib_inhibit_bcast)) { /* we need to send an update anyway */ *needs_reply = TRUE; } else { *needs_reply = FALSE; } if (host == NULL && (call_options & cib_scope_local)) { crm_trace("Processing locally scoped %s op from %s", op, cib_client->name); *local_notify = TRUE; } else if (host == NULL && cib_is_master) { crm_trace("Processing master %s op locally from %s", op, cib_client->name); *local_notify = TRUE; } else if (safe_str_eq(host, cib_our_uname)) { crm_trace("Processing locally addressed %s op from %s", op, cib_client->name); *local_notify = TRUE; } else if (stand_alone) { *needs_forward = FALSE; *local_notify = TRUE; *process = TRUE; } else { crm_trace("%s op from %s needs to be forwarded to %s", op, cib_client->name, host ? host : "the master instance"); *needs_forward = TRUE; *process = FALSE; } } static void parse_local_options_v2(crm_client_t * cib_client, int call_type, int call_options, const char *host, const char *op, gboolean * local_notify, gboolean * needs_reply, gboolean * process, gboolean * needs_forward) { if (cib_op_modifies(call_type)) { if(safe_str_eq(op, CIB_OP_MASTER) || safe_str_eq(op, CIB_OP_SLAVE)) { /* Always handle these locally */ *process = TRUE; *needs_reply = FALSE; *local_notify = TRUE; *needs_forward = FALSE; return; } else { /* Redirect all other updates via CPG */ *needs_reply = TRUE; *needs_forward = TRUE; *process = FALSE; crm_trace("%s op from %s needs to be forwarded to %s", op, cib_client->name, host ? host : "the master instance"); return; } } *process = TRUE; *needs_reply = FALSE; *local_notify = TRUE; *needs_forward = FALSE; if (stand_alone) { crm_trace("Processing %s op from %s (stand-alone)", op, cib_client->name); } else if (host == NULL) { crm_trace("Processing unaddressed %s op from %s", op, cib_client->name); } else if (safe_str_eq(host, cib_our_uname)) { crm_trace("Processing locally addressed %s op from %s", op, cib_client->name); } else { crm_trace("%s op from %s needs to be forwarded to %s", op, cib_client->name, host); *needs_forward = TRUE; *process = FALSE; } } static void parse_local_options(crm_client_t * cib_client, int call_type, int call_options, const char *host, const char *op, gboolean * local_notify, gboolean * needs_reply, gboolean * process, gboolean * needs_forward) { if(cib_legacy_mode()) { parse_local_options_v1(cib_client, call_type, call_options, host, op, local_notify, needs_reply, process, needs_forward); } else { parse_local_options_v2(cib_client, call_type, call_options, host, op, local_notify, needs_reply, process, needs_forward); } } static gboolean parse_peer_options_v1(int call_type, xmlNode * request, gboolean * local_notify, gboolean * needs_reply, gboolean * process, gboolean * needs_forward) { const char *op = NULL; const char *host = NULL; const char *delegated = NULL; const char *originator = crm_element_value(request, F_ORIG); const char *reply_to = crm_element_value(request, F_CIB_ISREPLY); const char *update = crm_element_value(request, F_CIB_GLOBAL_UPDATE); gboolean is_reply = safe_str_eq(reply_to, cib_our_uname); if (crm_is_true(update)) { *needs_reply = FALSE; if (is_reply) { *local_notify = TRUE; crm_trace("Processing global/peer update from %s" " that originated from us", originator); } else { crm_trace("Processing global/peer update from %s", originator); } return TRUE; } crm_trace("Processing %s request sent by %s", op, originator); op = crm_element_value(request, F_CIB_OPERATION); if (safe_str_eq(op, "cib_shutdown_req")) { /* Always process these */ *local_notify = FALSE; if (reply_to == NULL || is_reply) { *process = TRUE; } if (is_reply) { *needs_reply = FALSE; } return *process; } if (is_reply && safe_str_eq(op, CRM_OP_PING)) { process_ping_reply(request); return FALSE; } if (is_reply) { crm_trace("Forward reply sent from %s to local clients", originator); *process = FALSE; *needs_reply = FALSE; *local_notify = TRUE; return TRUE; } host = crm_element_value(request, F_CIB_HOST); if (host != NULL && safe_str_eq(host, cib_our_uname)) { crm_trace("Processing %s request sent to us from %s", op, originator); return TRUE; } else if(is_reply == FALSE && safe_str_eq(op, CRM_OP_PING)) { crm_trace("Processing %s request sent to %s by %s", op, host?host:"everyone", originator); *needs_reply = TRUE; return TRUE; } else if (host == NULL && cib_is_master == TRUE) { crm_trace("Processing %s request sent to master instance from %s", op, originator); return TRUE; } delegated = crm_element_value(request, F_CIB_DELEGATED); if (delegated != NULL) { crm_trace("Ignoring msg for master instance"); } else if (host != NULL) { /* this is for a specific instance and we're not it */ crm_trace("Ignoring msg for instance on %s", crm_str(host)); } else if (reply_to == NULL && cib_is_master == FALSE) { /* this is for the master instance and we're not it */ crm_trace("Ignoring reply to %s", crm_str(reply_to)); } else if (safe_str_eq(op, "cib_shutdown_req")) { if (reply_to != NULL) { crm_debug("Processing %s from %s", op, host); *needs_reply = FALSE; } else { crm_debug("Processing %s reply from %s", op, host); } return TRUE; } else { crm_err("Nothing for us to do?"); crm_log_xml_err(request, "Peer[inbound]"); } return FALSE; } static gboolean parse_peer_options_v2(int call_type, xmlNode * request, gboolean * local_notify, gboolean * needs_reply, gboolean * process, gboolean * needs_forward) { const char *host = NULL; const char *delegated = crm_element_value(request, F_CIB_DELEGATED); const char *op = crm_element_value(request, F_CIB_OPERATION); const char *originator = crm_element_value(request, F_ORIG); const char *reply_to = crm_element_value(request, F_CIB_ISREPLY); const char *update = crm_element_value(request, F_CIB_GLOBAL_UPDATE); gboolean is_reply = safe_str_eq(reply_to, cib_our_uname); if(safe_str_eq(op, CIB_OP_REPLACE)) { /* sync_our_cib() sets F_CIB_ISREPLY */ if (reply_to) { delegated = reply_to; } goto skip_is_reply; } else if(safe_str_eq(op, CIB_OP_SYNC)) { } else if (is_reply && safe_str_eq(op, CRM_OP_PING)) { process_ping_reply(request); return FALSE; } else if (safe_str_eq(op, CIB_OP_UPGRADE)) { /* Only the DC (node with the oldest software) should process * this operation if F_CIB_SCHEMA_MAX is unset * * If the DC is happy it will then send out another * CIB_OP_UPGRADE which will tell all nodes to do the actual * upgrade. * * Except this time F_CIB_SCHEMA_MAX will be set which puts a * limit on how far newer nodes will go */ const char *max = crm_element_value(request, F_CIB_SCHEMA_MAX); crm_trace("Parsing %s operation%s for %s with max=%s", op, is_reply?" reply":"", cib_is_master?"master":"slave", max); if(max == NULL && cib_is_master) { /* We are the DC, check if this upgrade is allowed */ goto skip_is_reply; } else if(max) { /* Ok, go ahead and upgrade to 'max' */ goto skip_is_reply; } else { return FALSE; /* Ignore */ } } else if (crm_is_true(update)) { crm_info("Detected legacy %s global update from %s", op, originator); send_sync_request(NULL); legacy_mode = TRUE; return FALSE; } else if (is_reply && cib_op_modifies(call_type)) { crm_trace("Ignoring legacy %s reply sent from %s to local clients", op, originator); return FALSE; } else if (safe_str_eq(op, "cib_shutdown_req")) { /* Legacy handling */ crm_debug("Legacy handling of %s message from %s", op, originator); *local_notify = FALSE; if (reply_to == NULL) { *process = TRUE; } return *process; } if(is_reply) { crm_trace("Handling %s reply sent from %s to local clients", op, originator); *process = FALSE; *needs_reply = FALSE; *local_notify = TRUE; return TRUE; } skip_is_reply: *process = TRUE; *needs_reply = FALSE; if(safe_str_eq(delegated, cib_our_uname)) { *local_notify = TRUE; } else { *local_notify = FALSE; } host = crm_element_value(request, F_CIB_HOST); if (host != NULL && safe_str_eq(host, cib_our_uname)) { crm_trace("Processing %s request sent to us from %s", op, originator); *needs_reply = TRUE; return TRUE; } else if (host != NULL) { /* this is for a specific instance and we're not it */ crm_trace("Ignoring %s operation for instance on %s", op, crm_str(host)); return FALSE; } else if(is_reply == FALSE && safe_str_eq(op, CRM_OP_PING)) { *needs_reply = TRUE; } crm_trace("Processing %s request sent to everyone by %s/%s on %s %s", op, crm_element_value(request, F_CIB_CLIENTNAME), crm_element_value(request, F_CIB_CALLID), originator, (*local_notify)?"(notify)":""); return TRUE; } static gboolean parse_peer_options(int call_type, xmlNode * request, gboolean * local_notify, gboolean * needs_reply, gboolean * process, gboolean * needs_forward) { /* TODO: What happens when an update comes in after node A * requests the CIB from node B, but before it gets the reply (and * sends out the replace operation) */ if(cib_legacy_mode()) { return parse_peer_options_v1( call_type, request, local_notify, needs_reply, process, needs_forward); } else { return parse_peer_options_v2( call_type, request, local_notify, needs_reply, process, needs_forward); } } static void forward_request(xmlNode * request, crm_client_t * cib_client, int call_options) { const char *op = crm_element_value(request, F_CIB_OPERATION); const char *host = crm_element_value(request, F_CIB_HOST); crm_xml_add(request, F_CIB_DELEGATED, cib_our_uname); if (host != NULL) { crm_trace("Forwarding %s op to %s", op, host); send_cluster_message(crm_get_peer(0, host), crm_msg_cib, request, FALSE); } else { crm_trace("Forwarding %s op to master instance", op); send_cluster_message(NULL, crm_msg_cib, request, FALSE); } /* Return the request to its original state */ xml_remove_prop(request, F_CIB_DELEGATED); if (call_options & cib_discard_reply) { crm_trace("Client not interested in reply"); } } static gboolean send_peer_reply(xmlNode * msg, xmlNode * result_diff, const char *originator, gboolean broadcast) { CRM_ASSERT(msg != NULL); if (broadcast) { /* this (successful) call modified the CIB _and_ the * change needs to be broadcast... * send via HA to other nodes */ int diff_add_updates = 0; int diff_add_epoch = 0; int diff_add_admin_epoch = 0; int diff_del_updates = 0; int diff_del_epoch = 0; int diff_del_admin_epoch = 0; const char *digest = NULL; CRM_LOG_ASSERT(result_diff != NULL); digest = crm_element_value(result_diff, XML_ATTR_DIGEST); cib_diff_version_details(result_diff, &diff_add_admin_epoch, &diff_add_epoch, &diff_add_updates, &diff_del_admin_epoch, &diff_del_epoch, &diff_del_updates); crm_trace("Sending update diff %d.%d.%d -> %d.%d.%d %s", diff_del_admin_epoch, diff_del_epoch, diff_del_updates, diff_add_admin_epoch, diff_add_epoch, diff_add_updates, digest); crm_xml_add(msg, F_CIB_ISREPLY, originator); crm_xml_add(msg, F_CIB_GLOBAL_UPDATE, XML_BOOLEAN_TRUE); crm_xml_add(msg, F_CIB_OPERATION, CIB_OP_APPLY_DIFF); CRM_ASSERT(digest != NULL); add_message_xml(msg, F_CIB_UPDATE_DIFF, result_diff); crm_log_xml_explicit(msg, "copy"); return send_cluster_message(NULL, crm_msg_cib, msg, TRUE); } else if (originator != NULL) { /* send reply via HA to originating node */ crm_trace("Sending request result to %s only", originator); crm_xml_add(msg, F_CIB_ISREPLY, originator); return send_cluster_message(crm_get_peer(0, originator), crm_msg_cib, msg, FALSE); } return FALSE; } void cib_process_request(xmlNode * request, gboolean force_synchronous, gboolean privileged, gboolean unused, crm_client_t * cib_client) { int call_type = 0; int call_options = 0; gboolean process = TRUE; gboolean is_update = TRUE; gboolean from_peer = TRUE; gboolean needs_reply = TRUE; gboolean local_notify = FALSE; gboolean needs_forward = FALSE; gboolean global_update = crm_is_true(crm_element_value(request, F_CIB_GLOBAL_UPDATE)); xmlNode *op_reply = NULL; xmlNode *result_diff = NULL; int rc = pcmk_ok; const char *op = crm_element_value(request, F_CIB_OPERATION); const char *originator = crm_element_value(request, F_ORIG); const char *host = crm_element_value(request, F_CIB_HOST); const char *target = NULL; const char *call_id = crm_element_value(request, F_CIB_CALLID); const char *client_id = crm_element_value(request, F_CIB_CLIENTID); const char *client_name = crm_element_value(request, F_CIB_CLIENTNAME); const char *reply_to = crm_element_value(request, F_CIB_ISREPLY); if (cib_client) { from_peer = FALSE; } cib_num_ops++; if (cib_num_ops == 0) { cib_num_fail = 0; cib_num_local = 0; cib_num_updates = 0; crm_info("Stats wrapped around"); } crm_element_value_int(request, F_CIB_CALLOPTS, &call_options); if (force_synchronous) { call_options |= cib_sync_call; } if (host != NULL && strlen(host) == 0) { host = NULL; } if (host) { target = host; } else if (call_options & cib_scope_local) { target = "local host"; } else { target = "master"; } if (from_peer) { crm_trace("Processing peer %s operation from %s/%s on %s intended for %s (reply=%s)", op, client_name, call_id, originator, target, reply_to); } else { crm_xml_add(request, F_ORIG, cib_our_uname); crm_trace("Processing local %s operation from %s/%s intended for %s", op, client_name, call_id, target); } rc = cib_get_operation_id(op, &call_type); if (rc != pcmk_ok) { /* TODO: construct error reply? */ crm_err("Pre-processing of command failed: %s", pcmk_strerror(rc)); return; } if (from_peer == FALSE) { parse_local_options(cib_client, call_type, call_options, host, op, &local_notify, &needs_reply, &process, &needs_forward); } else if (parse_peer_options(call_type, request, &local_notify, &needs_reply, &process, &needs_forward) == FALSE) { return; } is_update = cib_op_modifies(call_type); if (is_update) { cib_num_updates++; } if (call_options & cib_discard_reply) { needs_reply = is_update; local_notify = FALSE; } if (needs_forward) { const char *host = crm_element_value(request, F_CIB_HOST); const char *section = crm_element_value(request, F_CIB_SECTION); crm_info("Forwarding %s operation for section %s to %s (origin=%s/%s/%s)", op, section ? section : "'all'", host ? host : "master", originator ? originator : "local", client_name, call_id); forward_request(request, cib_client, call_options); return; } if (cib_status != pcmk_ok) { const char *call = crm_element_value(request, F_CIB_CALLID); rc = cib_status; crm_err("Operation ignored, cluster configuration is invalid." " Please repair and restart: %s", pcmk_strerror(cib_status)); op_reply = create_xml_node(NULL, "cib-reply"); crm_xml_add(op_reply, F_TYPE, T_CIB); crm_xml_add(op_reply, F_CIB_OPERATION, op); crm_xml_add(op_reply, F_CIB_CALLID, call); crm_xml_add(op_reply, F_CIB_CLIENTID, client_id); crm_xml_add_int(op_reply, F_CIB_CALLOPTS, call_options); crm_xml_add_int(op_reply, F_CIB_RC, rc); crm_trace("Attaching reply output"); add_message_xml(op_reply, F_CIB_CALLDATA, the_cib); crm_log_xml_explicit(op_reply, "cib:reply"); } else if (process) { time_t finished = 0; int now = time(NULL); int level = LOG_INFO; const char *section = crm_element_value(request, F_CIB_SECTION); cib_num_local++; rc = cib_process_command(request, &op_reply, &result_diff, privileged); if (is_update == FALSE) { level = LOG_TRACE; } else if (global_update) { switch (rc) { case pcmk_ok: level = LOG_INFO; break; case -pcmk_err_old_data: case -pcmk_err_diff_resync: case -pcmk_err_diff_failed: level = LOG_TRACE; break; default: level = LOG_ERR; } } else if (rc != pcmk_ok && is_update) { cib_num_fail++; level = LOG_WARNING; } do_crm_log(level, "Completed %s operation for section %s: %s (rc=%d, origin=%s/%s/%s, version=%s.%s.%s)", op, section ? section : "'all'", pcmk_strerror(rc), rc, originator ? originator : "local", client_name, call_id, the_cib ? crm_element_value(the_cib, XML_ATTR_GENERATION_ADMIN) : "0", the_cib ? crm_element_value(the_cib, XML_ATTR_GENERATION) : "0", the_cib ? crm_element_value(the_cib, XML_ATTR_NUMUPDATES) : "0"); finished = time(NULL); if (finished - now > 3) { crm_trace("%s operation took %ds to complete", op, finished - now); crm_write_blackbox(0, NULL); } if (op_reply == NULL && (needs_reply || local_notify)) { crm_err("Unexpected NULL reply to message"); crm_log_xml_err(request, "null reply"); needs_reply = FALSE; local_notify = FALSE; } } /* from now on we are the server */ if(is_update && cib_legacy_mode() == FALSE) { crm_trace("Completed pre-sync update from %s/%s/%s%s", originator ? originator : "local", client_name, call_id, local_notify?" with local notification":""); } else if (needs_reply == FALSE || stand_alone) { /* nothing more to do... * this was a non-originating slave update */ crm_trace("Completed slave update"); } else if (call_options & cib_discard_reply) { crm_trace("Caller isn't interested in reply"); } else if (from_peer) { if (is_update == FALSE || result_diff == NULL) { crm_trace("Request not broadcast: R/O call"); } else if (call_options & cib_inhibit_bcast) { crm_trace("Request not broadcast: inhibited"); } else if (rc != pcmk_ok) { crm_trace("Request not broadcast: call failed: %s", pcmk_strerror(rc)); } else { crm_trace("Directing reply to %s", originator); } send_peer_reply(op_reply, result_diff, originator, FALSE); } if (local_notify && client_id) { crm_trace("Performing local %ssync notification for %s", (call_options & cib_sync_call) ? "" : "a-", client_id); if (process == FALSE) { do_local_notify(request, client_id, call_options & cib_sync_call, from_peer); } else { do_local_notify(op_reply, client_id, call_options & cib_sync_call, from_peer); } } free_xml(op_reply); free_xml(result_diff); return; } int cib_process_command(xmlNode * request, xmlNode ** reply, xmlNode ** cib_diff, gboolean privileged) { xmlNode *input = NULL; xmlNode *output = NULL; xmlNode *result_cib = NULL; xmlNode *current_cib = NULL; int call_type = 0; int call_options = 0; const char *op = NULL; const char *section = NULL; const char *call_id = crm_element_value(request, F_CIB_CALLID); int rc = pcmk_ok; int rc2 = pcmk_ok; gboolean send_r_notify = FALSE; gboolean global_update = FALSE; gboolean config_changed = FALSE; gboolean manage_counters = TRUE; static mainloop_timer_t *digest_timer = NULL; CRM_ASSERT(cib_status == pcmk_ok); if(digest_timer == NULL) { digest_timer = mainloop_timer_add("digester", 5000, FALSE, cib_digester_cb, NULL); } *reply = NULL; *cib_diff = NULL; current_cib = the_cib; /* Start processing the request... */ op = crm_element_value(request, F_CIB_OPERATION); crm_element_value_int(request, F_CIB_CALLOPTS, &call_options); rc = cib_get_operation_id(op, &call_type); if (rc == pcmk_ok && privileged == FALSE) { rc = cib_op_can_run(call_type, call_options, privileged, global_update); } rc2 = cib_op_prepare(call_type, request, &input, §ion); if (rc == pcmk_ok) { rc = rc2; } if (rc != pcmk_ok) { crm_trace("Call setup failed: %s", pcmk_strerror(rc)); goto done; } else if (cib_op_modifies(call_type) == FALSE) { rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE, section, request, input, FALSE, &config_changed, current_cib, &result_cib, NULL, &output); CRM_CHECK(result_cib == NULL, free_xml(result_cib)); goto done; } /* Handle a valid write action */ global_update = crm_is_true(crm_element_value(request, F_CIB_GLOBAL_UPDATE)); if (global_update) { /* legacy code */ manage_counters = FALSE; call_options |= cib_force_diff; crm_trace("Global update detected"); CRM_CHECK(call_type == 3 || call_type == 4, crm_err("Call type: %d", call_type); crm_log_xml_err(request, "bad op")); } if (rc == pcmk_ok) { ping_modified_since = TRUE; if (call_options & cib_inhibit_bcast) { /* skip */ crm_trace("Skipping update: inhibit broadcast"); manage_counters = FALSE; } if (is_not_set(call_options, cib_dryrun) && safe_str_eq(section, XML_CIB_TAG_STATUS)) { /* Copying large CIBs accounts for a huge percentage of our CIB usage */ call_options |= cib_zero_copy; } else { clear_bit(call_options, cib_zero_copy); } /* result_cib must not be modified after cib_perform_op() returns */ rc = cib_perform_op(op, call_options, cib_op_func(call_type), FALSE, section, request, input, manage_counters, &config_changed, current_cib, &result_cib, cib_diff, &output); if (manage_counters == FALSE) { /* Legacy code * If the diff is NULL at this point, its because nothing changed */ config_changed = cib_config_changed(NULL, NULL, cib_diff); } /* Always write to disk for replace ops, * this also negates the need to detect ordering changes */ if (crm_str_eq(CIB_OP_REPLACE, op, TRUE)) { config_changed = TRUE; } } if (rc == pcmk_ok && is_not_set(call_options, cib_dryrun)) { if(is_not_set(call_options, cib_zero_copy)) { rc = activateCibXml(result_cib, config_changed, op); } if (rc == pcmk_ok && cib_internal_config_changed(*cib_diff)) { cib_read_config(config_hash, result_cib); } if (crm_str_eq(CIB_OP_REPLACE, op, TRUE)) { if (section == NULL) { send_r_notify = TRUE; } else if (safe_str_eq(section, XML_TAG_CIB)) { send_r_notify = TRUE; } else if (safe_str_eq(section, XML_CIB_TAG_NODES)) { send_r_notify = TRUE; } else if (safe_str_eq(section, XML_CIB_TAG_STATUS)) { send_r_notify = TRUE; } } else if (crm_str_eq(CIB_OP_ERASE, op, TRUE)) { send_r_notify = TRUE; } mainloop_timer_stop(digest_timer); mainloop_timer_start(digest_timer); } else if (rc == -pcmk_err_schema_validation) { CRM_ASSERT(is_not_set(call_options, cib_zero_copy)); if (output != NULL) { crm_log_xml_info(output, "cib:output"); free_xml(output); } output = result_cib; } else { if(is_not_set(call_options, cib_zero_copy)) { free_xml(result_cib); } } if ((call_options & cib_inhibit_notify) == 0) { const char *client = crm_element_value(request, F_CIB_CLIENTNAME); crm_trace("Sending notifications"); cib_diff_notify(call_options, client, call_id, op, input, rc, *cib_diff); } if (send_r_notify) { const char *origin = crm_element_value(request, F_ORIG); cib_replace_notify(origin, the_cib, rc, *cib_diff); } xml_log_patchset(LOG_TRACE, "cib:diff", *cib_diff); done: if ((call_options & cib_discard_reply) == 0) { const char *caller = crm_element_value(request, F_CIB_CLIENTID); *reply = create_xml_node(NULL, "cib-reply"); crm_xml_add(*reply, F_TYPE, T_CIB); crm_xml_add(*reply, F_CIB_OPERATION, op); crm_xml_add(*reply, F_CIB_CALLID, call_id); crm_xml_add(*reply, F_CIB_CLIENTID, caller); crm_xml_add_int(*reply, F_CIB_CALLOPTS, call_options); crm_xml_add_int(*reply, F_CIB_RC, rc); if (output != NULL) { crm_trace("Attaching reply output"); add_message_xml(*reply, F_CIB_CALLDATA, output); } crm_log_xml_explicit(*reply, "cib:reply"); } crm_trace("cleanup"); if (cib_op_modifies(call_type) == FALSE && output != current_cib) { free_xml(output); output = NULL; } if (call_type >= 0) { cib_op_cleanup(call_type, call_options, &input, &output); } crm_trace("done"); return rc; } gint cib_GCompareFunc(gconstpointer a, gconstpointer b) { const xmlNode *a_msg = a; const xmlNode *b_msg = b; int msg_a_id = 0; int msg_b_id = 0; const char *value = NULL; value = crm_element_value_const(a_msg, F_CIB_CALLID); msg_a_id = crm_parse_int(value, NULL); value = crm_element_value_const(b_msg, F_CIB_CALLID); msg_b_id = crm_parse_int(value, NULL); if (msg_a_id == msg_b_id) { return 0; } else if (msg_a_id < msg_b_id) { return -1; } return 1; } #if SUPPORT_HEARTBEAT void cib_ha_peer_callback(HA_Message * msg, void *private_data) { xmlNode *xml = convert_ha_message(NULL, msg, __FUNCTION__); cib_peer_callback(xml, private_data); free_xml(xml); } #endif void cib_peer_callback(xmlNode * msg, void *private_data) { const char *reason = NULL; const char *originator = crm_element_value(msg, F_ORIG); if (cib_legacy_mode() && (originator == NULL || crm_str_eq(originator, cib_our_uname, TRUE))) { /* message is from ourselves */ return; } else if (crm_peer_cache == NULL) { reason = "membership not established"; goto bail; } if (crm_element_value(msg, F_CIB_CLIENTNAME) == NULL) { crm_xml_add(msg, F_CIB_CLIENTNAME, originator); } /* crm_log_xml_trace("Peer[inbound]", msg); */ cib_process_request(msg, FALSE, TRUE, TRUE, NULL); return; bail: if (reason) { const char *seq = crm_element_value(msg, F_SEQ); const char *op = crm_element_value(msg, F_CIB_OPERATION); crm_warn("Discarding %s message (%s) from %s: %s", op, seq, originator, reason); } } #if SUPPORT_HEARTBEAT extern oc_ev_t *cib_ev_token; static void *ccm_library = NULL; int (*ccm_api_callback_done) (void *cookie) = NULL; int (*ccm_api_handle_event) (const oc_ev_t * token) = NULL; void cib_client_status_callback(const char *node, const char *client, const char *status, void *private) { crm_node_t *peer = NULL; if (safe_str_eq(client, CRM_SYSTEM_CIB)) { + peer = crm_get_peer(0, node); + if (safe_str_neq(peer->state, CRM_NODE_MEMBER)) { + crm_warn("This peer is not a ccm member (yet). " + "Status ignored: Client %s/%s announced status [%s]", + node, client, status); + return; + } + crm_info("Status update: Client %s/%s now has status [%s]", node, client, status); if (safe_str_eq(status, JOINSTATUS)) { status = ONLINESTATUS; } else if (safe_str_eq(status, LEAVESTATUS)) { status = OFFLINESTATUS; } - peer = crm_get_peer(0, node); crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cib, status); } return; } int cib_ccm_dispatch(gpointer user_data) { int rc = 0; oc_ev_t *ccm_token = (oc_ev_t *) user_data; crm_trace("received callback"); if (ccm_api_handle_event == NULL) { ccm_api_handle_event = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_handle_event", 1); } rc = (*ccm_api_handle_event) (ccm_token); if (0 == rc) { return 0; } crm_err("CCM connection appears to have failed: rc=%d.", rc); /* eventually it might be nice to recover and reconnect... but until then... */ crm_err("Exiting to recover from CCM connection failure"); return crm_exit(ENOTCONN); } int current_instance = 0; void cib_ccm_msg_callback(oc_ed_t event, void *cookie, size_t size, const void *data) { gboolean update_id = FALSE; const oc_ev_membership_t *membership = data; CRM_ASSERT(membership != NULL); crm_info("Processing CCM event=%s (id=%d)", ccm_event_name(event), membership->m_instance); if (current_instance > membership->m_instance) { crm_err("Membership instance ID went backwards! %d->%d", current_instance, membership->m_instance); CRM_ASSERT(current_instance <= membership->m_instance); } switch (event) { case OC_EV_MS_NEW_MEMBERSHIP: case OC_EV_MS_INVALID: update_id = TRUE; break; case OC_EV_MS_PRIMARY_RESTORED: update_id = TRUE; break; case OC_EV_MS_NOT_PRIMARY: crm_trace("Ignoring transitional CCM event: %s", ccm_event_name(event)); break; case OC_EV_MS_EVICTED: crm_err("Evicted from CCM: %s", ccm_event_name(event)); break; default: crm_err("Unknown CCM event: %d", event); } if (update_id) { unsigned int lpc = 0; CRM_CHECK(membership != NULL, return); current_instance = membership->m_instance; for (lpc = 0; lpc < membership->m_n_out; lpc++) { crm_update_ccm_node(membership, lpc + membership->m_out_idx, CRM_NODE_LOST, current_instance); } for (lpc = 0; lpc < membership->m_n_member; lpc++) { crm_update_ccm_node(membership, lpc + membership->m_memb_idx, CRM_NODE_MEMBER, current_instance); } + heartbeat_cluster->llc_ops->client_status(heartbeat_cluster, NULL, crm_system_name, 0); } if (ccm_api_callback_done == NULL) { ccm_api_callback_done = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_callback_done", 1); } (*ccm_api_callback_done) (cookie); return; } #endif gboolean can_write(int flags) { return TRUE; } static gboolean cib_force_exit(gpointer data) { crm_notice("Forcing exit!"); terminate_cib(__FUNCTION__, TRUE); return FALSE; } static void disconnect_remote_client(gpointer key, gpointer value, gpointer user_data) { crm_client_t *a_client = value; crm_err("Disconnecting %s... Not implemented", crm_str(a_client->name)); } void cib_shutdown(int nsig) { struct qb_ipcs_stats srv_stats; if (cib_shutdown_flag == FALSE) { int disconnects = 0; qb_ipcs_connection_t *c = NULL; cib_shutdown_flag = TRUE; c = qb_ipcs_connection_first_get(ipcs_rw); while (c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(ipcs_rw, last); crm_debug("Disconnecting r/w client %p...", last); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); disconnects++; } c = qb_ipcs_connection_first_get(ipcs_ro); while (c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(ipcs_ro, last); crm_debug("Disconnecting r/o client %p...", last); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); disconnects++; } c = qb_ipcs_connection_first_get(ipcs_shm); while (c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(ipcs_shm, last); crm_debug("Disconnecting non-blocking r/w client %p...", last); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); disconnects++; } disconnects += crm_hash_table_size(client_connections); crm_debug("Disconnecting %d remote clients", crm_hash_table_size(client_connections)); g_hash_table_foreach(client_connections, disconnect_remote_client, NULL); crm_info("Disconnected %d clients", disconnects); } qb_ipcs_stats_get(ipcs_rw, &srv_stats, QB_FALSE); if (crm_hash_table_size(client_connections) == 0) { crm_info("All clients disconnected (%d)", srv_stats.active_connections); initiate_exit(); } else { crm_info("Waiting on %d clients to disconnect (%d)", crm_hash_table_size(client_connections), srv_stats.active_connections); } } void initiate_exit(void) { int active = 0; xmlNode *leaving = NULL; active = crm_active_peers(); if (active < 2) { terminate_cib(__FUNCTION__, FALSE); return; } crm_info("Sending disconnect notification to %d peers...", active); leaving = create_xml_node(NULL, "exit-notification"); crm_xml_add(leaving, F_TYPE, "cib"); crm_xml_add(leaving, F_CIB_OPERATION, "cib_shutdown_req"); send_cluster_message(NULL, crm_msg_cib, leaving, TRUE); free_xml(leaving); g_timeout_add(crm_get_msec("5s"), cib_force_exit, NULL); } extern int remote_fd; extern int remote_tls_fd; void terminate_cib(const char *caller, gboolean fast) { if (remote_fd > 0) { close(remote_fd); remote_fd = 0; } if (remote_tls_fd > 0) { close(remote_tls_fd); remote_tls_fd = 0; } if (!fast) { crm_info("%s: Disconnecting from cluster infrastructure", caller); crm_cluster_disconnect(&crm_cluster); } uninitializeCib(); crm_info("%s: Exiting%s...", caller, fast ? " fast" : mainloop ? " from mainloop" : ""); if (fast == FALSE && mainloop != NULL && g_main_is_running(mainloop)) { g_main_quit(mainloop); } else { qb_ipcs_destroy(ipcs_ro); qb_ipcs_destroy(ipcs_rw); qb_ipcs_destroy(ipcs_shm); if (fast) { crm_exit(EINVAL); } else { crm_exit(pcmk_ok); } } } diff --git a/crmd/heartbeat.c b/crmd/heartbeat.c index 76c11b75c5..4726c0b345 100644 --- a/crmd/heartbeat.c +++ b/crmd/heartbeat.c @@ -1,539 +1,548 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ /* put these first so that uuid_t is defined without conflicts */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include void oc_ev_special(const oc_ev_t *, oc_ev_class_t, int); void ccm_event_detail(const oc_ev_membership_t * oc, oc_ed_t event); gboolean crmd_ha_msg_dispatch(ll_cluster_t * cluster_conn, gpointer user_data); void crmd_ccm_msg_callback(oc_ed_t event, void *cookie, size_t size, const void *data); int ccm_dispatch(gpointer user_data); #define CCM_EVENT_DETAIL 0 #define CCM_EVENT_DETAIL_PARTIAL 0 int (*ccm_api_callback_done) (void *cookie) = NULL; int (*ccm_api_handle_event) (const oc_ev_t * token) = NULL; static oc_ev_t *fsa_ev_token; static void *ccm_library = NULL; static int num_ccm_register_fails = 0; static int max_ccm_register_fails = 30; static void ccm_connection_destroy(void *userdata) { } /* A_CCM_CONNECT */ void do_ccm_control(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { static struct mainloop_fd_callbacks ccm_fd_callbacks = { .dispatch = ccm_dispatch, .destroy = ccm_connection_destroy, }; if (is_heartbeat_cluster()) { int (*ccm_api_register) (oc_ev_t ** token) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_register", 1); int (*ccm_api_set_callback) (const oc_ev_t * token, oc_ev_class_t class, oc_ev_callback_t * fn, oc_ev_callback_t ** prev_fn) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_set_callback", 1); void (*ccm_api_special) (const oc_ev_t *, oc_ev_class_t, int) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_special", 1); int (*ccm_api_activate) (const oc_ev_t * token, int *fd) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_activate", 1); int (*ccm_api_unregister) (oc_ev_t * token) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_unregister", 1); if (action & A_CCM_DISCONNECT) { set_bit(fsa_input_register, R_CCM_DISCONNECTED); (*ccm_api_unregister) (fsa_ev_token); } if (action & A_CCM_CONNECT) { int ret; int fsa_ev_fd; gboolean did_fail = FALSE; crm_trace("Registering with CCM"); clear_bit(fsa_input_register, R_CCM_DISCONNECTED); ret = (*ccm_api_register) (&fsa_ev_token); if (ret != 0) { crm_warn("CCM registration failed"); did_fail = TRUE; } if (did_fail == FALSE) { crm_trace("Setting up CCM callbacks"); ret = (*ccm_api_set_callback) (fsa_ev_token, OC_EV_MEMB_CLASS, crmd_ccm_msg_callback, NULL); if (ret != 0) { crm_warn("CCM callback not set"); did_fail = TRUE; } } if (did_fail == FALSE) { (*ccm_api_special) (fsa_ev_token, OC_EV_MEMB_CLASS, 0 /*don't care */ ); crm_trace("Activating CCM token"); ret = (*ccm_api_activate) (fsa_ev_token, &fsa_ev_fd); if (ret != 0) { crm_warn("CCM Activation failed"); did_fail = TRUE; } } if (did_fail) { num_ccm_register_fails++; (*ccm_api_unregister) (fsa_ev_token); if (num_ccm_register_fails < max_ccm_register_fails) { crm_warn("CCM Connection failed" " %d times (%d max)", num_ccm_register_fails, max_ccm_register_fails); crm_timer_start(wait_timer); crmd_fsa_stall(FALSE); return; } else { crm_err("CCM Activation failed %d (max) times", num_ccm_register_fails); register_fsa_error(C_FSA_INTERNAL, I_FAIL, NULL); return; } } crm_info("CCM connection established... waiting for first callback"); mainloop_add_fd("heartbeat-ccm", G_PRIORITY_HIGH, fsa_ev_fd, fsa_ev_token, &ccm_fd_callbacks); } } if (action & ~(A_CCM_CONNECT | A_CCM_DISCONNECT)) { crm_err("Unexpected action %s in %s", fsa_action2string(action), __FUNCTION__); } } void ccm_event_detail(const oc_ev_membership_t * oc, oc_ed_t event) { int lpc; gboolean member = FALSE; member = FALSE; crm_trace("-----------------------"); crm_info("%s: trans=%d, nodes=%d, new=%d, lost=%d n_idx=%d, " "new_idx=%d, old_idx=%d", ccm_event_name(event), oc->m_instance, oc->m_n_member, oc->m_n_in, oc->m_n_out, oc->m_memb_idx, oc->m_in_idx, oc->m_out_idx); #if !CCM_EVENT_DETAIL_PARTIAL for (lpc = 0; lpc < oc->m_n_member; lpc++) { crm_info("\tCURRENT: %s [nodeid=%d, born=%d]", oc->m_array[oc->m_memb_idx + lpc].node_uname, oc->m_array[oc->m_memb_idx + lpc].node_id, oc->m_array[oc->m_memb_idx + lpc].node_born_on); if (safe_str_eq(fsa_our_uname, oc->m_array[oc->m_memb_idx + lpc].node_uname)) { member = TRUE; } } if (member == FALSE) { crm_warn("MY NODE IS NOT IN CCM THE MEMBERSHIP LIST"); } #endif for (lpc = 0; lpc < (int)oc->m_n_in; lpc++) { crm_info("\tNEW: %s [nodeid=%d, born=%d]", oc->m_array[oc->m_in_idx + lpc].node_uname, oc->m_array[oc->m_in_idx + lpc].node_id, oc->m_array[oc->m_in_idx + lpc].node_born_on); } for (lpc = 0; lpc < (int)oc->m_n_out; lpc++) { crm_info("\tLOST: %s [nodeid=%d, born=%d]", oc->m_array[oc->m_out_idx + lpc].node_uname, oc->m_array[oc->m_out_idx + lpc].node_id, oc->m_array[oc->m_out_idx + lpc].node_born_on); } crm_trace("-----------------------"); } /* A_CCM_UPDATE_CACHE */ /* * Take the opportunity to update the node status in the CIB as well */ void do_ccm_update_cache(enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, oc_ed_t event, const oc_ev_membership_t * oc, xmlNode * xml) { unsigned long long instance = 0; unsigned int lpc = 0; if (is_heartbeat_cluster()) { CRM_ASSERT(oc != NULL); instance = oc->m_instance; } CRM_ASSERT(crm_peer_seq <= instance); switch (cur_state) { case S_STOPPING: case S_TERMINATE: case S_HALT: crm_debug("Ignoring %s CCM event %llu, we're in state %s", ccm_event_name(event), instance, fsa_state2string(cur_state)); return; case S_ELECTION: register_fsa_action(A_ELECTION_CHECK); break; default: break; } if (is_heartbeat_cluster()) { ccm_event_detail(oc, event); /*--*-- Recently Dead Member Nodes --*--*/ for (lpc = 0; lpc < oc->m_n_out; lpc++) { crm_update_ccm_node(oc, lpc + oc->m_out_idx, CRM_NODE_LOST, instance); } /*--*-- All Member Nodes --*--*/ for (lpc = 0; lpc < oc->m_n_member; lpc++) { crm_update_ccm_node(oc, lpc + oc->m_memb_idx, CRM_NODE_MEMBER, instance); } + heartbeat_cluster->llc_ops->client_status(heartbeat_cluster, NULL, crm_system_name, 0); } if (event == OC_EV_MS_EVICTED) { crm_node_t *peer = crm_get_peer(0, fsa_our_uname); crm_update_peer_state(__FUNCTION__, peer, CRM_NODE_EVICTED, 0); /* todo: drop back to S_PENDING instead */ /* get out... NOW! * * go via the error recovery process so that HA will * restart us if required */ register_fsa_error_adv(cause, I_ERROR, NULL, NULL, __FUNCTION__); } post_cache_update(instance); return; } int ccm_dispatch(gpointer user_data) { int rc = 0; oc_ev_t *ccm_token = (oc_ev_t *) user_data; gboolean was_error = FALSE; crm_trace("Invoked"); if (ccm_api_handle_event == NULL) { ccm_api_handle_event = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_handle_event", 1); } rc = (*ccm_api_handle_event) (ccm_token); if (rc != 0) { if (is_set(fsa_input_register, R_CCM_DISCONNECTED) == FALSE) { /* we signed out, so this is expected */ register_fsa_input(C_CCM_CALLBACK, I_ERROR, NULL); crm_err("CCM connection appears to have failed: rc=%d.", rc); } was_error = TRUE; } trigger_fsa(fsa_source); if (was_error) { return -1; } return 0; } void crmd_ccm_msg_callback(oc_ed_t event, void *cookie, size_t size, const void *data) { gboolean update_cache = FALSE; const oc_ev_membership_t *membership = data; gboolean update_quorum = FALSE; crm_trace("Invoked"); CRM_ASSERT(data != NULL); crm_info("Quorum %s after event=%s (id=%d)", ccm_have_quorum(event) ? "(re)attained" : "lost", ccm_event_name(event), membership->m_instance); if (crm_peer_seq > membership->m_instance) { crm_err("Membership instance ID went backwards! %llu->%d", crm_peer_seq, membership->m_instance); CRM_ASSERT(crm_peer_seq <= membership->m_instance); return; } /* * OC_EV_MS_NEW_MEMBERSHIP: membership with quorum * OC_EV_MS_MS_INVALID: membership without quorum * OC_EV_MS_NOT_PRIMARY: previous membership no longer valid * OC_EV_MS_PRIMARY_RESTORED: previous membership restored * OC_EV_MS_EVICTED: the client is evicted from ccm. */ switch (event) { case OC_EV_MS_NEW_MEMBERSHIP: case OC_EV_MS_INVALID: update_cache = TRUE; update_quorum = TRUE; break; case OC_EV_MS_NOT_PRIMARY: break; case OC_EV_MS_PRIMARY_RESTORED: update_cache = TRUE; crm_peer_seq = membership->m_instance; break; case OC_EV_MS_EVICTED: update_quorum = TRUE; register_fsa_input(C_FSA_INTERNAL, I_STOP, NULL); crm_err("Shutting down after CCM event: %s", ccm_event_name(event)); break; default: crm_err("Unknown CCM event: %d", event); } if (update_quorum) { crm_have_quorum = ccm_have_quorum(event); if (crm_have_quorum == FALSE) { /* did we just loose quorum? */ if (fsa_has_quorum) { crm_info("Quorum lost: %s", ccm_event_name(event)); } } crm_update_quorum(crm_have_quorum, FALSE); } if (update_cache) { crm_trace("Updating cache after event %s", ccm_event_name(event)); do_ccm_update_cache(C_CCM_CALLBACK, fsa_state, event, data, NULL); } else if (event != OC_EV_MS_NOT_PRIMARY) { crm_peer_seq = membership->m_instance; register_fsa_action(A_TE_CANCEL); } if (ccm_api_callback_done == NULL) { ccm_api_callback_done = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_callback_done", 1); } (*ccm_api_callback_done) (cookie); return; } void crmd_ha_status_callback(const char *node, const char *status, void *private) { xmlNode *update = NULL; crm_node_t *peer = NULL; crm_notice("Status update: Node %s now has status [%s]", node, status); peer = crm_get_peer(0, node); if (safe_str_eq(status, PINGSTATUS)) { return; } if (safe_str_eq(status, DEADSTATUS)) { /* this node is toast */ - crm_update_peer_proc(__FUNCTION__, peer, crm_proc_heartbeat, OFFLINESTATUS); + crm_update_peer_proc(__FUNCTION__, peer, crm_proc_crmd|crm_proc_heartbeat, OFFLINESTATUS); } else { crm_update_peer_proc(__FUNCTION__, peer, crm_proc_heartbeat, ONLINESTATUS); } trigger_fsa(fsa_source); if (AM_I_DC) { update = do_update_node_cib(peer, node_update_cluster, NULL, __FUNCTION__); fsa_cib_anon_update(XML_CIB_TAG_STATUS, update, cib_scope_local | cib_quorum_override | cib_can_create); free_xml(update); } } void crmd_client_status_callback(const char *node, const char *client, const char *status, void *private) { crm_node_t *peer = NULL; crm_trace("Invoked"); if (safe_str_neq(client, CRM_SYSTEM_CRMD)) { return; } + peer = crm_get_peer(0, node); + + if (safe_str_neq(peer->state, CRM_NODE_MEMBER)) { + crm_warn("This peer is not a ccm member (yet). " + "Status ignored: Client %s/%s announced status [%s] (DC=%s)", + node, client, status, AM_I_DC ? "true" : "false"); + return; + } + set_bit(fsa_input_register, R_PEER_DATA); crm_notice("Status update: Client %s/%s now has status [%s] (DC=%s)", node, client, status, AM_I_DC ? "true" : "false"); - peer = crm_get_peer(0, node); /* rest of the code, especially crm_update_peer_proc, * does not know about JOINSTATUS, but expects ONLINESTATUS. * See also cib/callbacks.c */ if (safe_str_eq(status, JOINSTATUS)) { status = ONLINESTATUS; } else if (safe_str_eq(status, LEAVESTATUS)) { status = OFFLINESTATUS; } if (safe_str_eq(status, ONLINESTATUS)) { /* remove the cached value in case it changed */ crm_trace("Uncaching UUID for %s", node); free(peer->uuid); peer->uuid = NULL; } crm_update_peer_proc(__FUNCTION__, peer, crm_proc_crmd, status); if (AM_I_DC) { xmlNode *update = NULL; crm_trace("Got client status callback"); update = do_update_node_cib(peer, node_update_peer, NULL, __FUNCTION__); fsa_cib_anon_update(XML_CIB_TAG_STATUS, update, cib_scope_local | cib_quorum_override | cib_can_create); free_xml(update); } } void crmd_ha_msg_callback(HA_Message * hamsg, void *private_data) { int level = LOG_DEBUG; crm_node_t *from_node = NULL; xmlNode *msg = convert_ha_message(NULL, hamsg, __FUNCTION__); const char *from = crm_element_value(msg, F_ORIG); const char *op = crm_element_value(msg, F_CRM_TASK); const char *sys_from = crm_element_value(msg, F_CRM_SYS_FROM); CRM_CHECK(from != NULL, crm_log_xml_err(msg, "anon"); goto bail); crm_trace("HA[inbound]: %s from %s", op, from); if (crm_peer_cache == NULL || crm_active_peers() == 0) { crm_debug("Ignoring HA messages until we are" " connected to the CCM (%s op from %s)", op, from); crm_log_xml_trace(msg, "HA[inbound]: Ignore (No CCM)"); goto bail; } from_node = crm_get_peer(0, from); if (crm_is_peer_active(from_node) == FALSE) { if (safe_str_eq(op, CRM_OP_VOTE)) { level = LOG_WARNING; } else if (AM_I_DC && safe_str_eq(op, CRM_OP_JOIN_ANNOUNCE)) { level = LOG_WARNING; } else if (safe_str_eq(sys_from, CRM_SYSTEM_DC)) { level = LOG_WARNING; } do_crm_log(level, "Ignoring HA message (op=%s) from %s: not in our" " membership list (size=%d)", op, from, crm_active_peers()); crm_log_xml_trace(msg, "HA[inbound]: CCM Discard"); } else { crmd_ha_msg_filter(msg); } bail: free_xml(msg); return; } gboolean crmd_ha_msg_dispatch(ll_cluster_t * cluster_conn, gpointer user_data) { IPC_Channel *channel = NULL; gboolean stay_connected = TRUE; crm_trace("Invoked"); if (cluster_conn != NULL) { channel = cluster_conn->llc_ops->ipcchan(cluster_conn); } CRM_CHECK(cluster_conn != NULL,;); CRM_CHECK(channel != NULL,;); if (channel != NULL && IPC_ISRCONN(channel)) { if (cluster_conn->llc_ops->msgready(cluster_conn) == 0) { crm_trace("no message ready yet"); } /* invoke the callbacks but dont block */ cluster_conn->llc_ops->rcvmsg(cluster_conn, 0); } if (channel == NULL || channel->ch_status != IPC_CONNECT) { if (is_set(fsa_input_register, R_HA_DISCONNECTED) == FALSE) { crm_crit("Lost connection to heartbeat service."); } else { crm_info("Lost connection to heartbeat service."); } trigger_fsa(fsa_source); stay_connected = FALSE; } return stay_connected; } diff --git a/include/crm/cluster/internal.h b/include/crm/cluster/internal.h index b7edfebef0..0834be2e15 100644 --- a/include/crm/cluster/internal.h +++ b/include/crm/cluster/internal.h @@ -1,448 +1,449 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #ifndef CRM_CLUSTER_INTERNAL__H # define CRM_CLUSTER_INTERNAL__H # include # define AIS_IPC_NAME "ais-crm-ipc" # define AIS_IPC_MESSAGE_SIZE 8192*128 # define CRM_MESSAGE_IPC_ACK 0 # ifndef INTERFACE_MAX # define INTERFACE_MAX 2 /* from the private coroapi.h header */ # endif typedef struct crm_ais_host_s AIS_Host; typedef struct crm_ais_msg_s AIS_Message; struct crm_ais_host_s { uint32_t id; uint32_t pid; gboolean local; enum crm_ais_msg_types type; uint32_t size; char uname[MAX_NAME]; } __attribute__ ((packed)); struct crm_ais_msg_s { cs_ipc_header_response_t header __attribute__ ((aligned(8))); uint32_t id; gboolean is_compressed; AIS_Host host; AIS_Host sender; uint32_t size; uint32_t compressed_size; /* 584 bytes */ char data[0]; } __attribute__ ((packed)); struct crm_ais_nodeid_resp_s { cs_ipc_header_response_t header __attribute__ ((aligned(8))); uint32_t id; uint32_t counter; char uname[MAX_NAME]; char cname[MAX_NAME]; } __attribute__ ((packed)); struct crm_ais_quorum_resp_s { cs_ipc_header_response_t header __attribute__ ((aligned(8))); uint64_t id; uint32_t votes; uint32_t expected_votes; uint32_t quorate; } __attribute__ ((packed)); /* *INDENT-OFF* */ enum crm_proc_flag { crm_proc_none = 0x00000001, /* These values are sent over the network by the legacy plugin * Therefor changing any of these values is going to break compatability * * So don't */ /* 3 messaging types */ crm_proc_heartbeat = 0x01000000, crm_proc_plugin = 0x00000002, crm_proc_cpg = 0x04000000, crm_proc_lrmd = 0x00000010, crm_proc_cib = 0x00000100, crm_proc_crmd = 0x00000200, crm_proc_attrd = 0x00001000, crm_proc_stonithd = 0x00002000, crm_proc_stonith_ng= 0x00100000, crm_proc_pe = 0x00010000, crm_proc_te = 0x00020000, crm_proc_mgmtd = 0x00040000, }; /* *INDENT-ON* */ static inline const char * peer2text(enum crm_proc_flag proc) { const char *text = "unknown"; - if (proc == (crm_proc_cpg | crm_proc_crmd)) { + if (proc == (crm_proc_cpg | crm_proc_crmd) + || proc == (crm_proc_heartbeat | crm_proc_crmd)) { return "peer"; } switch (proc) { case crm_proc_none: text = "none"; break; case crm_proc_plugin: text = "ais"; break; case crm_proc_heartbeat: text = "heartbeat"; break; case crm_proc_cib: text = "cib"; break; case crm_proc_crmd: text = "crmd"; break; case crm_proc_pe: text = "pengine"; break; case crm_proc_te: text = "tengine"; break; case crm_proc_lrmd: text = "lrmd"; break; case crm_proc_attrd: text = "attrd"; break; case crm_proc_stonithd: text = "stonithd"; break; case crm_proc_stonith_ng: text = "stonith-ng"; break; case crm_proc_mgmtd: text = "mgmtd"; break; case crm_proc_cpg: text = "corosync-cpg"; break; } return text; } static inline enum crm_proc_flag text2proc(const char *proc) { /* We only care about these two so far */ if (proc && strcmp(proc, "cib") == 0) { return crm_proc_cib; } else if (proc && strcmp(proc, "crmd") == 0) { return crm_proc_crmd; } return crm_proc_none; } static inline const char * ais_dest(const struct crm_ais_host_s *host) { if (host->local) { return "local"; } else if (host->size > 0) { return host->uname; } else { return ""; } } # define ais_data_len(msg) (msg->is_compressed?msg->compressed_size:msg->size) static inline AIS_Message * ais_msg_copy(const AIS_Message * source) { AIS_Message *target = malloc(sizeof(AIS_Message) + ais_data_len(source)); if(target) { memcpy(target, source, sizeof(AIS_Message)); memcpy(target->data, source->data, ais_data_len(target)); } return target; } /* typedef enum { CS_OK = 1, CS_ERR_LIBRARY = 2, CS_ERR_VERSION = 3, CS_ERR_INIT = 4, CS_ERR_TIMEOUT = 5, CS_ERR_TRY_AGAIN = 6, CS_ERR_INVALID_PARAM = 7, CS_ERR_NO_MEMORY = 8, CS_ERR_BAD_HANDLE = 9, CS_ERR_BUSY = 10, CS_ERR_ACCESS = 11, CS_ERR_NOT_EXIST = 12, CS_ERR_NAME_TOO_LONG = 13, CS_ERR_EXIST = 14, CS_ERR_NO_SPACE = 15, CS_ERR_INTERRUPT = 16, CS_ERR_NAME_NOT_FOUND = 17, CS_ERR_NO_RESOURCES = 18, CS_ERR_NOT_SUPPORTED = 19, CS_ERR_BAD_OPERATION = 20, CS_ERR_FAILED_OPERATION = 21, CS_ERR_MESSAGE_ERROR = 22, CS_ERR_QUEUE_FULL = 23, CS_ERR_QUEUE_NOT_AVAILABLE = 24, CS_ERR_BAD_FLAGS = 25, CS_ERR_TOO_BIG = 26, CS_ERR_NO_SECTIONS = 27, CS_ERR_CONTEXT_NOT_FOUND = 28, CS_ERR_TOO_MANY_GROUPS = 30, CS_ERR_SECURITY = 100 } cs_error_t; */ static inline const char * ais_error2text(int error) { const char *text = "unknown"; # if SUPPORT_COROSYNC switch (error) { case CS_OK: text = "OK"; break; case CS_ERR_LIBRARY: text = "Library error"; break; case CS_ERR_VERSION: text = "Version error"; break; case CS_ERR_INIT: text = "Initialization error"; break; case CS_ERR_TIMEOUT: text = "Timeout"; break; case CS_ERR_TRY_AGAIN: text = "Try again"; break; case CS_ERR_INVALID_PARAM: text = "Invalid parameter"; break; case CS_ERR_NO_MEMORY: text = "No memory"; break; case CS_ERR_BAD_HANDLE: text = "Bad handle"; break; case CS_ERR_BUSY: text = "Busy"; break; case CS_ERR_ACCESS: text = "Access error"; break; case CS_ERR_NOT_EXIST: text = "Doesn't exist"; break; case CS_ERR_NAME_TOO_LONG: text = "Name too long"; break; case CS_ERR_EXIST: text = "Exists"; break; case CS_ERR_NO_SPACE: text = "No space"; break; case CS_ERR_INTERRUPT: text = "Interrupt"; break; case CS_ERR_NAME_NOT_FOUND: text = "Name not found"; break; case CS_ERR_NO_RESOURCES: text = "No resources"; break; case CS_ERR_NOT_SUPPORTED: text = "Not supported"; break; case CS_ERR_BAD_OPERATION: text = "Bad operation"; break; case CS_ERR_FAILED_OPERATION: text = "Failed operation"; break; case CS_ERR_MESSAGE_ERROR: text = "Message error"; break; case CS_ERR_QUEUE_FULL: text = "Queue full"; break; case CS_ERR_QUEUE_NOT_AVAILABLE: text = "Queue not available"; break; case CS_ERR_BAD_FLAGS: text = "Bad flags"; break; case CS_ERR_TOO_BIG: text = "To big"; break; case CS_ERR_NO_SECTIONS: text = "No sections"; break; } # endif return text; } static inline const char * msg_type2text(enum crm_ais_msg_types type) { const char *text = "unknown"; switch (type) { case crm_msg_none: text = "unknown"; break; case crm_msg_ais: text = "ais"; break; case crm_msg_cib: text = "cib"; break; case crm_msg_crmd: text = "crmd"; break; case crm_msg_pe: text = "pengine"; break; case crm_msg_te: text = "tengine"; break; case crm_msg_lrmd: text = "lrmd"; break; case crm_msg_attrd: text = "attrd"; break; case crm_msg_stonithd: text = "stonithd"; break; case crm_msg_stonith_ng: text = "stonith-ng"; break; } return text; } enum crm_ais_msg_types text2msg_type(const char *text); char *get_ais_data(const AIS_Message * msg); gboolean check_message_sanity(const AIS_Message * msg, const char *data); # if SUPPORT_HEARTBEAT extern ll_cluster_t *heartbeat_cluster; gboolean send_ha_message(ll_cluster_t * hb_conn, xmlNode * msg, const char *node, gboolean force_ordered); gboolean ha_msg_dispatch(ll_cluster_t * cluster_conn, gpointer user_data); gboolean register_heartbeat_conn(crm_cluster_t * cluster); xmlNode *convert_ha_message(xmlNode * parent, HA_Message * msg, const char *field); gboolean ccm_have_quorum(oc_ed_t event); const char *ccm_event_name(oc_ed_t event); crm_node_t *crm_update_ccm_node(const oc_ev_membership_t * oc, int offset, const char *state, uint64_t seq); gboolean heartbeat_initialize_nodelist(void *cluster, gboolean force_member, xmlNode * xml_parent); # endif # if SUPPORT_COROSYNC gboolean send_cpg_iov(struct iovec * iov); # if SUPPORT_PLUGIN char *classic_node_name(uint32_t nodeid); void plugin_handle_membership(AIS_Message *msg); bool send_plugin_text(int class, struct iovec *iov); # else char *corosync_node_name(uint64_t /*cmap_handle_t */ cmap_handle, uint32_t nodeid); char *corosync_cluster_name(void); int corosync_cmap_has_config(const char *prefix); # endif gboolean corosync_initialize_nodelist(void *cluster, gboolean force_member, xmlNode * xml_parent); gboolean send_cluster_message_cs(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest); enum cluster_type_e find_corosync_variant(void); void terminate_cs_connection(crm_cluster_t * cluster); gboolean init_cs_connection(crm_cluster_t * cluster); gboolean init_cs_connection_once(crm_cluster_t * cluster); # endif # ifdef SUPPORT_CMAN char *cman_node_name(uint32_t nodeid); # endif enum crm_quorum_source { crm_quorum_cman, crm_quorum_corosync, crm_quorum_pacemaker, }; int get_corosync_id(int id, const char *uuid); char *get_corosync_uuid(crm_node_t *peer); enum crm_quorum_source get_quorum_source(void); void crm_update_peer_proc(const char *source, crm_node_t * peer, uint32_t flag, const char *status); crm_node_t *crm_update_peer(const char *source, unsigned int id, uint64_t born, uint64_t seen, int32_t votes, uint32_t children, const char *uuid, const char *uname, const char *addr, const char *state); void crm_update_peer_expected(const char *source, crm_node_t * node, const char *expected); void crm_update_peer_state(const char *source, crm_node_t * node, const char *state, int membership); gboolean init_cman_connection(gboolean(*dispatch) (unsigned long long, gboolean), void (*destroy) (gpointer)); gboolean cluster_connect_quorum(gboolean(*dispatch) (unsigned long long, gboolean), void (*destroy) (gpointer)); void set_node_uuid(const char *uname, const char *uuid); gboolean node_name_is_valid(const char *key, const char *name); crm_node_t * crm_find_peer_full(unsigned int id, const char *uname, int flags); crm_node_t * crm_find_peer(unsigned int id, const char *uname); #endif diff --git a/lib/cluster/heartbeat.c b/lib/cluster/heartbeat.c index eb9c18f35d..c6724bef34 100644 --- a/lib/cluster/heartbeat.c +++ b/lib/cluster/heartbeat.c @@ -1,625 +1,637 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #if HAVE_BZLIB_H # include #endif #if SUPPORT_HEARTBEAT ll_cluster_t *heartbeat_cluster = NULL; static void convert_ha_field(xmlNode * parent, void *msg_v, int lpc) { int type = 0; const char *name = NULL; const char *value = NULL; xmlNode *xml = NULL; HA_Message *msg = msg_v; int rc = BZ_OK; size_t orig_len = 0; unsigned int used = 0; char *uncompressed = NULL; char *compressed = NULL; int size = orig_len * 10; CRM_CHECK(parent != NULL, return); CRM_CHECK(msg != NULL, return); name = msg->names[lpc]; type = cl_get_type(msg, name); switch (type) { case FT_STRUCT: convert_ha_message(parent, msg->values[lpc], name); break; case FT_COMPRESS: case FT_UNCOMPRESS: convert_ha_message(parent, cl_get_struct(msg, name), name); break; case FT_STRING: value = msg->values[lpc]; CRM_CHECK(value != NULL, return); crm_trace("Converting %s/%d/%s", name, type, value[0] == '<' ? "xml" : "field"); if (value[0] != '<') { crm_xml_add(parent, name, value); break; } /* unpack xml string */ xml = string2xml(value); if (xml == NULL) { crm_err("Conversion of field '%s' failed", name); return; } add_node_nocopy(parent, NULL, xml); break; case FT_BINARY: value = cl_get_binary(msg, name, &orig_len); size = orig_len * 10 + 1; /* +1 because an exact 10x compression factor happens occasionally */ if (orig_len < 3 || value[0] != 'B' || value[1] != 'Z' || value[2] != 'h') { if (strstr(name, "uuid") == NULL) { crm_err("Skipping non-bzip binary field: %s", name); } return; } compressed = calloc(1, orig_len); memcpy(compressed, value, orig_len); crm_trace("Trying to decompress %d bytes", (int)orig_len); retry: uncompressed = realloc(uncompressed, size); memset(uncompressed, 0, size); used = size - 1; /* always leave room for a trailing '\0' * BZ2_bzBuffToBuffDecompress wont say anything if * the uncompressed data is exactly 'size' bytes */ rc = BZ2_bzBuffToBuffDecompress(uncompressed, &used, compressed, orig_len, 1, 0); if (rc == BZ_OUTBUFF_FULL) { size = size * 2; /* dont try to allocate more memory than we have */ if (size > 0) { goto retry; } } if (rc != BZ_OK) { crm_err("Decompression of %s (%d bytes) into %d failed: %d", name, (int)orig_len, size, rc); } else if (used >= size) { CRM_ASSERT(used < size); } else { CRM_LOG_ASSERT(uncompressed[used] == 0); uncompressed[used] = 0; xml = string2xml(uncompressed); } if (xml != NULL) { add_node_copy(parent, xml); free_xml(xml); } free(uncompressed); free(compressed); break; } } xmlNode * convert_ha_message(xmlNode * parent, HA_Message * msg, const char *field) { int lpc = 0; xmlNode *child = NULL; const char *tag = NULL; CRM_CHECK(msg != NULL, crm_err("Empty message for %s", field); return parent); tag = cl_get_string(msg, F_XML_TAGNAME); if (tag == NULL) { tag = field; } else if (parent && safe_str_neq(field, tag)) { /* For compatability with 0.6.x */ crm_debug("Creating intermediate parent %s between %s and %s", field, crm_element_name(parent), tag); parent = create_xml_node(parent, field); } if (parent == NULL) { parent = create_xml_node(NULL, tag); child = parent; } else { child = create_xml_node(parent, tag); } for (lpc = 0; lpc < msg->nfields; lpc++) { convert_ha_field(child, msg, lpc); } return parent; } static void add_ha_nocopy(HA_Message * parent, HA_Message * child, const char *field) { int next = parent->nfields; if (parent->nfields >= parent->nalloc && ha_msg_expand(parent) != HA_OK) { crm_err("Parent expansion failed"); return; } parent->names[next] = strdup(field); parent->nlens[next] = strlen(field); parent->values[next] = child; parent->vlens[next] = sizeof(HA_Message); parent->types[next] = FT_UNCOMPRESS; parent->nfields++; } static HA_Message * convert_xml_message_struct(HA_Message * parent, xmlNode * src_node, const char *field) { xmlNode *child = NULL; xmlNode *__crm_xml_iter = src_node->children; xmlAttrPtr prop_iter = src_node->properties; const char *name = NULL; const char *value = NULL; HA_Message *result = ha_msg_new(3); ha_msg_add(result, F_XML_TAGNAME, (const char *)src_node->name); while (prop_iter != NULL) { name = (const char *)prop_iter->name; value = (const char *)xmlGetProp(src_node, prop_iter->name); prop_iter = prop_iter->next; ha_msg_add(result, name, value); } while (__crm_xml_iter != NULL) { child = __crm_xml_iter; __crm_xml_iter = __crm_xml_iter->next; convert_xml_message_struct(result, child, NULL); } if (parent == NULL) { return result; } if (field) { HA_Message *holder = ha_msg_new(3); CRM_ASSERT(holder != NULL); ha_msg_add(holder, F_XML_TAGNAME, field); add_ha_nocopy(holder, result, (const char *)src_node->name); ha_msg_addstruct_compress(parent, field, holder); ha_msg_del(holder); } else { add_ha_nocopy(parent, result, (const char *)src_node->name); } return result; } static void convert_xml_child(HA_Message * msg, xmlNode * xml) { int orig = 0; int rc = BZ_OK; unsigned int len = 0; char *buffer = NULL; char *compressed = NULL; const char *name = NULL; name = (const char *)xml->name; buffer = dump_xml_unformatted(xml); orig = strlen(buffer); if (orig < CRM_BZ2_THRESHOLD) { ha_msg_add(msg, name, buffer); goto done; } len = (orig * 1.1) + 600; /* recomended size */ compressed = malloc(len); rc = BZ2_bzBuffToBuffCompress(compressed, &len, buffer, orig, CRM_BZ2_BLOCKS, 0, CRM_BZ2_WORK); if (rc != BZ_OK) { crm_err("Compression failed: %d", rc); free(compressed); convert_xml_message_struct(msg, xml, name); goto done; } free(buffer); buffer = compressed; crm_trace("Compression details: %d -> %d", orig, len); ha_msg_addbin(msg, name, buffer, len); done: free(buffer); # if 0 { unsigned int used = orig; char *uncompressed = NULL; crm_debug("Trying to decompress %d bytes", len); uncompressed = calloc(1, orig); rc = BZ2_bzBuffToBuffDecompress(uncompressed, &used, compressed, len, 1, 0); CRM_CHECK(rc == BZ_OK,; ); CRM_CHECK(used == orig,; ); crm_debug("rc=%d, used=%d", rc, used); if (rc != BZ_OK) { crm_exit(DAEMON_RESPAWN_STOP); } crm_debug("Original %s, decompressed %s", buffer, uncompressed); free(uncompressed); } # endif } static HA_Message * convert_xml_message(xmlNode * xml) { xmlNode *child = NULL; xmlAttrPtr pIter = NULL; HA_Message *result = NULL; result = ha_msg_new(3); ha_msg_add(result, F_XML_TAGNAME, (const char *)xml->name); for (pIter = xml->properties; pIter != NULL; pIter = pIter->next) { const char *p_name = (const char *)pIter->name; if (pIter->children) { const char *p_value = (const char *)pIter->children->content; ha_msg_add(result, p_name, p_value); } } for (child = __xml_first_child(xml); child != NULL; child = __xml_next(child)) { convert_xml_child(result, child); } return result; } gboolean crm_is_heartbeat_peer_active(const crm_node_t * node) { enum crm_proc_flag proc = text2proc(crm_system_name); if (node == NULL) { crm_trace("NULL"); return FALSE; } else if (safe_str_neq(node->state, CRM_NODE_MEMBER)) { crm_trace("%s: state=%s", node->uname, node->state); return FALSE; } else if ((node->processes & crm_proc_heartbeat) == 0) { crm_trace("%s: processes=%.16x", node->uname, node->processes); return FALSE; } else if (proc == crm_proc_none) { return TRUE; } else if ((node->processes & proc) == 0) { crm_trace("%s: proc %.16x not in %.16x", node->uname, proc, node->processes); return FALSE; } return TRUE; } crm_node_t * crm_update_ccm_node(const oc_ev_membership_t * oc, int offset, const char *state, uint64_t seq) { + enum crm_proc_flag this_proc = text2proc(crm_system_name); crm_node_t *peer = NULL; const char *uuid = NULL; CRM_CHECK(oc->m_array[offset].node_uname != NULL, return NULL); peer = crm_get_peer(0, oc->m_array[offset].node_uname); uuid = crm_peer_uuid(peer); crm_update_peer(__FUNCTION__, oc->m_array[offset].node_id, oc->m_array[offset].node_born_on, seq, -1, 0, uuid, oc->m_array[offset].node_uname, NULL, state); if (safe_str_eq(CRM_NODE_MEMBER, state)) { - /* Heartbeat doesn't send status notifications for nodes that were already part of the cluster */ - crm_update_peer_proc(__FUNCTION__, peer, crm_proc_heartbeat, ONLINESTATUS); - - /* Nor does it send status notifications for processes that were already active */ - crm_update_peer_proc(__FUNCTION__, peer, crm_proc_crmd, ONLINESTATUS); + /* Heartbeat doesn't send status notifications for nodes that were already part of the cluster. + * Nor does it send status notifications for processes that were already active. + * Do not optimistically assume the peer client process to be online as well. + * We ask for cluster wide updated client status for crm_system_name + * directly in the ccm status callback, which will then tell us. + * For ourselves, we know. */ + enum crm_proc_flag flags = crm_proc_heartbeat; + const char *const_uname = heartbeat_cluster->llc_ops->get_mynodeid(heartbeat_cluster); + if (safe_str_eq(const_uname, peer->uname)) { + flags |= this_proc; + } + crm_update_peer_proc(__FUNCTION__, peer, flags, ONLINESTATUS); + } else { + /* crm_update_peer_proc(__FUNCTION__, peer, crm_proc_heartbeat, OFFLINESTATUS); */ + /* heartbeat may well be still alive. peer client process apparently vanished, though ... */ + crm_update_peer_proc(__FUNCTION__, peer, this_proc, OFFLINESTATUS); } return peer; } gboolean send_ha_message(ll_cluster_t * hb_conn, xmlNode * xml, const char *node, gboolean force_ordered) { gboolean all_is_good = TRUE; HA_Message *msg = convert_xml_message(xml); if (msg == NULL) { crm_err("cant send NULL message"); all_is_good = FALSE; } else if (hb_conn == NULL) { crm_err("No heartbeat connection specified"); all_is_good = FALSE; } else if (hb_conn->llc_ops->chan_is_connected(hb_conn) == FALSE) { crm_err("Not connected to Heartbeat"); all_is_good = FALSE; } else if (node != NULL) { char *host_lowercase = g_ascii_strdown(node, -1); if (hb_conn->llc_ops->send_ordered_nodemsg(hb_conn, msg, host_lowercase) != HA_OK) { all_is_good = FALSE; crm_err("Send failed"); } free(host_lowercase); } else if (force_ordered) { if (hb_conn->llc_ops->send_ordered_clustermsg(hb_conn, msg) != HA_OK) { all_is_good = FALSE; crm_err("Broadcast Send failed"); } } else { if (hb_conn->llc_ops->sendclustermsg(hb_conn, msg) != HA_OK) { all_is_good = FALSE; crm_err("Broadcast Send failed"); } } if (all_is_good == FALSE && hb_conn != NULL) { IPC_Channel *ipc = NULL; IPC_Queue *send_q = NULL; if (hb_conn->llc_ops->chan_is_connected(hb_conn) != HA_OK) { ipc = hb_conn->llc_ops->ipcchan(hb_conn); } if (ipc != NULL) { /* ipc->ops->resume_io(ipc); */ send_q = ipc->send_queue; } if (send_q != NULL) { CRM_CHECK(send_q->current_qlen < send_q->max_qlen,; ); } } if (all_is_good) { crm_log_xml_trace(xml, "outbound"); } else { crm_log_xml_warn(xml, "outbound"); } if (msg != NULL) { ha_msg_del(msg); } return all_is_good; } gboolean ha_msg_dispatch(ll_cluster_t * cluster_conn, gpointer user_data) { IPC_Channel *channel = NULL; crm_trace("Invoked"); if (cluster_conn != NULL) { channel = cluster_conn->llc_ops->ipcchan(cluster_conn); } CRM_CHECK(cluster_conn != NULL, return FALSE); CRM_CHECK(channel != NULL, return FALSE); if (channel != NULL && IPC_ISRCONN(channel)) { if (cluster_conn->llc_ops->msgready(cluster_conn) == 0) { crm_trace("no message ready yet"); } /* invoke the callbacks but dont block */ cluster_conn->llc_ops->rcvmsg(cluster_conn, 0); } if (channel == NULL || channel->ch_status != IPC_CONNECT) { crm_info("Lost connection to heartbeat service."); return FALSE; } return TRUE; } gboolean register_heartbeat_conn(crm_cluster_t * cluster) { crm_node_t *peer = NULL; const char *const_uuid = NULL; const char *const_uname = NULL; crm_debug("Signing in with Heartbeat"); if (cluster->hb_conn->llc_ops->signon(cluster->hb_conn, crm_system_name) != HA_OK) { crm_err("Cannot sign on with heartbeat: %s", cluster->hb_conn->llc_ops->errmsg(cluster->hb_conn)); return FALSE; } if (HA_OK != cluster->hb_conn->llc_ops->set_msg_callback(cluster->hb_conn, crm_system_name, cluster->hb_dispatch, cluster->hb_conn)) { crm_err("Cannot set msg callback: %s", cluster->hb_conn->llc_ops->errmsg(cluster->hb_conn)); return FALSE; } else { void *handle = NULL; GLLclusterSource *(*g_main_add_cluster) (int priority, ll_cluster_t * api, gboolean can_recurse, gboolean(*dispatch) (ll_cluster_t * source_data, gpointer user_data), gpointer userdata, GDestroyNotify notify) = find_library_function(&handle, HEARTBEAT_LIBRARY, "G_main_add_ll_cluster", 1); (*g_main_add_cluster) (G_PRIORITY_HIGH, cluster->hb_conn, FALSE, ha_msg_dispatch, cluster->hb_conn, cluster->destroy); dlclose(handle); } const_uname = cluster->hb_conn->llc_ops->get_mynodeid(cluster->hb_conn); CRM_CHECK(const_uname != NULL, return FALSE); peer = crm_get_peer(0, const_uname); const_uuid = crm_peer_uuid(peer); CRM_CHECK(const_uuid != NULL, return FALSE); crm_info("Hostname: %s", const_uname); crm_info("UUID: %s", const_uuid); cluster->uname = strdup(const_uname); cluster->uuid = strdup(const_uuid); return TRUE; } gboolean ccm_have_quorum(oc_ed_t event) { if (event == OC_EV_MS_NEW_MEMBERSHIP || event == OC_EV_MS_PRIMARY_RESTORED) { return TRUE; } return FALSE; } const char * ccm_event_name(oc_ed_t event) { if (event == OC_EV_MS_NEW_MEMBERSHIP) { return "NEW MEMBERSHIP"; } else if (event == OC_EV_MS_NOT_PRIMARY) { return "NOT PRIMARY"; } else if (event == OC_EV_MS_PRIMARY_RESTORED) { return "PRIMARY RESTORED"; } else if (event == OC_EV_MS_EVICTED) { return "EVICTED"; } else if (event == OC_EV_MS_INVALID) { return "INVALID"; } return "NO QUORUM MEMBERSHIP"; } gboolean heartbeat_initialize_nodelist(void *cluster, gboolean force_member, xmlNode * xml_parent) { const char *ha_node = NULL; ll_cluster_t *conn = cluster; if (conn == NULL) { crm_debug("Not connected"); return FALSE; } /* Async get client status information in the cluster */ crm_info("Requesting the list of configured nodes"); conn->llc_ops->init_nodewalk(conn); do { xmlNode *node = NULL; crm_node_t *peer = NULL; const char *ha_node_type = NULL; const char *ha_node_uuid = NULL; ha_node = conn->llc_ops->nextnode(conn); if (ha_node == NULL) { continue; } ha_node_type = conn->llc_ops->node_type(conn, ha_node); if (safe_str_neq(NORMALNODE, ha_node_type)) { crm_debug("Node %s: skipping '%s'", ha_node, ha_node_type); continue; } peer = crm_get_peer(0, ha_node); ha_node_uuid = crm_peer_uuid(peer); if (ha_node_uuid == NULL) { crm_warn("Node %s: no uuid found", ha_node); continue; } crm_debug("Node: %s (uuid: %s)", ha_node, ha_node_uuid); node = create_xml_node(xml_parent, XML_CIB_TAG_NODE); crm_xml_add(node, XML_ATTR_ID, ha_node_uuid); crm_xml_add(node, XML_ATTR_UNAME, ha_node); crm_xml_add(node, XML_ATTR_TYPE, ha_node_type); } while (ha_node != NULL); conn->llc_ops->end_nodewalk(conn); return TRUE; } #endif