diff --git a/daemons/pacemakerd/pcmkd_corosync.c b/daemons/pacemakerd/pcmkd_corosync.c index 3916fec9f2..2bfd7bf2cd 100644 --- a/daemons/pacemakerd/pcmkd_corosync.c +++ b/daemons/pacemakerd/pcmkd_corosync.c @@ -1,377 +1,378 @@ /* * Copyright 2010-2025 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU General Public License version 2 * or later (GPLv2+) WITHOUT ANY WARRANTY. */ #include #include "pacemakerd.h" #include "pcmkd_corosync.h" #include #include /* for calls to stat() */ #include /* For basename() and dirname() */ #include #include /* For getpwname() */ #include #include #include #include #include #include /* for crm_ipc_is_authentic_process */ #include #include /* PCMK__SPECIAL_PID* */ static corosync_cfg_handle_t cfg_handle = 0; static mainloop_timer_t *reconnect_timer = NULL; /* =::=::=::= CFG - Shutdown stuff =::=::=::= */ static void cfg_shutdown_callback(corosync_cfg_handle_t h, corosync_cfg_shutdown_flags_t flags) { crm_info("Corosync wants to shut down: %s", (flags == COROSYNC_CFG_SHUTDOWN_FLAG_IMMEDIATE) ? "immediate" : (flags == COROSYNC_CFG_SHUTDOWN_FLAG_REGARDLESS) ? "forced" : "optional"); /* Never allow corosync to shut down while we're running */ corosync_cfg_replyto_shutdown(h, COROSYNC_CFG_SHUTDOWN_FLAG_NO); } static corosync_cfg_callbacks_t cfg_callbacks = { .corosync_cfg_shutdown_callback = cfg_shutdown_callback, }; static int pcmk_cfg_dispatch(gpointer user_data) { corosync_cfg_handle_t *handle = (corosync_cfg_handle_t *) user_data; cs_error_t rc = corosync_cfg_dispatch(*handle, CS_DISPATCH_ALL); if (rc != CS_OK) { return -1; } return 0; } static void close_cfg(void) { if (cfg_handle != 0) { #ifdef HAVE_COROSYNC_CFG_TRACKSTART /* Ideally, we would call corosync_cfg_trackstop(cfg_handle) here, but a * bug in corosync 3.1.1 and 3.1.2 makes it hang forever. Thankfully, * it's not necessary since we exit immediately after this. */ #endif corosync_cfg_finalize(cfg_handle); cfg_handle = 0; } } static gboolean cluster_reconnect_cb(gpointer data) { if (cluster_connect_cfg()) { mainloop_timer_del(reconnect_timer); reconnect_timer = NULL; crm_notice("Cluster reconnect succeeded"); pacemakerd_read_config(); restart_cluster_subdaemons(); return G_SOURCE_REMOVE; } else { crm_info("Cluster reconnect failed " "(connection will be reattempted once per second)"); } /* * In theory this will continue forever. In practice the CIB connection from * attrd will timeout and shut down Pacemaker when it gets bored. */ return G_SOURCE_CONTINUE; } static void cfg_connection_destroy(gpointer user_data) { crm_warn("Lost connection to cluster layer " "(connection will be reattempted once per second)"); corosync_cfg_finalize(cfg_handle); cfg_handle = 0; reconnect_timer = mainloop_timer_add("corosync reconnect", 1000, TRUE, cluster_reconnect_cb, NULL); mainloop_timer_start(reconnect_timer); } void cluster_disconnect_cfg(void) { close_cfg(); if (reconnect_timer != NULL) { /* The mainloop should be gone by this point, so this isn't necessary, * but cleaning up memory should make valgrind happier. */ mainloop_timer_del(reconnect_timer); reconnect_timer = NULL; } } #define cs_repeat(counter, max, code) do { \ code; \ if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \ counter++; \ crm_debug("Retrying Corosync operation after %ds", counter); \ sleep(counter); \ } else { \ break; \ } \ } while(counter < max) gboolean cluster_connect_cfg(void) { cs_error_t rc; int fd = -1, retries = 0, rv; uid_t found_uid = 0; gid_t found_gid = 0; pid_t found_pid = 0; uint32_t nodeid; static struct mainloop_fd_callbacks cfg_fd_callbacks = { .dispatch = pcmk_cfg_dispatch, .destroy = cfg_connection_destroy, }; cs_repeat(retries, 30, rc = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)); if (rc != CS_OK) { crm_crit("Could not connect to Corosync CFG: %s " QB_XS " rc=%d", cs_strerror(rc), rc); return FALSE; } rc = corosync_cfg_fd_get(cfg_handle, &fd); if (rc != CS_OK) { crm_crit("Could not get Corosync CFG descriptor: %s " QB_XS " rc=%d", cs_strerror(rc), rc); goto bail; } /* CFG provider run as root (in given user namespace, anyway)? */ if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid, &found_uid, &found_gid))) { crm_crit("Rejecting Corosync CFG provider because process %lld " "is running as uid %lld gid %lld, not root", (long long) PCMK__SPECIAL_PID_AS_0(found_pid), (long long) found_uid, (long long) found_gid); goto bail; } else if (rv < 0) { crm_crit("Could not authenticate Corosync CFG provider: %s " QB_XS " rc=%d", strerror(-rv), -rv); goto bail; } retries = 0; cs_repeat(retries, 30, rc = corosync_cfg_local_get(cfg_handle, &nodeid)); if (rc != CS_OK) { crm_crit("Could not get local node ID from Corosync: %s " QB_XS " rc=%d", cs_strerror(rc), rc); goto bail; } crm_debug("Corosync reports local node ID is %lu", (unsigned long) nodeid); #ifdef HAVE_COROSYNC_CFG_TRACKSTART retries = 0; cs_repeat(retries, 30, rc = corosync_cfg_trackstart(cfg_handle, 0)); if (rc != CS_OK) { crm_crit("Could not enable Corosync CFG shutdown tracker: %s " QB_XS " rc=%d", cs_strerror(rc), rc); goto bail; } #endif mainloop_add_fd("corosync-cfg", G_PRIORITY_DEFAULT, fd, &cfg_handle, &cfg_fd_callbacks); return TRUE; bail: corosync_cfg_finalize(cfg_handle); return FALSE; } void pcmkd_shutdown_corosync(void) { cs_error_t rc; if (cfg_handle == 0) { crm_warn("Unable to shut down Corosync: No connection"); return; } crm_info("Asking Corosync to shut down"); rc = corosync_cfg_try_shutdown(cfg_handle, COROSYNC_CFG_SHUTDOWN_FLAG_IMMEDIATE); if (rc == CS_OK) { close_cfg(); } else { crm_warn("Corosync shutdown failed: %s " QB_XS " rc=%d", cs_strerror(rc), rc); } } bool pcmkd_corosync_connected(void) { cpg_handle_t local_handle = 0; cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0}; int fd = -1; if (cpg_model_initialize(&local_handle, CPG_MODEL_V1, (cpg_model_data_t *) &cpg_model_info, NULL) != CS_OK) { return false; } if (cpg_fd_get(local_handle, &fd) != CS_OK) { return false; } cpg_finalize(local_handle); return true; } /* =::=::=::= Configuration =::=::=::= */ static int get_config_opt(uint64_t unused, cmap_handle_t object_handle, const char *key, char **value, const char *fallback) { int rc = 0, retries = 0; cs_repeat(retries, 5, rc = cmap_get_string(object_handle, key, value)); if (rc != CS_OK) { crm_trace("Search for %s failed %d, defaulting to %s", key, rc, fallback); pcmk__str_update(value, fallback); } crm_trace("%s: %s", key, *value); return rc; } gboolean pacemakerd_read_config(void) { cs_error_t rc = CS_OK; int retries = 0; cmap_handle_t local_handle; uint64_t config = 0; int fd = -1; uid_t found_uid = 0; gid_t found_gid = 0; pid_t found_pid = 0; int rv; enum pcmk_cluster_layer cluster_layer = pcmk_cluster_layer_unknown; const char *cluster_layer_s = NULL; // There can be only one possibility do { rc = pcmk__init_cmap(&local_handle); if (rc != CS_OK) { retries++; crm_info("Could not connect to Corosync CMAP: %s (retrying in %ds) " QB_XS " rc=%d", cs_strerror(rc), retries, rc); sleep(retries); } else { break; } } while (retries < 5); if (rc != CS_OK) { crm_crit("Could not connect to Corosync CMAP: %s " QB_XS " rc=%d", cs_strerror(rc), rc); return FALSE; } rc = cmap_fd_get(local_handle, &fd); if (rc != CS_OK) { crm_crit("Could not get Corosync CMAP descriptor: %s " QB_XS " rc=%d", cs_strerror(rc), rc); cmap_finalize(local_handle); return FALSE; } /* CMAP provider run as root (in given user namespace, anyway)? */ if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid, &found_uid, &found_gid))) { crm_crit("Rejecting Corosync CMAP provider because process %lld " "is running as uid %lld gid %lld, not root", (long long) PCMK__SPECIAL_PID_AS_0(found_pid), (long long) found_uid, (long long) found_gid); cmap_finalize(local_handle); return FALSE; } else if (rv < 0) { crm_crit("Could not authenticate Corosync CMAP provider: %s " QB_XS " rc=%d", strerror(-rv), -rv); cmap_finalize(local_handle); return FALSE; } cluster_layer = pcmk_get_cluster_layer(); cluster_layer_s = pcmk_cluster_layer_text(cluster_layer); if (cluster_layer != pcmk_cluster_layer_corosync) { crm_crit("Expected Corosync cluster layer but detected %s " QB_XS " cluster_layer=%d", cluster_layer_s, cluster_layer); return FALSE; } crm_info("Reading configuration for %s cluster layer", cluster_layer_s); pcmk__set_env_option(PCMK__ENV_CLUSTER_TYPE, PCMK_VALUE_COROSYNC, true); // If debug logging is not configured, check whether corosync has it if (pcmk__env_option(PCMK__ENV_DEBUG) == NULL) { char *debug_enabled = NULL; get_config_opt(config, local_handle, "logging.debug", &debug_enabled, PCMK_VALUE_OFF); if (pcmk__is_true(debug_enabled)) { pcmk__set_env_option(PCMK__ENV_DEBUG, "1", true); if (get_crm_log_level() < LOG_DEBUG) { set_crm_log_level(LOG_DEBUG); } } else { pcmk__set_env_option(PCMK__ENV_DEBUG, "0", true); } free(debug_enabled); } if(local_handle){ gid_t gid = 0; if (pcmk_daemon_user(NULL, &gid) < 0) { crm_warn("Could not authorize group with Corosync " QB_XS " No group found for user %s", CRM_DAEMON_USER); } else { char *key = crm_strdup_printf("uidgid.gid.%lld", (long long) gid); rc = cmap_set_uint8(local_handle, key, 1); free(key); if (rc != CS_OK) { crm_warn("Could not authorize group with Corosync: %s " QB_XS - " group=%u rc=%d", pcmk__cs_err_str(rc), gid, rc); + " group=%u rc=%d", pcmk_rc_str(pcmk__corosync2rc(rc)), + gid, rc); } } } cmap_finalize(local_handle); return TRUE; } diff --git a/include/crm/cluster/internal.h b/include/crm/cluster/internal.h index bf9438d303..d084450659 100644 --- a/include/crm/cluster/internal.h +++ b/include/crm/cluster/internal.h @@ -1,324 +1,279 @@ /* * Copyright 2004-2025 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU Lesser General Public License * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. */ #ifndef PCMK__CRM_CLUSTER_INTERNAL__H #define PCMK__CRM_CLUSTER_INTERNAL__H #include #include // uint32_t, uint64_t #include // gboolean #include // xmlNode #include // enum crm_ipc_server #include #if SUPPORT_COROSYNC #include // cpg_name, cpg_handle_t #endif #ifdef __cplusplus extern "C" { #endif // @TODO Replace this with a pcmk__node_status_flags value enum crm_proc_flag { crm_proc_none = 0x00000001, // Cluster layers crm_proc_cpg = 0x04000000, }; /*! * \internal * \brief Boolean flags for a \c pcmk__node_status_t object * * Some flags may not be related to status specifically. However, we keep these * separate from enum pcmk__node_flags because they're used with * different object types. */ enum pcmk__node_status_flags { /*! * Node is a Pacemaker Remote node and should not be considered for cluster * membership */ pcmk__node_status_remote = (UINT32_C(1) << 0), //! Node's cache entry is dirty pcmk__node_status_dirty = (UINT32_C(1) << 1), }; // Used with node cache search functions enum pcmk__node_search_flags { //! Does not affect search pcmk__node_search_none = 0, //! Search for cluster nodes from membership cache pcmk__node_search_cluster_member = (1 << 0), //! Search for remote nodes pcmk__node_search_remote = (1 << 1), //! Search for cluster member nodes and remote nodes pcmk__node_search_any = pcmk__node_search_cluster_member |pcmk__node_search_remote, //! Search for cluster nodes from CIB (as of last cache refresh) pcmk__node_search_cluster_cib = (1 << 2), }; /*! * \internal * \brief Type of update to a \c pcmk__node_status_t object */ enum pcmk__node_update { pcmk__node_update_name, //!< Node name updated pcmk__node_update_state, //!< Node connection state updated pcmk__node_update_processes, //!< Node process group membership updated }; typedef struct pcmk__election pcmk__election_t; //! Implementation of pcmk__cluster_private_t struct pcmk__cluster_private { enum pcmk_ipc_server server; //!< Server this connection is for (if any) char *node_name; //!< Local node name at cluster layer char *node_xml_id; //!< Local node XML ID in CIB pcmk__election_t *election; //!< Election state (if election is needed) /* @TODO Corosync uses an integer node ID, but cluster layers in the * abstract do not necessarily need to */ uint32_t node_id; //!< Local node ID at cluster layer #if SUPPORT_COROSYNC /* @TODO Make these members a separate struct and use void *cluster_data * here instead, to abstract the cluster layer further. */ struct cpg_name group; //!< Corosync CPG name cpg_handle_t cpg_handle; //!< Corosync CPG handle #endif // SUPPORT_COROSYNC }; //! Node status data (may be a cluster node or a Pacemaker Remote node) typedef struct pcmk__node_status { //! Node name as known to cluster layer, or Pacemaker Remote node name char *name; /* @COMPAT This is less than ideal since the value is not a valid XML ID * (for Corosync, it's the string equivalent of the node's numeric node ID, * but XML IDs can't start with a number) and the three elements should have * different IDs. * * Ideally, we would use something like node-NODEID, node_state-NODEID, and * transient_attributes-NODEID as the element IDs. Unfortunately changing it * would be impractical due to backward compatibility; older nodes in a * rolling upgrade will always write and expect the value in the old format. */ /*! * Value of the PCMK_XA_ID XML attribute to use with the node's * PCMK_XE_NODE, PCMK_XE_NODE_STATE, and PCMK_XE_TRANSIENT_ATTRIBUTES * XML elements in the CIB */ char *xml_id; char *state; // @TODO change to enum //! Group of enum pcmk__node_status_flags uint32_t flags; /*! * Most recent cluster membership in which node was seen (0 for Pacemaker * Remote nodes) */ uint64_t membership_id; uint32_t processes; // @TODO most not needed, merge into flags /* @TODO When we can break public API compatibility, we can make the rest of * these members separate structs and use void *cluster_data and * void *user_data here instead, to abstract the cluster layer further. */ //! Arbitrary data (must be freeable by \c free()) void *user_data; char *expected; time_t peer_lost; char *conn_host; time_t when_member; // Since when node has been a cluster member time_t when_online; // Since when peer has been online in CPG /* @TODO The following are currently needed only by the Corosync stack. * Eventually consider moving them to a cluster-layer-specific data object. */ uint32_t cluster_layer_id; //!< Cluster-layer numeric node ID time_t when_lost; //!< When CPG membership was last lost } pcmk__node_status_t; /*! * \internal * \brief Return the process bit corresponding to the current cluster stack * * \return Process flag if detectable, otherwise 0 */ static inline uint32_t crm_get_cluster_proc(void) { switch (pcmk_get_cluster_layer()) { case pcmk_cluster_layer_corosync: return crm_proc_cpg; default: break; } return crm_proc_none; } -/*! - * \internal - * \brief Get log-friendly string description of a Corosync return code - * - * \param[in] error Corosync return code - * - * \return Log-friendly string description corresponding to \p error - */ -static inline const char * -pcmk__cs_err_str(int error) -{ -# if SUPPORT_COROSYNC - switch (error) { - case CS_OK: return "OK"; - case CS_ERR_LIBRARY: return "Library error"; - case CS_ERR_VERSION: return "Version error"; - case CS_ERR_INIT: return "Initialization error"; - case CS_ERR_TIMEOUT: return "Timeout"; - case CS_ERR_TRY_AGAIN: return "Try again"; - case CS_ERR_INVALID_PARAM: return "Invalid parameter"; - case CS_ERR_NO_MEMORY: return "No memory"; - case CS_ERR_BAD_HANDLE: return "Bad handle"; - case CS_ERR_BUSY: return "Busy"; - case CS_ERR_ACCESS: return "Access error"; - case CS_ERR_NOT_EXIST: return "Doesn't exist"; - case CS_ERR_NAME_TOO_LONG: return "Name too long"; - case CS_ERR_EXIST: return "Exists"; - case CS_ERR_NO_SPACE: return "No space"; - case CS_ERR_INTERRUPT: return "Interrupt"; - case CS_ERR_NAME_NOT_FOUND: return "Name not found"; - case CS_ERR_NO_RESOURCES: return "No resources"; - case CS_ERR_NOT_SUPPORTED: return "Not supported"; - case CS_ERR_BAD_OPERATION: return "Bad operation"; - case CS_ERR_FAILED_OPERATION: return "Failed operation"; - case CS_ERR_MESSAGE_ERROR: return "Message error"; - case CS_ERR_QUEUE_FULL: return "Queue full"; - case CS_ERR_QUEUE_NOT_AVAILABLE: return "Queue not available"; - case CS_ERR_BAD_FLAGS: return "Bad flags"; - case CS_ERR_TOO_BIG: return "Too big"; - case CS_ERR_NO_SECTIONS: return "No sections"; - } -# endif - return "Corosync error"; -} - # if SUPPORT_COROSYNC #if 0 /* This is the new way to do it, but we still support all Corosync 2 versions, * and this isn't always available. A better alternative here would be to check * for support in the configure script and enable this conditionally. */ #define pcmk__init_cmap(handle) cmap_initialize_map((handle), CMAP_MAP_ICMAP) #else #define pcmk__init_cmap(handle) cmap_initialize(handle) #endif char *pcmk__corosync_cluster_name(void); bool pcmk__corosync_add_nodes(xmlNode *xml_parent); void pcmk__cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries); char *pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid, void *content, const char **from); # endif const char *pcmk__cluster_get_xml_id(pcmk__node_status_t *node); char *pcmk__cluster_node_name(uint32_t nodeid); const char *pcmk__cluster_local_node_name(void); const char *pcmk__node_name_from_uuid(const char *uuid); pcmk__node_status_t *crm_update_peer_proc(const char *source, pcmk__node_status_t *peer, uint32_t flag, const char *status); pcmk__node_status_t *pcmk__update_peer_state(const char *source, pcmk__node_status_t *node, const char *state, uint64_t membership); void pcmk__update_peer_expected(const char *source, pcmk__node_status_t *node, const char *expected); void pcmk__reap_unseen_nodes(uint64_t ring_id); void pcmk__corosync_quorum_connect(gboolean (*dispatch)(unsigned long long, gboolean), void (*destroy) (gpointer)); bool pcmk__cluster_send_message(const pcmk__node_status_t *node, enum pcmk_ipc_server service, const xmlNode *data); // Membership extern GHashTable *pcmk__peer_cache; extern GHashTable *pcmk__remote_peer_cache; bool pcmk__cluster_has_quorum(void); void pcmk__cluster_init_node_caches(void); void pcmk__cluster_destroy_node_caches(void); void pcmk__cluster_set_autoreap(bool enable); void pcmk__cluster_set_status_callback(void (*dispatch)(enum pcmk__node_update, pcmk__node_status_t *, const void *)); bool pcmk__cluster_is_node_active(const pcmk__node_status_t *node); unsigned int pcmk__cluster_num_active_nodes(void); unsigned int pcmk__cluster_num_remote_nodes(void); pcmk__node_status_t *pcmk__cluster_lookup_remote_node(const char *node_name); void pcmk__cluster_forget_cluster_node(uint32_t id, const char *node_name); void pcmk__cluster_forget_remote_node(const char *node_name); pcmk__node_status_t *pcmk__search_node_caches(unsigned int id, const char *uname, const char *xml_id, uint32_t flags); void pcmk__purge_node_from_cache(const char *node_name, uint32_t node_id); void pcmk__refresh_node_caches_from_cib(xmlNode *cib); pcmk__node_status_t *pcmk__get_node(unsigned int id, const char *uname, const char *xml_id, uint32_t flags); #ifdef __cplusplus } #endif #endif // PCMK__CRM_CLUSTER_INTERNAL__H diff --git a/lib/cluster/corosync.c b/lib/cluster/corosync.c index c7737bdcfa..95f15c3bb0 100644 --- a/lib/cluster/corosync.c +++ b/lib/cluster/corosync.c @@ -1,821 +1,821 @@ /* * Copyright 2004-2025 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU Lesser General Public License * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. */ #include #include #include // PRIu64, etc. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include // PCMK__SPECIAL_PID #include #include #include "crmcluster_private.h" static quorum_handle_t pcmk_quorum_handle = 0; static gboolean (*quorum_app_callback)(unsigned long long seq, gboolean quorate) = NULL; /*! * \internal * \brief Get the Corosync UUID associated with a Pacemaker node * * \param[in] node Pacemaker node * * \return Newly allocated string with node's Corosync UUID, or NULL if unknown * \note It is the caller's responsibility to free the result with free(). */ char * pcmk__corosync_uuid(const pcmk__node_status_t *node) { pcmk__assert(pcmk_get_cluster_layer() == pcmk_cluster_layer_corosync); if (node != NULL) { if (node->cluster_layer_id > 0) { return crm_strdup_printf("%" PRIu32, node->cluster_layer_id); } else { crm_info("Node %s is not yet known by Corosync", node->name); } } return NULL; } static bool node_name_is_valid(const char *key, const char *name) { int octet; if (name == NULL) { crm_trace("%s is empty", key); return false; } else if (sscanf(name, "%d.%d.%d.%d", &octet, &octet, &octet, &octet) == 4) { crm_trace("%s contains an IPv4 address (%s), ignoring", key, name); return false; } else if (strstr(name, ":") != NULL) { crm_trace("%s contains an IPv6 address (%s), ignoring", key, name); return false; } crm_trace("'%s: %s' is valid", key, name); return true; } /* * \internal * \brief Get Corosync node name corresponding to a node ID * * \param[in] cmap_handle Connection to Corosync CMAP * \param[in] nodeid Node ID to check * * \return Newly allocated string with name or (if no name) IP address * associated with first address assigned to a Corosync node ID (or NULL * if unknown) * \note It is the caller's responsibility to free the result with free(). */ char * pcmk__corosync_name(uint64_t /*cmap_handle_t */ cmap_handle, uint32_t nodeid) { // Originally based on corosync-quorumtool.c:node_name() int lpc = 0; cs_error_t rc = CS_OK; int retries = 0; char *name = NULL; cmap_handle_t local_handle = 0; int fd = -1; uid_t found_uid = 0; gid_t found_gid = 0; pid_t found_pid = 0; int rv; if (nodeid == 0) { nodeid = pcmk__cpg_local_nodeid(0); } if (cmap_handle == 0 && local_handle == 0) { retries = 0; crm_trace("Initializing CMAP connection"); do { rc = pcmk__init_cmap(&local_handle); if (rc != CS_OK) { retries++; crm_debug("API connection setup failed: %s. Retrying in %ds", cs_strerror(rc), retries); sleep(retries); } } while (retries < 5 && rc != CS_OK); if (rc != CS_OK) { crm_warn("Could not connect to Cluster Configuration Database API, error %s", cs_strerror(rc)); local_handle = 0; } } if (cmap_handle == 0) { cmap_handle = local_handle; rc = cmap_fd_get(cmap_handle, &fd); if (rc != CS_OK) { crm_err("Could not obtain the CMAP API connection: %s (%d)", cs_strerror(rc), rc); goto bail; } /* CMAP provider run as root (in given user namespace, anyway)? */ if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid, &found_uid, &found_gid))) { crm_err("CMAP provider is not authentic:" " process %lld (uid: %lld, gid: %lld)", (long long) PCMK__SPECIAL_PID_AS_0(found_pid), (long long) found_uid, (long long) found_gid); goto bail; } else if (rv < 0) { crm_err("Could not verify authenticity of CMAP provider: %s (%d)", strerror(-rv), -rv); goto bail; } } while (name == NULL && cmap_handle != 0) { uint32_t id = 0; char *key = NULL; key = crm_strdup_printf("nodelist.node.%d.nodeid", lpc); rc = cmap_get_uint32(cmap_handle, key, &id); crm_trace("Checking %u vs %u from %s", nodeid, id, key); free(key); if (rc != CS_OK) { break; } if (nodeid == id) { crm_trace("Searching for node name for %u in nodelist.node.%d %s", nodeid, lpc, pcmk__s(name, "")); if (name == NULL) { key = crm_strdup_printf("nodelist.node.%d.name", lpc); cmap_get_string(cmap_handle, key, &name); crm_trace("%s = %s", key, pcmk__s(name, "")); free(key); } if (name == NULL) { key = crm_strdup_printf("nodelist.node.%d.ring0_addr", lpc); cmap_get_string(cmap_handle, key, &name); crm_trace("%s = %s", key, pcmk__s(name, "")); if (!node_name_is_valid(key, name)) { free(name); name = NULL; } free(key); } break; } lpc++; } bail: if(local_handle) { cmap_finalize(local_handle); } if (name == NULL) { crm_info("Unable to get node name for nodeid %u", nodeid); } return name; } /*! * \internal * \brief Disconnect from Corosync cluster * * \param[in,out] cluster Cluster object to disconnect */ void pcmk__corosync_disconnect(pcmk_cluster_t *cluster) { pcmk__cpg_disconnect(cluster); if (pcmk_quorum_handle != 0) { quorum_finalize(pcmk_quorum_handle); pcmk_quorum_handle = 0; } crm_notice("Disconnected from Corosync"); } /*! * \internal * \brief Dispatch function for quorum connection file descriptor * * \param[in] user_data Ignored * * \return 0 on success, -1 on error (per mainloop_io_t interface) */ static int quorum_dispatch_cb(gpointer user_data) { int rc = quorum_dispatch(pcmk_quorum_handle, CS_DISPATCH_ALL); if (rc < 0) { crm_err("Connection to the Quorum API failed: %d", rc); quorum_finalize(pcmk_quorum_handle); pcmk_quorum_handle = 0; return -1; } return 0; } /*! * \internal * \brief Notification callback for Corosync quorum connection * * \param[in] handle Corosync quorum connection * \param[in] quorate Whether cluster is quorate * \param[in] ring_id Corosync ring ID * \param[in] view_list_entries Number of entries in \p view_list * \param[in] view_list Corosync node IDs in membership */ static void quorum_notification_cb(quorum_handle_t handle, uint32_t quorate, uint64_t ring_id, uint32_t view_list_entries, uint32_t *view_list) { int i; GHashTableIter iter; pcmk__node_status_t *node = NULL; static gboolean init_phase = TRUE; bool is_quorate = (quorate != 0); bool was_quorate = pcmk__cluster_has_quorum(); if (is_quorate && !was_quorate) { crm_notice("Quorum acquired " QB_XS " membership=%" PRIu64 " members=%" PRIu32, ring_id, view_list_entries); pcmk__cluster_set_quorum(true); } else if (!is_quorate && was_quorate) { crm_warn("Quorum lost " QB_XS " membership=%" PRIu64 " members=" PRIu32, ring_id, view_list_entries); pcmk__cluster_set_quorum(false); } else { crm_info("Quorum %s " QB_XS " membership=%" PRIu64 " members=%" PRIu32, (is_quorate? "retained" : "still lost"), ring_id, view_list_entries); } if (view_list_entries == 0 && init_phase) { crm_info("Corosync membership is still forming, ignoring"); return; } init_phase = FALSE; /* Reset membership_id for all cached nodes so we can tell which ones aren't * in the view list */ g_hash_table_iter_init(&iter, pcmk__peer_cache); while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) { node->membership_id = 0; } /* Update the peer cache for each node in view list */ for (i = 0; i < view_list_entries; i++) { uint32_t id = view_list[i]; crm_debug("Member[%d] %u ", i, id); /* Get this node's peer cache entry (adding one if not already there) */ node = pcmk__get_node(id, NULL, NULL, pcmk__node_search_cluster_member); if (node->name == NULL) { char *name = pcmk__corosync_name(0, id); crm_info("Obtaining name for new node %u", id); node = pcmk__get_node(id, name, NULL, pcmk__node_search_cluster_member); free(name); } // Update the node state (including updating membership_id to ring_id) pcmk__update_peer_state(__func__, node, PCMK_VALUE_MEMBER, ring_id); } /* Remove any peer cache entries we didn't update */ pcmk__reap_unseen_nodes(ring_id); if (quorum_app_callback) { quorum_app_callback(ring_id, is_quorate); } } /*! * \internal * \brief Connect to Corosync quorum service * * \param[in] dispatch Connection dispatch callback * \param[in] destroy Connection destroy callback */ void pcmk__corosync_quorum_connect(gboolean (*dispatch)(unsigned long long, gboolean), void (*destroy)(gpointer)) { cs_error_t rc; int fd = 0; int quorate = 0; uint32_t quorum_type = 0; struct mainloop_fd_callbacks quorum_fd_callbacks; uid_t found_uid = 0; gid_t found_gid = 0; pid_t found_pid = 0; int rv; quorum_fd_callbacks.dispatch = quorum_dispatch_cb; quorum_fd_callbacks.destroy = destroy; crm_debug("Configuring Pacemaker to obtain quorum from Corosync"); { #if 0 // New way but not supported by all Corosync 2 versions quorum_model_v0_data_t quorum_model_data = { .model = QUORUM_MODEL_V0, .quorum_notify_fn = quorum_notification_cb, }; rc = quorum_model_initialize(&pcmk_quorum_handle, QUORUM_MODEL_V0, (quorum_model_data_t *) &quorum_model_data, &quorum_type, NULL); #else quorum_callbacks_t quorum_callbacks = { .quorum_notify_fn = quorum_notification_cb, }; rc = quorum_initialize(&pcmk_quorum_handle, &quorum_callbacks, &quorum_type); #endif } if (rc != CS_OK) { crm_err("Could not connect to the Quorum API: %s (%d)", cs_strerror(rc), rc); goto bail; } else if (quorum_type != QUORUM_SET) { crm_err("Corosync quorum is not configured"); goto bail; } rc = quorum_fd_get(pcmk_quorum_handle, &fd); if (rc != CS_OK) { crm_err("Could not obtain the Quorum API connection: %s (%d)", strerror(rc), rc); goto bail; } /* Quorum provider run as root (in given user namespace, anyway)? */ if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid, &found_uid, &found_gid))) { crm_err("Quorum provider is not authentic:" " process %lld (uid: %lld, gid: %lld)", (long long) PCMK__SPECIAL_PID_AS_0(found_pid), (long long) found_uid, (long long) found_gid); rc = CS_ERR_ACCESS; goto bail; } else if (rv < 0) { crm_err("Could not verify authenticity of Quorum provider: %s (%d)", strerror(-rv), -rv); rc = CS_ERR_ACCESS; goto bail; } rc = quorum_getquorate(pcmk_quorum_handle, &quorate); if (rc != CS_OK) { crm_err("Could not obtain the current Quorum API state: %d", rc); goto bail; } if (quorate) { crm_notice("Quorum acquired"); } else { crm_warn("No quorum"); } quorum_app_callback = dispatch; pcmk__cluster_set_quorum(quorate != 0); rc = quorum_trackstart(pcmk_quorum_handle, CS_TRACK_CHANGES | CS_TRACK_CURRENT); if (rc != CS_OK) { crm_err("Could not setup Quorum API notifications: %d", rc); goto bail; } mainloop_add_fd("quorum", G_PRIORITY_HIGH, fd, dispatch, &quorum_fd_callbacks); pcmk__corosync_add_nodes(NULL); bail: if (rc != CS_OK) { quorum_finalize(pcmk_quorum_handle); } } /*! * \internal * \brief Connect to Corosync cluster layer * * \param[in,out] cluster Initialized cluster object to connect * * \return Standard Pacemaker return code */ int pcmk__corosync_connect(pcmk_cluster_t *cluster) { const enum pcmk_cluster_layer cluster_layer = pcmk_get_cluster_layer(); const char *cluster_layer_s = pcmk_cluster_layer_text(cluster_layer); pcmk__node_status_t *local_node = NULL; int rc = pcmk_rc_ok; pcmk__cluster_init_node_caches(); if (cluster_layer != pcmk_cluster_layer_corosync) { crm_err("Invalid cluster layer: %s " QB_XS " cluster_layer=%d", cluster_layer_s, cluster_layer); return EINVAL; } rc = pcmk__cpg_connect(cluster); if (rc != pcmk_rc_ok) { // Error message was logged by pcmk__cpg_connect() return rc; } crm_info("Connection to %s established", cluster_layer_s); cluster->priv->node_id = pcmk__cpg_local_nodeid(0); if (cluster->priv->node_id == 0) { crm_err("Could not determine local node ID"); return ENXIO; } cluster->priv->node_name = pcmk__cluster_node_name(0); if (cluster->priv->node_name == NULL) { crm_err("Could not determine local node name"); return ENXIO; } // Ensure local node always exists in peer cache local_node = pcmk__get_node(cluster->priv->node_id, cluster->priv->node_name, NULL, pcmk__node_search_cluster_member); cluster->priv->node_xml_id = pcmk__corosync_uuid(local_node); CRM_LOG_ASSERT(cluster->priv->node_xml_id != NULL); return pcmk_rc_ok; } /*! * \internal * \brief Check whether a Corosync cluster is active * * \return \c true if Corosync is found active, or \c false otherwise */ bool pcmk__corosync_is_active(void) { cmap_handle_t handle; int rc = pcmk__init_cmap(&handle); if (rc == CS_OK) { cmap_finalize(handle); return true; } crm_info("Failed to initialize the cmap API: %s (%d)", - pcmk__cs_err_str(rc), rc); + pcmk_rc_str(pcmk__corosync2rc(rc)), rc); return false; } /*! * \internal * \brief Check whether a Corosync cluster peer is active * * \param[in] node Node to check * * \return \c true if \p node is an active Corosync peer, or \c false otherwise */ bool pcmk__corosync_is_peer_active(const pcmk__node_status_t *node) { if (node == NULL) { crm_trace("Corosync peer inactive: NULL"); return false; } if (!pcmk__str_eq(node->state, PCMK_VALUE_MEMBER, pcmk__str_none)) { crm_trace("Corosync peer %s inactive: state=%s", node->name, node->state); return false; } if (!pcmk_is_set(node->processes, crm_proc_cpg)) { crm_trace("Corosync peer %s inactive " QB_XS " processes=%.16" PRIx32, node->name, node->processes); return false; } return true; } /*! * \internal * \brief Load Corosync node list (via CMAP) into peer cache and optionally XML * * \param[in,out] xml_parent If not NULL, add entry here for each node * * \return true if any nodes were found, false otherwise */ bool pcmk__corosync_add_nodes(xmlNode *xml_parent) { int lpc = 0; cs_error_t rc = CS_OK; int retries = 0; bool any = false; cmap_handle_t cmap_handle; int fd = -1; uid_t found_uid = 0; gid_t found_gid = 0; pid_t found_pid = 0; int rv; do { rc = pcmk__init_cmap(&cmap_handle); if (rc != CS_OK) { retries++; crm_debug("API connection setup failed: %s. Retrying in %ds", cs_strerror(rc), retries); sleep(retries); } } while (retries < 5 && rc != CS_OK); if (rc != CS_OK) { crm_warn("Could not connect to Cluster Configuration Database API, error %d", rc); return false; } rc = cmap_fd_get(cmap_handle, &fd); if (rc != CS_OK) { crm_err("Could not obtain the CMAP API connection: %s (%d)", cs_strerror(rc), rc); goto bail; } /* CMAP provider run as root (in given user namespace, anyway)? */ if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid, &found_uid, &found_gid))) { crm_err("CMAP provider is not authentic:" " process %lld (uid: %lld, gid: %lld)", (long long) PCMK__SPECIAL_PID_AS_0(found_pid), (long long) found_uid, (long long) found_gid); goto bail; } else if (rv < 0) { crm_err("Could not verify authenticity of CMAP provider: %s (%d)", strerror(-rv), -rv); goto bail; } pcmk__cluster_init_node_caches(); crm_trace("Initializing Corosync node list"); for (lpc = 0; TRUE; lpc++) { uint32_t nodeid = 0; char *name = NULL; char *key = NULL; key = crm_strdup_printf("nodelist.node.%d.nodeid", lpc); rc = cmap_get_uint32(cmap_handle, key, &nodeid); free(key); if (rc != CS_OK) { break; } name = pcmk__corosync_name(cmap_handle, nodeid); if (name != NULL) { GHashTableIter iter; pcmk__node_status_t *node = NULL; g_hash_table_iter_init(&iter, pcmk__peer_cache); while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) { if ((node != NULL) && (node->cluster_layer_id > 0) && (node->cluster_layer_id != nodeid) && pcmk__str_eq(node->name, name, pcmk__str_casei)) { crm_crit("Nodes %" PRIu32 " and %" PRIu32 " share the " "same name '%s': shutting down", node->cluster_layer_id, nodeid, name); crm_exit(CRM_EX_FATAL); } } } if (nodeid > 0 || name != NULL) { crm_trace("Initializing node[%d] %u = %s", lpc, nodeid, name); pcmk__get_node(nodeid, name, NULL, pcmk__node_search_cluster_member); } if (nodeid > 0 && name != NULL) { any = true; if (xml_parent) { xmlNode *node = pcmk__xe_create(xml_parent, PCMK_XE_NODE); pcmk__xe_set_ll(node, PCMK_XA_ID, (long long) nodeid); pcmk__xe_set(node, PCMK_XA_UNAME, name); } } free(name); } bail: cmap_finalize(cmap_handle); return any; } /*! * \internal * \brief Get cluster name from Corosync configuration (via CMAP) * * \return Newly allocated string with cluster name if configured, or NULL */ char * pcmk__corosync_cluster_name(void) { cmap_handle_t handle; char *cluster_name = NULL; cs_error_t rc = CS_OK; int fd = -1; uid_t found_uid = 0; gid_t found_gid = 0; pid_t found_pid = 0; int rv; rc = pcmk__init_cmap(&handle); if (rc != CS_OK) { crm_info("Failed to initialize the cmap API: %s (%d)", cs_strerror(rc), rc); return NULL; } rc = cmap_fd_get(handle, &fd); if (rc != CS_OK) { crm_err("Could not obtain the CMAP API connection: %s (%d)", cs_strerror(rc), rc); goto bail; } /* CMAP provider run as root (in given user namespace, anyway)? */ if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid, &found_uid, &found_gid))) { crm_err("CMAP provider is not authentic:" " process %lld (uid: %lld, gid: %lld)", (long long) PCMK__SPECIAL_PID_AS_0(found_pid), (long long) found_uid, (long long) found_gid); goto bail; } else if (rv < 0) { crm_err("Could not verify authenticity of CMAP provider: %s (%d)", strerror(-rv), -rv); goto bail; } rc = cmap_get_string(handle, "totem.cluster_name", &cluster_name); if (rc != CS_OK) { crm_info("Cannot get totem.cluster_name: %s (%d)", cs_strerror(rc), rc); } else { crm_debug("cmap totem.cluster_name = '%s'", cluster_name); } bail: cmap_finalize(handle); return cluster_name; } /*! * \internal * \brief Check (via CMAP) whether Corosync configuration has a node list * * \return true if Corosync has node list, otherwise false */ bool pcmk__corosync_has_nodelist(void) { cs_error_t cs_rc = CS_OK; int retries = 0; cmap_handle_t cmap_handle; cmap_iter_handle_t iter_handle; char key_name[CMAP_KEYNAME_MAXLEN + 1]; int fd = -1; uid_t found_uid = 0; gid_t found_gid = 0; pid_t found_pid = 0; int rc = pcmk_ok; static bool got_result = false; static bool result = false; if (got_result) { return result; } // Connect to CMAP do { cs_rc = pcmk__init_cmap(&cmap_handle); if (cs_rc != CS_OK) { retries++; crm_debug("CMAP connection failed: %s (rc=%d, retrying in %ds)", cs_strerror(cs_rc), cs_rc, retries); sleep(retries); } } while ((retries < 5) && (cs_rc != CS_OK)); if (cs_rc != CS_OK) { crm_warn("Assuming Corosync does not have node list: " "CMAP connection failed (%s) " QB_XS " rc=%d", cs_strerror(cs_rc), cs_rc); return false; } // Get CMAP connection file descriptor cs_rc = cmap_fd_get(cmap_handle, &fd); if (cs_rc != CS_OK) { crm_warn("Assuming Corosync does not have node list: " "CMAP unusable (%s) " QB_XS " rc=%d", cs_strerror(cs_rc), cs_rc); goto bail; } // Check whether CMAP connection is authentic (i.e. provided by root) rc = crm_ipc_is_authentic_process(fd, (uid_t) 0, (gid_t) 0, &found_pid, &found_uid, &found_gid); if (rc == 0) { crm_warn("Assuming Corosync does not have node list: " "CMAP provider is inauthentic " QB_XS " pid=%lld uid=%lld gid=%lld", (long long) PCMK__SPECIAL_PID_AS_0(found_pid), (long long) found_uid, (long long) found_gid); goto bail; } else if (rc < 0) { crm_warn("Assuming Corosync does not have node list: " "Could not verify CMAP authenticity (%s) " QB_XS " rc=%d", pcmk_strerror(rc), rc); goto bail; } // Check whether nodelist section is presetn cs_rc = cmap_iter_init(cmap_handle, "nodelist", &iter_handle); if (cs_rc != CS_OK) { crm_warn("Assuming Corosync does not have node list: " "CMAP not readable (%s) " QB_XS " rc=%d", cs_strerror(cs_rc), cs_rc); goto bail; } cs_rc = cmap_iter_next(cmap_handle, iter_handle, key_name, NULL, NULL); if (cs_rc == CS_OK) { result = true; } cmap_iter_finalize(cmap_handle, iter_handle); got_result = true; crm_debug("Corosync %s node list", (result? "has" : "does not have")); bail: cmap_finalize(cmap_handle); return result; } diff --git a/lib/cluster/cpg.c b/lib/cluster/cpg.c index e2fdfc2fdd..f3fad13458 100644 --- a/lib/cluster/cpg.c +++ b/lib/cluster/cpg.c @@ -1,1050 +1,1050 @@ /* * Copyright 2004-2025 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU Lesser General Public License * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. */ #include #include #include // PRIu32 #include #include #include #include // uint32_t #include #include // size_t #include #include #include #include #include #include #include #include #include #include #include #include // PCMK__SPECIAL_PID #include #include #include "crmcluster_private.h" /* @TODO Once we can update the public API to require pcmk_cluster_t* in more * functions, we can ditch this in favor of cluster->cpg_handle. */ static cpg_handle_t pcmk_cpg_handle = 0; // @TODO These could be moved to pcmk_cluster_t* at that time as well static bool cpg_evicted = false; static GList *cs_message_queue = NULL; static int cs_message_timer = 0; /* @COMPAT Any changes to these structs (other than renames) will break all * rolling upgrades, and should be avoided if possible or done at a major * version bump if not */ struct pcmk__cpg_host_s { uint32_t id; uint32_t pid; gboolean local; // Unused but needed for compatibility enum pcmk_ipc_server type; // For logging only uint32_t size; char uname[MAX_NAME]; } __attribute__ ((packed)); typedef struct pcmk__cpg_host_s pcmk__cpg_host_t; struct pcmk__cpg_msg_s { struct qb_ipc_response_header header __attribute__ ((aligned(8))); uint32_t id; gboolean is_compressed; pcmk__cpg_host_t host; pcmk__cpg_host_t sender; uint32_t size; uint32_t compressed_size; /* 584 bytes */ char data[0]; } __attribute__ ((packed)); typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t; static void crm_cs_flush(gpointer data); #define msg_data_len(msg) (msg->is_compressed?msg->compressed_size:msg->size) #define cs_repeat(rc, counter, max, code) do { \ rc = code; \ if ((rc == CS_ERR_TRY_AGAIN) || (rc == CS_ERR_QUEUE_FULL)) { \ counter++; \ crm_debug("Retrying operation after %ds", counter); \ sleep(counter); \ } else { \ break; \ } \ } while (counter < max) /*! * \internal * \brief Get the local Corosync node ID (via CPG) * * \param[in] handle CPG connection to use (or 0 to use new connection) * * \return Corosync ID of local node (or 0 if not known) */ uint32_t pcmk__cpg_local_nodeid(cpg_handle_t handle) { cs_error_t rc = CS_OK; int retries = 0; static uint32_t local_nodeid = 0; cpg_handle_t local_handle = handle; cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0}; int fd = -1; uid_t found_uid = 0; gid_t found_gid = 0; pid_t found_pid = 0; int rv = 0; if (local_nodeid != 0) { return local_nodeid; } if (handle == 0) { crm_trace("Creating connection"); cs_repeat(rc, retries, 5, cpg_model_initialize(&local_handle, CPG_MODEL_V1, (cpg_model_data_t *) &cpg_model_info, NULL)); if (rc != CS_OK) { crm_err("Could not connect to the CPG API: %s (%d)", cs_strerror(rc), rc); return 0; } rc = cpg_fd_get(local_handle, &fd); if (rc != CS_OK) { crm_err("Could not obtain the CPG API connection: %s (%d)", cs_strerror(rc), rc); goto bail; } // CPG provider run as root (at least in given user namespace)? rv = crm_ipc_is_authentic_process(fd, (uid_t) 0, (gid_t) 0, &found_pid, &found_uid, &found_gid); if (rv == 0) { crm_err("CPG provider is not authentic:" " process %lld (uid: %lld, gid: %lld)", (long long) PCMK__SPECIAL_PID_AS_0(found_pid), (long long) found_uid, (long long) found_gid); goto bail; } else if (rv < 0) { crm_err("Could not verify authenticity of CPG provider: %s (%d)", strerror(-rv), -rv); goto bail; } } if (rc == CS_OK) { retries = 0; crm_trace("Performing lookup"); cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid)); } if (rc != CS_OK) { crm_err("Could not get local node id from the CPG API: %s (%d)", - pcmk__cs_err_str(rc), rc); + pcmk_rc_str(pcmk__corosync2rc(rc)), rc); } bail: if (handle == 0) { crm_trace("Closing connection"); cpg_finalize(local_handle); } crm_debug("Local nodeid is %u", local_nodeid); return local_nodeid; } /*! * \internal * \brief Callback function for Corosync message queue timer * * \param[in] data CPG handle * * \return FALSE (to indicate to glib that timer should not be removed) */ static gboolean crm_cs_flush_cb(gpointer data) { cs_message_timer = 0; crm_cs_flush(data); return FALSE; } // Send no more than this many CPG messages in one flush #define CS_SEND_MAX 200 /*! * \internal * \brief Send messages in Corosync CPG message queue * * \param[in] data CPG handle */ static void crm_cs_flush(gpointer data) { unsigned int sent = 0; guint queue_len = 0; cs_error_t rc = 0; cpg_handle_t *handle = (cpg_handle_t *) data; if (*handle == 0) { crm_trace("Connection is dead"); return; } queue_len = g_list_length(cs_message_queue); if (((queue_len % 1000) == 0) && (queue_len > 1)) { crm_err("CPG queue has grown to %d", queue_len); } else if (queue_len == CS_SEND_MAX) { crm_warn("CPG queue has grown to %d", queue_len); } if (cs_message_timer != 0) { /* There is already a timer, wait until it goes off */ crm_trace("Timer active %d", cs_message_timer); return; } while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) { struct iovec *iov = cs_message_queue->data; rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1); if (rc != CS_OK) { break; } sent++; crm_trace("CPG message sent, size=%zu", iov->iov_len); cs_message_queue = g_list_remove(cs_message_queue, iov); free(iov->iov_base); free(iov); } queue_len -= sent; do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE, "Sent %u CPG message%s (%d still queued): %s (rc=%d)", - sent, pcmk__plural_s(sent), queue_len, pcmk__cs_err_str(rc), - (int) rc); + sent, pcmk__plural_s(sent), queue_len, + pcmk_rc_str(pcmk__corosync2rc(rc)), (int) rc); if (cs_message_queue) { uint32_t delay_ms = 100; if (rc != CS_OK) { /* Proportionally more if sending failed but cap at 1s */ delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len)); } cs_message_timer = pcmk__create_timer(delay_ms, crm_cs_flush_cb, data); } } /*! * \internal * \brief Dispatch function for CPG handle * * \param[in,out] user_data Cluster object * * \return 0 on success, -1 on error (per mainloop_io_t interface) */ static int pcmk_cpg_dispatch(gpointer user_data) { cs_error_t rc = CS_OK; pcmk_cluster_t *cluster = (pcmk_cluster_t *) user_data; rc = cpg_dispatch(cluster->priv->cpg_handle, CS_DISPATCH_ONE); if (rc != CS_OK) { crm_err("Connection to the CPG API failed: %s (%d)", - pcmk__cs_err_str(rc), rc); + pcmk_rc_str(pcmk__corosync2rc(rc)), rc); cpg_finalize(cluster->priv->cpg_handle); cluster->priv->cpg_handle = 0; return -1; } else if (cpg_evicted) { crm_err("Evicted from CPG membership"); return -1; } return 0; } static inline const char * ais_dest(const pcmk__cpg_host_t *host) { return (host->size > 0)? host->uname : ""; } static inline const char * msg_type2text(enum pcmk_ipc_server type) { const char *name = pcmk__server_message_type(type); return pcmk__s(name, "unknown"); } /*! * \internal * \brief Check whether a Corosync CPG message is valid * * \param[in] msg Corosync CPG message to check * * \return true if \p msg is valid, otherwise false */ static bool check_message_sanity(const pcmk__cpg_msg_t *msg) { int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t); if (payload_size < 1) { crm_err("%sCPG message %d from %s invalid: " "Claimed size of %d bytes is too small " QB_XS " from %s[%u] to %s@%s", (msg->is_compressed? "Compressed " : ""), msg->id, ais_dest(&(msg->sender)), (int) msg->header.size, msg_type2text(msg->sender.type), msg->sender.pid, msg_type2text(msg->host.type), ais_dest(&(msg->host))); return false; } if (msg->header.error != CS_OK) { crm_err("%sCPG message %d from %s invalid: " "Sender indicated error %d " QB_XS " from %s[%u] to %s@%s", (msg->is_compressed? "Compressed " : ""), msg->id, ais_dest(&(msg->sender)), msg->header.error, msg_type2text(msg->sender.type), msg->sender.pid, msg_type2text(msg->host.type), ais_dest(&(msg->host))); return false; } if (msg_data_len(msg) != payload_size) { crm_err("%sCPG message %d from %s invalid: " "Total size %d inconsistent with payload size %d " QB_XS " from %s[%u] to %s@%s", (msg->is_compressed? "Compressed " : ""), msg->id, ais_dest(&(msg->sender)), (int) msg->header.size, (int) msg_data_len(msg), msg_type2text(msg->sender.type), msg->sender.pid, msg_type2text(msg->host.type), ais_dest(&(msg->host))); return false; } if (!msg->is_compressed && /* msg->size != (strlen(msg->data) + 1) would be a stronger check, * but checking the last byte or two should be quick */ (((msg->size > 1) && (msg->data[msg->size - 2] == '\0')) || (msg->data[msg->size - 1] != '\0'))) { crm_err("CPG message %d from %s invalid: " "Payload does not end at byte %" PRIu32 " " QB_XS " from %s[%u] to %s@%s", msg->id, ais_dest(&(msg->sender)), msg->size, msg_type2text(msg->sender.type), msg->sender.pid, msg_type2text(msg->host.type), ais_dest(&(msg->host))); return false; } crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s", (int) msg->header.size, (msg->is_compressed? "compressed " : ""), msg->id, msg_type2text(msg->sender.type), msg->sender.pid, ais_dest(&(msg->sender)), msg_type2text(msg->host.type), ais_dest(&(msg->host))); return true; } /*! * \internal * \brief Extract text data from a Corosync CPG message * * \param[in] handle CPG connection (to get local node ID if not known) * \param[in] sender_id Corosync ID of node that sent message * \param[in] pid Process ID of message sender (for logging only) * \param[in,out] content CPG message * \param[out] from If not \c NULL, will be set to sender uname * (valid for the lifetime of \p content) * * \return Newly allocated string with message data, or NULL for errors and * messages not intended for the local node * * \note The caller is responsible for freeing the return value using \c free(). */ char * pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid, void *content, const char **from) { char *data = NULL; pcmk__cpg_msg_t *msg = content; if (from != NULL) { *from = NULL; } if (handle != 0) { uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle); const char *local_name = pcmk__cluster_local_node_name(); // Update or validate message sender ID if (msg->sender.id == 0) { msg->sender.id = sender_id; } else if (msg->sender.id != sender_id) { crm_warn("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32 ": claimed ID %" PRIu32, sender_id, pid, msg->sender.id); return NULL; } // Ignore messages that aren't for the local node if ((msg->host.id != 0) && (local_nodeid != msg->host.id)) { crm_trace("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32 ": for ID %" PRIu32 " not %" PRIu32, sender_id, pid, msg->host.id, local_nodeid); return NULL; } if ((msg->host.size > 0) && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) { crm_trace("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32 ": for name %s not %s", sender_id, pid, msg->host.uname, local_name); return NULL; } // Add sender name if not in original message if (msg->sender.size == 0) { const pcmk__node_status_t *peer = pcmk__get_node(sender_id, NULL, NULL, pcmk__node_search_cluster_member); if (peer->name == NULL) { crm_debug("Received CPG message from node with ID %" PRIu32 " but its name is unknown", sender_id); } else { crm_debug("Updating name of CPG message sender with ID %" PRIu32 " to %s", sender_id, peer->name); msg->sender.size = strlen(peer->name); memset(msg->sender.uname, 0, MAX_NAME); memcpy(msg->sender.uname, peer->name, msg->sender.size); } } } // Ensure sender is in peer cache (though it should already be) pcmk__get_node(msg->sender.id, msg->sender.uname, NULL, pcmk__node_search_cluster_member); if (from != NULL) { *from = msg->sender.uname; } if (!check_message_sanity(msg)) { return NULL; } if (msg->is_compressed && (msg->size > 0)) { int rc = BZ_OK; unsigned int new_size = msg->size + 1; char *uncompressed = pcmk__assert_alloc(1, new_size); rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0); rc = pcmk__bzlib2rc(rc); if ((rc == pcmk_rc_ok) && (msg->size != new_size)) { // libbz2 bug? rc = pcmk_rc_compression; } if (rc != pcmk_rc_ok) { free(uncompressed); crm_warn("Ignoring compressed CPG message %d from %s (ID %" PRIu32 " PID %" PRIu32 "): %s", msg->id, ais_dest(&(msg->sender)), sender_id, pid, pcmk_rc_str(rc)); return NULL; } data = uncompressed; } else { data = pcmk__str_copy(msg->data); } crm_trace("Received %sCPG message %d from %s (ID %" PRIu32 " PID %" PRIu32 "): %.40s...", (msg->is_compressed? "compressed " : ""), msg->id, ais_dest(&(msg->sender)), sender_id, pid, msg->data); return data; } /*! * \internal * \brief Compare cpg_address objects by node ID * * \param[in] first First cpg_address structure to compare * \param[in] second Second cpg_address structure to compare * * \return Negative number if first's node ID is lower, * positive number if first's node ID is greater, * or 0 if both node IDs are equal */ static int cmp_member_list_nodeid(const void *first, const void *second) { const struct cpg_address *const a = *((const struct cpg_address **) first), *const b = *((const struct cpg_address **) second); if (a->nodeid < b->nodeid) { return -1; } else if (a->nodeid > b->nodeid) { return 1; } /* don't bother with "reason" nor "pid" */ return 0; } /*! * \internal * \brief Get a readable string equivalent of a cpg_reason_t value * * \param[in] reason CPG reason value * * \return Readable string suitable for logging */ static const char * cpgreason2str(cpg_reason_t reason) { switch (reason) { case CPG_REASON_JOIN: return " via cpg_join"; case CPG_REASON_LEAVE: return " via cpg_leave"; case CPG_REASON_NODEDOWN: return " via cluster exit"; case CPG_REASON_NODEUP: return " via cluster join"; case CPG_REASON_PROCDOWN: return " for unknown reason"; default: break; } return ""; } /*! * \internal * \brief Get a log-friendly node name * * \param[in] peer Node to check * * \return Node's uname, or readable string if not known */ static inline const char * peer_name(const pcmk__node_status_t *peer) { return (peer != NULL)? pcmk__s(peer->name, "peer node") : "unknown node"; } /*! * \internal * \brief Process a CPG peer's leaving the cluster * * \param[in] cpg_group_name CPG group name (for logging) * \param[in] event_counter Event number (for logging) * \param[in] local_nodeid Node ID of local node * \param[in] cpg_peer CPG peer that left * \param[in] sorted_member_list List of remaining members, qsort()-ed by ID * \param[in] member_list_entries Number of entries in \p sorted_member_list */ static void node_left(const char *cpg_group_name, int event_counter, uint32_t local_nodeid, const struct cpg_address *cpg_peer, const struct cpg_address **sorted_member_list, size_t member_list_entries) { pcmk__node_status_t *peer = pcmk__search_node_caches(cpg_peer->nodeid, NULL, NULL, pcmk__node_search_cluster_member); const struct cpg_address **rival = NULL; /* Most CPG-related Pacemaker code assumes that only one process on a node * can be in the process group, but Corosync does not impose this * limitation, and more than one can be a member in practice due to a * daemon attempting to start while another instance is already running. * * Check for any such duplicate instances, because we don't want to process * their leaving as if our actual peer left. If the peer that left still has * an entry in sorted_member_list (with a different PID), we will ignore the * leaving. * * @TODO Track CPG members' PIDs so we can tell exactly who left. */ if (peer != NULL) { rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries, sizeof(const struct cpg_address *), cmp_member_list_nodeid); } if (rival == NULL) { crm_info("Group %s event %d: %s (node %u pid %u) left%s", cpg_group_name, event_counter, peer_name(peer), cpg_peer->nodeid, cpg_peer->pid, cpgreason2str(cpg_peer->reason)); if (peer != NULL) { crm_update_peer_proc(__func__, peer, crm_proc_cpg, PCMK_VALUE_OFFLINE); } } else if (cpg_peer->nodeid == local_nodeid) { crm_warn("Group %s event %d: duplicate local pid %u left%s", cpg_group_name, event_counter, cpg_peer->pid, cpgreason2str(cpg_peer->reason)); } else { crm_warn("Group %s event %d: " "%s (node %u) duplicate pid %u left%s (%u remains)", cpg_group_name, event_counter, peer_name(peer), cpg_peer->nodeid, cpg_peer->pid, cpgreason2str(cpg_peer->reason), (*rival)->pid); } } /*! * \internal * \brief Handle a CPG configuration change event * * \param[in] handle CPG connection * \param[in] group_name CPG group name * \param[in] member_list List of current CPG members * \param[in] member_list_entries Number of entries in \p member_list * \param[in] left_list List of CPG members that left * \param[in] left_list_entries Number of entries in \p left_list * \param[in] joined_list List of CPG members that joined * \param[in] joined_list_entries Number of entries in \p joined_list * * \note This is of type \c cpg_confchg_fn_t, intended to be used in a * \c cpg_callbacks_t object. */ void pcmk__cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries) { static int counter = 0; bool found = false; uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle); const struct cpg_address **sorted = NULL; sorted = pcmk__assert_alloc(member_list_entries, sizeof(const struct cpg_address *)); for (size_t iter = 0; iter < member_list_entries; iter++) { sorted[iter] = member_list + iter; } // So that the cross-matching of multiply-subscribed nodes is then cheap qsort(sorted, member_list_entries, sizeof(const struct cpg_address *), cmp_member_list_nodeid); for (int i = 0; i < left_list_entries; i++) { node_left(group_name->value, counter, local_nodeid, &left_list[i], sorted, member_list_entries); } free(sorted); sorted = NULL; for (int i = 0; i < joined_list_entries; i++) { crm_info("Group %s event %d: node %u pid %u joined%s", group_name->value, counter, joined_list[i].nodeid, joined_list[i].pid, cpgreason2str(joined_list[i].reason)); } for (int i = 0; i < member_list_entries; i++) { pcmk__node_status_t *peer = pcmk__get_node(member_list[i].nodeid, NULL, NULL, pcmk__node_search_cluster_member); if (member_list[i].nodeid == local_nodeid && member_list[i].pid != getpid()) { // See the note in node_left() crm_warn("Group %s event %d: detected duplicate local pid %u", group_name->value, counter, member_list[i].pid); continue; } crm_info("Group %s event %d: %s (node %u pid %u) is member", group_name->value, counter, peer_name(peer), member_list[i].nodeid, member_list[i].pid); /* If the caller left auto-reaping enabled, this will also update the * state to member. */ peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg, PCMK_VALUE_ONLINE); if (peer && peer->state && strcmp(peer->state, PCMK_VALUE_MEMBER)) { /* The node is a CPG member, but we currently think it's not a * cluster member. This is possible only if auto-reaping was * disabled. The node may be joining, and we happened to get the CPG * notification before the quorum notification; or the node may have * just died, and we are processing its final messages; or a bug * has affected the peer cache. */ time_t now = time(NULL); if (peer->when_lost == 0) { // Track when we first got into this contradictory state peer->when_lost = now; } else if (now > (peer->when_lost + 60)) { // If it persists for more than a minute, update the state crm_warn("Node %u is member of group %s but was believed " "offline", member_list[i].nodeid, group_name->value); pcmk__update_peer_state(__func__, peer, PCMK_VALUE_MEMBER, 0); } } if (local_nodeid == member_list[i].nodeid) { found = true; } } if (!found) { crm_err("Local node was evicted from group %s", group_name->value); cpg_evicted = true; } counter++; } /*! * \brief Set the CPG deliver callback function for a cluster object * * \param[in,out] cluster Cluster object * \param[in] fn Deliver callback function to set * * \return Standard Pacemaker return code */ int pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn) { if (cluster == NULL) { return EINVAL; } cluster->cpg.cpg_deliver_fn = fn; return pcmk_rc_ok; } /*! * \brief Set the CPG config change callback function for a cluster object * * \param[in,out] cluster Cluster object * \param[in] fn Configuration change callback function to set * * \return Standard Pacemaker return code */ int pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn) { if (cluster == NULL) { return EINVAL; } cluster->cpg.cpg_confchg_fn = fn; return pcmk_rc_ok; } /*! * \brief Connect to Corosync CPG * * \param[in,out] cluster Initialized cluster object to connect * * \return Standard Pacemaker return code */ int pcmk__cpg_connect(pcmk_cluster_t *cluster) { cs_error_t rc; int fd = -1; int retries = 0; uint32_t id = 0; pcmk__node_status_t *peer = NULL; cpg_handle_t handle = 0; const char *cpg_group_name = NULL; uid_t found_uid = 0; gid_t found_gid = 0; pid_t found_pid = 0; int rv; struct mainloop_fd_callbacks cpg_fd_callbacks = { .dispatch = pcmk_cpg_dispatch, .destroy = cluster->destroy, }; cpg_model_v1_data_t cpg_model_info = { .model = CPG_MODEL_V1, .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn, .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn, .cpg_totem_confchg_fn = NULL, .flags = 0, }; cpg_evicted = false; if (cluster->priv->server != pcmk_ipc_unknown) { cpg_group_name = pcmk__server_message_type(cluster->priv->server); } if (cpg_group_name == NULL) { /* The name will already be non-NULL for Pacemaker servers. If a * command-line tool or external caller connects to the cluster, * they will join this CPG group. */ cpg_group_name = pcmk__s(crm_system_name, "unknown"); } memset(cluster->priv->group.value, 0, 128); strncpy(cluster->priv->group.value, cpg_group_name, 127); cluster->priv->group.length = strlen(cluster->priv->group.value) + 1; cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL)); if (rc != CS_OK) { crm_err("Could not connect to the CPG API: %s (%d)", cs_strerror(rc), rc); goto bail; } rc = cpg_fd_get(handle, &fd); if (rc != CS_OK) { crm_err("Could not obtain the CPG API connection: %s (%d)", cs_strerror(rc), rc); goto bail; } /* CPG provider run as root (in given user namespace, anyway)? */ if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid, &found_uid, &found_gid))) { crm_err("CPG provider is not authentic:" " process %lld (uid: %lld, gid: %lld)", (long long) PCMK__SPECIAL_PID_AS_0(found_pid), (long long) found_uid, (long long) found_gid); rc = CS_ERR_ACCESS; goto bail; } else if (rv < 0) { crm_err("Could not verify authenticity of CPG provider: %s (%d)", strerror(-rv), -rv); rc = CS_ERR_ACCESS; goto bail; } id = pcmk__cpg_local_nodeid(handle); if (id == 0) { crm_err("Could not get local node id from the CPG API"); goto bail; } cluster->priv->node_id = id; retries = 0; cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->priv->group)); if (rc != CS_OK) { crm_err("Could not join the CPG group '%s': %d", cpg_group_name, rc); goto bail; } pcmk_cpg_handle = handle; cluster->priv->cpg_handle = handle; mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks); bail: if (rc != CS_OK) { cpg_finalize(handle); // @TODO Map rc to more specific Pacemaker return code return ENOTCONN; } peer = pcmk__get_node(id, NULL, NULL, pcmk__node_search_cluster_member); crm_update_peer_proc(__func__, peer, crm_proc_cpg, PCMK_VALUE_ONLINE); return pcmk_rc_ok; } /*! * \internal * \brief Disconnect from Corosync CPG * * \param[in,out] cluster Cluster object to disconnect */ void pcmk__cpg_disconnect(pcmk_cluster_t *cluster) { pcmk_cpg_handle = 0; if (cluster->priv->cpg_handle != 0) { crm_trace("Disconnecting CPG"); cpg_leave(cluster->priv->cpg_handle, &cluster->priv->group); cpg_finalize(cluster->priv->cpg_handle); cluster->priv->cpg_handle = 0; } else { crm_info("No CPG connection"); } } /*! * \internal * \brief Send string data via Corosync CPG * * \param[in] data Data to send * \param[in] node Cluster node to send message to * \param[in] dest Type of message to send * * \return \c true on success, or \c false otherwise */ static bool send_cpg_text(const char *data, const pcmk__node_status_t *node, enum pcmk_ipc_server dest) { static int msg_id = 0; static int local_pid = 0; static int local_name_len = 0; static const char *local_name = NULL; char *target = NULL; struct iovec *iov; pcmk__cpg_msg_t *msg = NULL; if (local_name == NULL) { local_name = pcmk__cluster_local_node_name(); } if ((local_name_len == 0) && (local_name != NULL)) { local_name_len = strlen(local_name); } if (data == NULL) { data = ""; } if (local_pid == 0) { local_pid = getpid(); } msg = pcmk__assert_alloc(1, sizeof(pcmk__cpg_msg_t)); msg_id++; msg->id = msg_id; msg->header.error = CS_OK; msg->host.type = dest; if (node != NULL) { if (node->name != NULL) { target = pcmk__str_copy(node->name); msg->host.size = strlen(node->name); memset(msg->host.uname, 0, MAX_NAME); memcpy(msg->host.uname, node->name, msg->host.size); } else { target = crm_strdup_printf("%" PRIu32, node->cluster_layer_id); } msg->host.id = node->cluster_layer_id; } else { target = pcmk__str_copy("all"); } msg->sender.id = 0; msg->sender.type = pcmk__parse_server(crm_system_name); msg->sender.pid = local_pid; msg->sender.size = local_name_len; memset(msg->sender.uname, 0, MAX_NAME); if ((local_name != NULL) && (msg->sender.size != 0)) { memcpy(msg->sender.uname, local_name, msg->sender.size); } msg->size = 1 + strlen(data); msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size; if (msg->size < PCMK__BZ2_THRESHOLD) { msg = pcmk__realloc(msg, msg->header.size); memcpy(msg->data, data, msg->size); } else { char *compressed = NULL; unsigned int new_size = 0; if (pcmk__compress(data, (unsigned int) msg->size, 0, &compressed, &new_size) == pcmk_rc_ok) { msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size; msg = pcmk__realloc(msg, msg->header.size); memcpy(msg->data, compressed, new_size); msg->is_compressed = TRUE; msg->compressed_size = new_size; } else { msg = pcmk__realloc(msg, msg->header.size); memcpy(msg->data, data, msg->size); } free(compressed); } iov = pcmk__assert_alloc(1, sizeof(struct iovec)); iov->iov_base = msg; iov->iov_len = msg->header.size; if (msg->compressed_size > 0) { crm_trace("Queueing CPG message %" PRIu32 " to %s " "(%zu bytes, %" PRIu32 " bytes compressed payload): %.200s", msg->id, target, iov->iov_len, msg->compressed_size, data); } else { crm_trace("Queueing CPG message %" PRIu32 " to %s " "(%zu bytes, %" PRIu32 " bytes payload): %.200s", msg->id, target, iov->iov_len, msg->size, data); } free(target); cs_message_queue = g_list_append(cs_message_queue, iov); crm_cs_flush(&pcmk_cpg_handle); return true; } /*! * \internal * \brief Send an XML message via Corosync CPG * * \param[in] msg XML message to send * \param[in] node Cluster node to send message to * \param[in] dest Type of message to send * * \return TRUE on success, otherwise FALSE */ bool pcmk__cpg_send_xml(const xmlNode *msg, const pcmk__node_status_t *node, enum pcmk_ipc_server dest) { bool rc = true; GString *data = g_string_sized_new(1024); pcmk__xml_string(msg, 0, data, 0); rc = send_cpg_text(data->str, node, dest); g_string_free(data, TRUE); return rc; }