diff --git a/include/crm/cluster.h b/include/crm/cluster.h index eee9ddf43c..9c51901a4f 100644 --- a/include/crm/cluster.h +++ b/include/crm/cluster.h @@ -1,134 +1,132 @@ /* * Copyright 2004-2024 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU Lesser General Public License * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. */ #ifndef PCMK__CRM_CLUSTER__H # define PCMK__CRM_CLUSTER__H # include // uint32_t, uint64_t # include // gboolean, GHashTable # include // xmlNode # include # include #ifdef __cplusplus extern "C" { #endif -# if SUPPORT_COROSYNC -# include -# endif +#if SUPPORT_COROSYNC +#include // cpg_callbacks_t +#endif // @COMPAT Make this internal when we can break API backward compatibility //! \deprecated Do not use (public access will be removed in a future release) extern GHashTable *crm_peer_cache; // @COMPAT Make this internal when we can break API backward compatibility //! \deprecated Do not use (public access will be removed in a future release) extern GHashTable *crm_remote_peer_cache; // @COMPAT Make this internal when we can break API backward compatibility //! \deprecated Do not use (public access will be removed in a future release) extern unsigned long long crm_peer_seq; // @COMPAT Make this internal when we can break API backward compatibility //! \deprecated Do not use (public access will be removed in a future release) #define CRM_NODE_LOST "lost" // @COMPAT Make this internal when we can break API backward compatibility //! \deprecated Do not use (public access will be removed in a future release) #define CRM_NODE_MEMBER "member" // @COMPAT Make this internal when we can break API backward compatibility //!@{ //! \deprecated Do not use (public access will be removed in a future release) enum crm_join_phase { /* @COMPAT: crm_join_nack_quiet can be replaced by * pcmk__node_status_t:user_data at a compatibility break */ //! Not allowed to join, but don't send a nack message crm_join_nack_quiet = -2, crm_join_nack = -1, crm_join_none = 0, crm_join_welcomed = 1, crm_join_integrated = 2, crm_join_finalized = 3, crm_join_confirmed = 4, }; //!@} //! \internal Do not use typedef struct pcmk__cluster_private pcmk__cluster_private_t; // Implementation of pcmk_cluster_t // @COMPAT Make contents internal when we can break API backward compatibility //!@{ //! \deprecated Do not use (public access will be removed in a future release) struct pcmk__cluster { /* @COMPAT Once all members are moved to pcmk__cluster_private_t, we can * make that the pcmk_cluster_t implementation and drop this struct * altogether, leaving pcmk_cluster_t as an opaque public type. */ //! \internal Do not use pcmk__cluster_private_t *priv; // NOTE: sbd (as of at least 1.5.2) uses this //! \deprecated Call pcmk_cluster_set_destroy_fn() to set this void (*destroy) (gpointer); #if SUPPORT_COROSYNC // NOTE: sbd (as of at least 1.5.2) uses this /*! * \deprecated Call pcmk_cpg_set_deliver_fn() and pcmk_cpg_set_confchg_fn() * to set these */ cpg_callbacks_t cpg; - - cpg_handle_t cpg_handle; #endif // SUPPORT_COROSYNC }; //!@} //! Connection to a cluster layer typedef struct pcmk__cluster pcmk_cluster_t; int pcmk_cluster_connect(pcmk_cluster_t *cluster); int pcmk_cluster_disconnect(pcmk_cluster_t *cluster); pcmk_cluster_t *pcmk_cluster_new(void); void pcmk_cluster_free(pcmk_cluster_t *cluster); int pcmk_cluster_set_destroy_fn(pcmk_cluster_t *cluster, void (*fn)(gpointer)); #if SUPPORT_COROSYNC int pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn); int pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn); #endif // SUPPORT_COROSYNC /*! * \enum pcmk_cluster_layer * \brief Types of cluster layer */ enum pcmk_cluster_layer { pcmk_cluster_layer_unknown = 1, //!< Unknown cluster layer pcmk_cluster_layer_invalid = 2, //!< Invalid cluster layer pcmk_cluster_layer_corosync = 32, //!< Corosync Cluster Engine }; enum pcmk_cluster_layer pcmk_get_cluster_layer(void); const char *pcmk_cluster_layer_text(enum pcmk_cluster_layer layer); #ifdef __cplusplus } #endif #if !defined(PCMK_ALLOW_DEPRECATED) || (PCMK_ALLOW_DEPRECATED == 1) #include #endif #endif diff --git a/include/crm/cluster/internal.h b/include/crm/cluster/internal.h index 3483292603..1b64822220 100644 --- a/include/crm/cluster/internal.h +++ b/include/crm/cluster/internal.h @@ -1,329 +1,331 @@ /* * Copyright 2004-2024 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU Lesser General Public License * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. */ #ifndef PCMK__CRM_CLUSTER_INTERNAL__H #define PCMK__CRM_CLUSTER_INTERNAL__H #include #include // uint32_t, uint64_t #include // gboolean #include #if SUPPORT_COROSYNC -#include // cpg_name +#include // cpg_name, cpg_handle_t #endif #ifdef __cplusplus extern "C" { #endif /*! * \internal * \enum pcmk__cluster_msg * \brief Types of message sent via the cluster layer */ enum pcmk__cluster_msg { pcmk__cluster_msg_unknown, pcmk__cluster_msg_attrd, pcmk__cluster_msg_based, pcmk__cluster_msg_controld, pcmk__cluster_msg_execd, pcmk__cluster_msg_fenced, }; enum crm_proc_flag { /* @COMPAT When pcmk__node_status_t:processes is made internal, we can merge * this into node flags or turn it into a boolean. Until then, in theory * something could depend on these particular numeric values. */ crm_proc_none = 0x00000001, // Cluster layers crm_proc_cpg = 0x04000000, }; /*! * \internal * \enum pcmk__node_status_flags * \brief Boolean flags for a \c pcmk__node_status_t object * * Some flags may not be related to status specifically. However, we keep these * separate from enum pcmk__node_flags because they're used with * different object types. */ enum pcmk__node_status_flags { /*! * Node is a Pacemaker Remote node and should not be considered for cluster * membership */ pcmk__node_status_remote = (UINT32_C(1) << 0), //! Node's cache entry is dirty pcmk__node_status_dirty = (UINT32_C(1) << 1), }; // Used with node cache search functions enum pcmk__node_search_flags { //! Does not affect search pcmk__node_search_none = 0, //! Search for cluster nodes from membership cache pcmk__node_search_cluster_member = (1 << 0), //! Search for remote nodes pcmk__node_search_remote = (1 << 1), //! Search for cluster member nodes and remote nodes pcmk__node_search_any = pcmk__node_search_cluster_member |pcmk__node_search_remote, //! Search for cluster nodes from CIB (as of last cache refresh) pcmk__node_search_cluster_cib = (1 << 2), }; /*! * \internal * \enum pcmk__node_update * \brief Type of update to a \c pcmk__node_status_t object */ enum pcmk__node_update { pcmk__node_update_name, //!< Node name updated pcmk__node_update_state, //!< Node connection state updated pcmk__node_update_processes, //!< Node process group membership updated }; //! Implementation of pcmk__cluster_private_t struct pcmk__cluster_private { // @TODO Drop and replace with per-daemon cluster-layer ID global variables? uint32_t node_id; //!< Local node ID at cluster layer // @TODO Drop and replace with per-daemon node name global variables? char *node_name; //!< Local node name at cluster layer #if SUPPORT_COROSYNC /* @TODO Make these members a separate struct and use void *cluster_data * here instead, to abstract the cluster layer further. */ struct cpg_name group; //!< Corosync CPG name + + cpg_handle_t cpg_handle; //!< Corosync CPG handle #endif // SUPPORT_COROSYNC }; //! Node status data (may be a cluster node or a Pacemaker Remote node) typedef struct pcmk__node_status { //! Node name as known to cluster layer, or Pacemaker Remote node name char *name; /* @COMPAT This is less than ideal since the value is not a valid XML ID * (for Corosync, it's the string equivalent of the node's numeric node ID, * but XML IDs can't start with a number) and the three elements should have * different IDs. * * Ideally, we would use something like node-NODEID, node_state-NODEID, and * transient_attributes-NODEID as the element IDs. Unfortunately changing it * would be impractical due to backward compatibility; older nodes in a * rolling upgrade will always write and expect the value in the old format. */ /*! * Value of the PCMK_XA_ID XML attribute to use with the node's * PCMK_XE_NODE, PCMK_XE_NODE_STATE, and PCMK_XE_TRANSIENT_ATTRIBUTES * XML elements in the CIB */ char *xml_id; char *state; // @TODO change to enum //! Group of enum pcmk__node_status_flags uint32_t flags; /*! * Most recent cluster membership in which node was seen (0 for Pacemaker * Remote nodes) */ uint64_t membership_id; uint32_t processes; // @TODO most not needed, merge into flags /* @TODO When we can break public API compatibility, we can make the rest of * these members separate structs and use void *cluster_data and * void *user_data here instead, to abstract the cluster layer further. */ // Only used by controller enum crm_join_phase join; char *expected; time_t peer_lost; char *conn_host; time_t when_member; // Since when node has been a cluster member time_t when_online; // Since when peer has been online in CPG /* @TODO The following are currently needed only by the Corosync stack. * Eventually consider moving them to a cluster-layer-specific data object. */ uint32_t cluster_layer_id; //!< Cluster-layer numeric node ID time_t when_lost; //!< When CPG membership was last lost } pcmk__node_status_t; /*! * \internal * \brief Return the process bit corresponding to the current cluster stack * * \return Process flag if detectable, otherwise 0 */ static inline uint32_t crm_get_cluster_proc(void) { switch (pcmk_get_cluster_layer()) { case pcmk_cluster_layer_corosync: return crm_proc_cpg; default: break; } return crm_proc_none; } /*! * \internal * \brief Get log-friendly string description of a Corosync return code * * \param[in] error Corosync return code * * \return Log-friendly string description corresponding to \p error */ static inline const char * pcmk__cs_err_str(int error) { # if SUPPORT_COROSYNC switch (error) { case CS_OK: return "OK"; case CS_ERR_LIBRARY: return "Library error"; case CS_ERR_VERSION: return "Version error"; case CS_ERR_INIT: return "Initialization error"; case CS_ERR_TIMEOUT: return "Timeout"; case CS_ERR_TRY_AGAIN: return "Try again"; case CS_ERR_INVALID_PARAM: return "Invalid parameter"; case CS_ERR_NO_MEMORY: return "No memory"; case CS_ERR_BAD_HANDLE: return "Bad handle"; case CS_ERR_BUSY: return "Busy"; case CS_ERR_ACCESS: return "Access error"; case CS_ERR_NOT_EXIST: return "Doesn't exist"; case CS_ERR_NAME_TOO_LONG: return "Name too long"; case CS_ERR_EXIST: return "Exists"; case CS_ERR_NO_SPACE: return "No space"; case CS_ERR_INTERRUPT: return "Interrupt"; case CS_ERR_NAME_NOT_FOUND: return "Name not found"; case CS_ERR_NO_RESOURCES: return "No resources"; case CS_ERR_NOT_SUPPORTED: return "Not supported"; case CS_ERR_BAD_OPERATION: return "Bad operation"; case CS_ERR_FAILED_OPERATION: return "Failed operation"; case CS_ERR_MESSAGE_ERROR: return "Message error"; case CS_ERR_QUEUE_FULL: return "Queue full"; case CS_ERR_QUEUE_NOT_AVAILABLE: return "Queue not available"; case CS_ERR_BAD_FLAGS: return "Bad flags"; case CS_ERR_TOO_BIG: return "Too big"; case CS_ERR_NO_SECTIONS: return "No sections"; } # endif return "Corosync error"; } # if SUPPORT_COROSYNC #if 0 /* This is the new way to do it, but we still support all Corosync 2 versions, * and this isn't always available. A better alternative here would be to check * for support in the configure script and enable this conditionally. */ #define pcmk__init_cmap(handle) cmap_initialize_map((handle), CMAP_MAP_ICMAP) #else #define pcmk__init_cmap(handle) cmap_initialize(handle) #endif char *pcmk__corosync_cluster_name(void); bool pcmk__corosync_add_nodes(xmlNode *xml_parent); void pcmk__cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries); char *pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid, void *content, const char **from); # endif const char *pcmk__cluster_node_uuid(pcmk__node_status_t *node); char *pcmk__cluster_node_name(uint32_t nodeid); const char *pcmk__cluster_local_node_name(void); const char *pcmk__node_name_from_uuid(const char *uuid); pcmk__node_status_t *crm_update_peer_proc(const char *source, pcmk__node_status_t *peer, uint32_t flag, const char *status); pcmk__node_status_t *pcmk__update_peer_state(const char *source, pcmk__node_status_t *node, const char *state, uint64_t membership); void pcmk__update_peer_expected(const char *source, pcmk__node_status_t *node, const char *expected); void pcmk__reap_unseen_nodes(uint64_t ring_id); void pcmk__corosync_quorum_connect(gboolean (*dispatch)(unsigned long long, gboolean), void (*destroy) (gpointer)); enum pcmk__cluster_msg pcmk__cluster_parse_msg_type(const char *text); bool pcmk__cluster_send_message(const pcmk__node_status_t *node, enum pcmk__cluster_msg service, const xmlNode *data); // Membership bool pcmk__cluster_has_quorum(void); void pcmk__cluster_init_node_caches(void); void pcmk__cluster_destroy_node_caches(void); void pcmk__cluster_set_autoreap(bool enable); void pcmk__cluster_set_status_callback(void (*dispatch)(enum pcmk__node_update, pcmk__node_status_t *, const void *)); bool pcmk__cluster_is_node_active(const pcmk__node_status_t *node); unsigned int pcmk__cluster_num_active_nodes(void); unsigned int pcmk__cluster_num_remote_nodes(void); pcmk__node_status_t *pcmk__cluster_lookup_remote_node(const char *node_name); void pcmk__cluster_forget_cluster_node(uint32_t id, const char *node_name); void pcmk__cluster_forget_remote_node(const char *node_name); pcmk__node_status_t *pcmk__search_node_caches(unsigned int id, const char *uname, uint32_t flags); void pcmk__purge_node_from_cache(const char *node_name, uint32_t node_id); void pcmk__refresh_node_caches_from_cib(xmlNode *cib); pcmk__node_status_t *pcmk__get_node(unsigned int id, const char *uname, const char *uuid, uint32_t flags); #ifdef __cplusplus } #endif #endif // PCMK__CRM_CLUSTER_INTERNAL__H diff --git a/lib/cluster/cpg.c b/lib/cluster/cpg.c index 2e9440edb7..53b206b1be 100644 --- a/lib/cluster/cpg.c +++ b/lib/cluster/cpg.c @@ -1,1060 +1,1060 @@ /* * Copyright 2004-2024 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU Lesser General Public License * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. */ #include #include #include // PRIu32 #include #include #include #include // uint32_t #include #include // size_t #include #include #include #include #include #include #include #include #include #include #include #include // PCMK__SPECIAL_PID #include #include #include "crmcluster_private.h" /* @TODO Once we can update the public API to require pcmk_cluster_t* in more * functions, we can ditch this in favor of cluster->cpg_handle. */ static cpg_handle_t pcmk_cpg_handle = 0; // @TODO These could be moved to pcmk_cluster_t* at that time as well static bool cpg_evicted = false; static GList *cs_message_queue = NULL; static int cs_message_timer = 0; struct pcmk__cpg_host_s { uint32_t id; uint32_t pid; enum pcmk__cluster_msg type; uint32_t size; char uname[MAX_NAME]; } __attribute__ ((packed)); typedef struct pcmk__cpg_host_s pcmk__cpg_host_t; struct pcmk__cpg_msg_s { struct qb_ipc_response_header header __attribute__ ((aligned(8))); uint32_t id; gboolean is_compressed; pcmk__cpg_host_t host; pcmk__cpg_host_t sender; uint32_t size; uint32_t compressed_size; /* 584 bytes */ char data[0]; } __attribute__ ((packed)); typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t; static void crm_cs_flush(gpointer data); #define msg_data_len(msg) (msg->is_compressed?msg->compressed_size:msg->size) #define cs_repeat(rc, counter, max, code) do { \ rc = code; \ if ((rc == CS_ERR_TRY_AGAIN) || (rc == CS_ERR_QUEUE_FULL)) { \ counter++; \ crm_debug("Retrying operation after %ds", counter); \ sleep(counter); \ } else { \ break; \ } \ } while (counter < max) /*! * \internal * \brief Get the local Corosync node ID (via CPG) * * \param[in] handle CPG connection to use (or 0 to use new connection) * * \return Corosync ID of local node (or 0 if not known) */ uint32_t pcmk__cpg_local_nodeid(cpg_handle_t handle) { cs_error_t rc = CS_OK; int retries = 0; static uint32_t local_nodeid = 0; cpg_handle_t local_handle = handle; cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0}; int fd = -1; uid_t found_uid = 0; gid_t found_gid = 0; pid_t found_pid = 0; int rv = 0; if (local_nodeid != 0) { return local_nodeid; } if (handle == 0) { crm_trace("Creating connection"); cs_repeat(rc, retries, 5, cpg_model_initialize(&local_handle, CPG_MODEL_V1, (cpg_model_data_t *) &cpg_model_info, NULL)); if (rc != CS_OK) { crm_err("Could not connect to the CPG API: %s (%d)", cs_strerror(rc), rc); return 0; } rc = cpg_fd_get(local_handle, &fd); if (rc != CS_OK) { crm_err("Could not obtain the CPG API connection: %s (%d)", cs_strerror(rc), rc); goto bail; } // CPG provider run as root (at least in given user namespace)? rv = crm_ipc_is_authentic_process(fd, (uid_t) 0, (gid_t) 0, &found_pid, &found_uid, &found_gid); if (rv == 0) { crm_err("CPG provider is not authentic:" " process %lld (uid: %lld, gid: %lld)", (long long) PCMK__SPECIAL_PID_AS_0(found_pid), (long long) found_uid, (long long) found_gid); goto bail; } else if (rv < 0) { crm_err("Could not verify authenticity of CPG provider: %s (%d)", strerror(-rv), -rv); goto bail; } } if (rc == CS_OK) { retries = 0; crm_trace("Performing lookup"); cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid)); } if (rc != CS_OK) { crm_err("Could not get local node id from the CPG API: %s (%d)", pcmk__cs_err_str(rc), rc); } bail: if (handle == 0) { crm_trace("Closing connection"); cpg_finalize(local_handle); } crm_debug("Local nodeid is %u", local_nodeid); return local_nodeid; } /*! * \internal * \brief Callback function for Corosync message queue timer * * \param[in] data CPG handle * * \return FALSE (to indicate to glib that timer should not be removed) */ static gboolean crm_cs_flush_cb(gpointer data) { cs_message_timer = 0; crm_cs_flush(data); return FALSE; } // Send no more than this many CPG messages in one flush #define CS_SEND_MAX 200 /*! * \internal * \brief Send messages in Corosync CPG message queue * * \param[in] data CPG handle */ static void crm_cs_flush(gpointer data) { unsigned int sent = 0; guint queue_len = 0; cs_error_t rc = 0; cpg_handle_t *handle = (cpg_handle_t *) data; if (*handle == 0) { crm_trace("Connection is dead"); return; } queue_len = g_list_length(cs_message_queue); if (((queue_len % 1000) == 0) && (queue_len > 1)) { crm_err("CPG queue has grown to %d", queue_len); } else if (queue_len == CS_SEND_MAX) { crm_warn("CPG queue has grown to %d", queue_len); } if (cs_message_timer != 0) { /* There is already a timer, wait until it goes off */ crm_trace("Timer active %d", cs_message_timer); return; } while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) { struct iovec *iov = cs_message_queue->data; rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1); if (rc != CS_OK) { break; } sent++; crm_trace("CPG message sent, size=%llu", (unsigned long long) iov->iov_len); cs_message_queue = g_list_remove(cs_message_queue, iov); free(iov->iov_base); free(iov); } queue_len -= sent; do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE, "Sent %u CPG message%s (%d still queued): %s (rc=%d)", sent, pcmk__plural_s(sent), queue_len, pcmk__cs_err_str(rc), (int) rc); if (cs_message_queue) { uint32_t delay_ms = 100; if (rc != CS_OK) { /* Proportionally more if sending failed but cap at 1s */ delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len)); } cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data); } } /*! * \internal * \brief Dispatch function for CPG handle * * \param[in,out] user_data Cluster object * * \return 0 on success, -1 on error (per mainloop_io_t interface) */ static int pcmk_cpg_dispatch(gpointer user_data) { cs_error_t rc = CS_OK; pcmk_cluster_t *cluster = (pcmk_cluster_t *) user_data; - rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE); + rc = cpg_dispatch(cluster->priv->cpg_handle, CS_DISPATCH_ONE); if (rc != CS_OK) { crm_err("Connection to the CPG API failed: %s (%d)", pcmk__cs_err_str(rc), rc); - cpg_finalize(cluster->cpg_handle); - cluster->cpg_handle = 0; + cpg_finalize(cluster->priv->cpg_handle); + cluster->priv->cpg_handle = 0; return -1; } else if (cpg_evicted) { crm_err("Evicted from CPG membership"); return -1; } return 0; } static inline const char * ais_dest(const pcmk__cpg_host_t *host) { return (host->size > 0)? host->uname : ""; } static inline const char * msg_type2text(enum pcmk__cluster_msg type) { switch (type) { case pcmk__cluster_msg_attrd: return "attrd"; case pcmk__cluster_msg_based: return "cib"; case pcmk__cluster_msg_controld: return "crmd"; case pcmk__cluster_msg_execd: return "lrmd"; case pcmk__cluster_msg_fenced: return "stonith-ng"; default: return "unknown"; } } /*! * \internal * \brief Check whether a Corosync CPG message is valid * * \param[in] msg Corosync CPG message to check * * \return true if \p msg is valid, otherwise false */ static bool check_message_sanity(const pcmk__cpg_msg_t *msg) { int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t); if (payload_size < 1) { crm_err("%sCPG message %d from %s invalid: " "Claimed size of %d bytes is too small " QB_XS " from %s[%u] to %s@%s", (msg->is_compressed? "Compressed " : ""), msg->id, ais_dest(&(msg->sender)), (int) msg->header.size, msg_type2text(msg->sender.type), msg->sender.pid, msg_type2text(msg->host.type), ais_dest(&(msg->host))); return false; } if (msg->header.error != CS_OK) { crm_err("%sCPG message %d from %s invalid: " "Sender indicated error %d " QB_XS " from %s[%u] to %s@%s", (msg->is_compressed? "Compressed " : ""), msg->id, ais_dest(&(msg->sender)), msg->header.error, msg_type2text(msg->sender.type), msg->sender.pid, msg_type2text(msg->host.type), ais_dest(&(msg->host))); return false; } if (msg_data_len(msg) != payload_size) { crm_err("%sCPG message %d from %s invalid: " "Total size %d inconsistent with payload size %d " QB_XS " from %s[%u] to %s@%s", (msg->is_compressed? "Compressed " : ""), msg->id, ais_dest(&(msg->sender)), (int) msg->header.size, (int) msg_data_len(msg), msg_type2text(msg->sender.type), msg->sender.pid, msg_type2text(msg->host.type), ais_dest(&(msg->host))); return false; } if (!msg->is_compressed && /* msg->size != (strlen(msg->data) + 1) would be a stronger check, * but checking the last byte or two should be quick */ (((msg->size > 1) && (msg->data[msg->size - 2] == '\0')) || (msg->data[msg->size - 1] != '\0'))) { crm_err("CPG message %d from %s invalid: " "Payload does not end at byte %llu " QB_XS " from %s[%u] to %s@%s", msg->id, ais_dest(&(msg->sender)), (unsigned long long) msg->size, msg_type2text(msg->sender.type), msg->sender.pid, msg_type2text(msg->host.type), ais_dest(&(msg->host))); return false; } crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s", (int) msg->header.size, (msg->is_compressed? "compressed " : ""), msg->id, msg_type2text(msg->sender.type), msg->sender.pid, ais_dest(&(msg->sender)), msg_type2text(msg->host.type), ais_dest(&(msg->host))); return true; } /*! * \internal * \brief Extract text data from a Corosync CPG message * * \param[in] handle CPG connection (to get local node ID if not known) * \param[in] sender_id Corosync ID of node that sent message * \param[in] pid Process ID of message sender (for logging only) * \param[in,out] content CPG message * \param[out] from If not \c NULL, will be set to sender uname * (valid for the lifetime of \p content) * * \return Newly allocated string with message data * * \note The caller is responsible for freeing the return value using \c free(). */ char * pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid, void *content, const char **from) { char *data = NULL; pcmk__cpg_msg_t *msg = content; if (handle != 0) { // Do filtering and field massaging uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle); const char *local_name = pcmk__cluster_local_node_name(); if ((msg->sender.id != 0) && (msg->sender.id != sender_id)) { crm_err("Nodeid mismatch from %" PRIu32 ".%" PRIu32 ": claimed nodeid=%" PRIu32, sender_id, pid, msg->sender.id); return NULL; } if ((msg->host.id != 0) && (local_nodeid != msg->host.id)) { crm_trace("Not for us: %" PRIu32" != %" PRIu32, msg->host.id, local_nodeid); return NULL; } if ((msg->host.size > 0) && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) { crm_trace("Not for us: %s != %s", msg->host.uname, local_name); return NULL; } msg->sender.id = sender_id; if (msg->sender.size == 0) { const pcmk__node_status_t *peer = pcmk__get_node(sender_id, NULL, NULL, pcmk__node_search_cluster_member); if (peer->name == NULL) { crm_err("No node name for peer with nodeid=%u", sender_id); } else { crm_notice("Fixing node name for peer with nodeid=%u", sender_id); msg->sender.size = strlen(peer->name); memset(msg->sender.uname, 0, MAX_NAME); memcpy(msg->sender.uname, peer->name, msg->sender.size); } } } crm_trace("Got new%s message (size=%d, %d, %d)", msg->is_compressed ? " compressed" : "", msg_data_len(msg), msg->size, msg->compressed_size); if (from != NULL) { *from = msg->sender.uname; } if (msg->is_compressed && (msg->size > 0)) { int rc = BZ_OK; char *uncompressed = NULL; unsigned int new_size = msg->size + 1; if (!check_message_sanity(msg)) { goto badmsg; } crm_trace("Decompressing message data"); uncompressed = pcmk__assert_alloc(1, new_size); rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0); rc = pcmk__bzlib2rc(rc); if (rc != pcmk_rc_ok) { crm_err("Decompression failed: %s " QB_XS " rc=%d", pcmk_rc_str(rc), rc); free(uncompressed); goto badmsg; } CRM_ASSERT(new_size == msg->size); data = uncompressed; } else if (!check_message_sanity(msg)) { goto badmsg; } else { data = strdup(msg->data); } // Is this necessary? pcmk__get_node(msg->sender.id, msg->sender.uname, NULL, pcmk__node_search_cluster_member); crm_trace("Payload: %.200s", data); return data; badmsg: crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):" " min=%d, total=%d, size=%d, bz2_size=%d", msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, (int)sizeof(pcmk__cpg_msg_t), msg->header.size, msg->size, msg->compressed_size); free(data); return NULL; } /*! * \internal * \brief Compare cpg_address objects by node ID * * \param[in] first First cpg_address structure to compare * \param[in] second Second cpg_address structure to compare * * \return Negative number if first's node ID is lower, * positive number if first's node ID is greater, * or 0 if both node IDs are equal */ static int cmp_member_list_nodeid(const void *first, const void *second) { const struct cpg_address *const a = *((const struct cpg_address **) first), *const b = *((const struct cpg_address **) second); if (a->nodeid < b->nodeid) { return -1; } else if (a->nodeid > b->nodeid) { return 1; } /* don't bother with "reason" nor "pid" */ return 0; } /*! * \internal * \brief Get a readable string equivalent of a cpg_reason_t value * * \param[in] reason CPG reason value * * \return Readable string suitable for logging */ static const char * cpgreason2str(cpg_reason_t reason) { switch (reason) { case CPG_REASON_JOIN: return " via cpg_join"; case CPG_REASON_LEAVE: return " via cpg_leave"; case CPG_REASON_NODEDOWN: return " via cluster exit"; case CPG_REASON_NODEUP: return " via cluster join"; case CPG_REASON_PROCDOWN: return " for unknown reason"; default: break; } return ""; } /*! * \internal * \brief Get a log-friendly node name * * \param[in] peer Node to check * * \return Node's uname, or readable string if not known */ static inline const char * peer_name(const pcmk__node_status_t *peer) { return (peer != NULL)? pcmk__s(peer->name, "peer node") : "unknown node"; } /*! * \internal * \brief Process a CPG peer's leaving the cluster * * \param[in] cpg_group_name CPG group name (for logging) * \param[in] event_counter Event number (for logging) * \param[in] local_nodeid Node ID of local node * \param[in] cpg_peer CPG peer that left * \param[in] sorted_member_list List of remaining members, qsort()-ed by ID * \param[in] member_list_entries Number of entries in \p sorted_member_list */ static void node_left(const char *cpg_group_name, int event_counter, uint32_t local_nodeid, const struct cpg_address *cpg_peer, const struct cpg_address **sorted_member_list, size_t member_list_entries) { pcmk__node_status_t *peer = pcmk__search_node_caches(cpg_peer->nodeid, NULL, pcmk__node_search_cluster_member); const struct cpg_address **rival = NULL; /* Most CPG-related Pacemaker code assumes that only one process on a node * can be in the process group, but Corosync does not impose this * limitation, and more than one can be a member in practice due to a * daemon attempting to start while another instance is already running. * * Check for any such duplicate instances, because we don't want to process * their leaving as if our actual peer left. If the peer that left still has * an entry in sorted_member_list (with a different PID), we will ignore the * leaving. * * @TODO Track CPG members' PIDs so we can tell exactly who left. */ if (peer != NULL) { rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries, sizeof(const struct cpg_address *), cmp_member_list_nodeid); } if (rival == NULL) { crm_info("Group %s event %d: %s (node %u pid %u) left%s", cpg_group_name, event_counter, peer_name(peer), cpg_peer->nodeid, cpg_peer->pid, cpgreason2str(cpg_peer->reason)); if (peer != NULL) { crm_update_peer_proc(__func__, peer, crm_proc_cpg, PCMK_VALUE_OFFLINE); } } else if (cpg_peer->nodeid == local_nodeid) { crm_warn("Group %s event %d: duplicate local pid %u left%s", cpg_group_name, event_counter, cpg_peer->pid, cpgreason2str(cpg_peer->reason)); } else { crm_warn("Group %s event %d: " "%s (node %u) duplicate pid %u left%s (%u remains)", cpg_group_name, event_counter, peer_name(peer), cpg_peer->nodeid, cpg_peer->pid, cpgreason2str(cpg_peer->reason), (*rival)->pid); } } /*! * \internal * \brief Handle a CPG configuration change event * * \param[in] handle CPG connection * \param[in] group_name CPG group name * \param[in] member_list List of current CPG members * \param[in] member_list_entries Number of entries in \p member_list * \param[in] left_list List of CPG members that left * \param[in] left_list_entries Number of entries in \p left_list * \param[in] joined_list List of CPG members that joined * \param[in] joined_list_entries Number of entries in \p joined_list * * \note This is of type \c cpg_confchg_fn_t, intended to be used in a * \c cpg_callbacks_t object. */ void pcmk__cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries) { static int counter = 0; bool found = false; uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle); const struct cpg_address **sorted = NULL; sorted = pcmk__assert_alloc(member_list_entries, sizeof(const struct cpg_address *)); for (size_t iter = 0; iter < member_list_entries; iter++) { sorted[iter] = member_list + iter; } // So that the cross-matching of multiply-subscribed nodes is then cheap qsort(sorted, member_list_entries, sizeof(const struct cpg_address *), cmp_member_list_nodeid); for (int i = 0; i < left_list_entries; i++) { node_left(group_name->value, counter, local_nodeid, &left_list[i], sorted, member_list_entries); } free(sorted); sorted = NULL; for (int i = 0; i < joined_list_entries; i++) { crm_info("Group %s event %d: node %u pid %u joined%s", group_name->value, counter, joined_list[i].nodeid, joined_list[i].pid, cpgreason2str(joined_list[i].reason)); } for (int i = 0; i < member_list_entries; i++) { pcmk__node_status_t *peer = pcmk__get_node(member_list[i].nodeid, NULL, NULL, pcmk__node_search_cluster_member); if (member_list[i].nodeid == local_nodeid && member_list[i].pid != getpid()) { // See the note in node_left() crm_warn("Group %s event %d: detected duplicate local pid %u", group_name->value, counter, member_list[i].pid); continue; } crm_info("Group %s event %d: %s (node %u pid %u) is member", group_name->value, counter, peer_name(peer), member_list[i].nodeid, member_list[i].pid); /* If the caller left auto-reaping enabled, this will also update the * state to member. */ peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg, PCMK_VALUE_ONLINE); if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) { /* The node is a CPG member, but we currently think it's not a * cluster member. This is possible only if auto-reaping was * disabled. The node may be joining, and we happened to get the CPG * notification before the quorum notification; or the node may have * just died, and we are processing its final messages; or a bug * has affected the peer cache. */ time_t now = time(NULL); if (peer->when_lost == 0) { // Track when we first got into this contradictory state peer->when_lost = now; } else if (now > (peer->when_lost + 60)) { // If it persists for more than a minute, update the state crm_warn("Node %u is member of group %s but was believed " "offline", member_list[i].nodeid, group_name->value); pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0); } } if (local_nodeid == member_list[i].nodeid) { found = true; } } if (!found) { crm_err("Local node was evicted from group %s", group_name->value); cpg_evicted = true; } counter++; } /*! * \brief Set the CPG deliver callback function for a cluster object * * \param[in,out] cluster Cluster object * \param[in] fn Deliver callback function to set * * \return Standard Pacemaker return code */ int pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn) { if (cluster == NULL) { return EINVAL; } cluster->cpg.cpg_deliver_fn = fn; return pcmk_rc_ok; } /*! * \brief Set the CPG config change callback function for a cluster object * * \param[in,out] cluster Cluster object * \param[in] fn Configuration change callback function to set * * \return Standard Pacemaker return code */ int pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn) { if (cluster == NULL) { return EINVAL; } cluster->cpg.cpg_confchg_fn = fn; return pcmk_rc_ok; } /*! * \brief Connect to Corosync CPG * * \param[in,out] cluster Initialized cluster object to connect * * \return Standard Pacemaker return code */ int pcmk__cpg_connect(pcmk_cluster_t *cluster) { cs_error_t rc; int fd = -1; int retries = 0; uint32_t id = 0; pcmk__node_status_t *peer = NULL; cpg_handle_t handle = 0; const char *message_name = pcmk__message_name(crm_system_name); uid_t found_uid = 0; gid_t found_gid = 0; pid_t found_pid = 0; int rv; struct mainloop_fd_callbacks cpg_fd_callbacks = { .dispatch = pcmk_cpg_dispatch, .destroy = cluster->destroy, }; cpg_model_v1_data_t cpg_model_info = { .model = CPG_MODEL_V1, .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn, .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn, .cpg_totem_confchg_fn = NULL, .flags = 0, }; cpg_evicted = false; cluster->priv->group.length = 0; cluster->priv->group.value[0] = 0; /* group.value is char[128] */ strncpy(cluster->priv->group.value, message_name, 127); cluster->priv->group.value[127] = 0; cluster->priv->group.length = QB_MIN(127, strlen(cluster->priv->group.value)); cluster->priv->group.length++; cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL)); if (rc != CS_OK) { crm_err("Could not connect to the CPG API: %s (%d)", cs_strerror(rc), rc); goto bail; } rc = cpg_fd_get(handle, &fd); if (rc != CS_OK) { crm_err("Could not obtain the CPG API connection: %s (%d)", cs_strerror(rc), rc); goto bail; } /* CPG provider run as root (in given user namespace, anyway)? */ if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid, &found_uid, &found_gid))) { crm_err("CPG provider is not authentic:" " process %lld (uid: %lld, gid: %lld)", (long long) PCMK__SPECIAL_PID_AS_0(found_pid), (long long) found_uid, (long long) found_gid); rc = CS_ERR_ACCESS; goto bail; } else if (rv < 0) { crm_err("Could not verify authenticity of CPG provider: %s (%d)", strerror(-rv), -rv); rc = CS_ERR_ACCESS; goto bail; } id = pcmk__cpg_local_nodeid(handle); if (id == 0) { crm_err("Could not get local node id from the CPG API"); goto bail; } cluster->priv->node_id = id; retries = 0; cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->priv->group)); if (rc != CS_OK) { crm_err("Could not join the CPG group '%s': %d", message_name, rc); goto bail; } pcmk_cpg_handle = handle; - cluster->cpg_handle = handle; + cluster->priv->cpg_handle = handle; mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks); bail: if (rc != CS_OK) { cpg_finalize(handle); // @TODO Map rc to more specific Pacemaker return code return ENOTCONN; } peer = pcmk__get_node(id, NULL, NULL, pcmk__node_search_cluster_member); crm_update_peer_proc(__func__, peer, crm_proc_cpg, PCMK_VALUE_ONLINE); return pcmk_rc_ok; } /*! * \internal * \brief Disconnect from Corosync CPG * * \param[in,out] cluster Cluster object to disconnect */ void pcmk__cpg_disconnect(pcmk_cluster_t *cluster) { pcmk_cpg_handle = 0; - if (cluster->cpg_handle != 0) { + if (cluster->priv->cpg_handle != 0) { crm_trace("Disconnecting CPG"); - cpg_leave(cluster->cpg_handle, &cluster->priv->group); - cpg_finalize(cluster->cpg_handle); - cluster->cpg_handle = 0; + cpg_leave(cluster->priv->cpg_handle, &cluster->priv->group); + cpg_finalize(cluster->priv->cpg_handle); + cluster->priv->cpg_handle = 0; } else { crm_info("No CPG connection"); } } /*! * \internal * \brief Send string data via Corosync CPG * * \param[in] data Data to send * \param[in] node Cluster node to send message to * \param[in] dest Type of message to send * * \return \c true on success, or \c false otherwise */ static bool send_cpg_text(const char *data, const pcmk__node_status_t *node, enum pcmk__cluster_msg dest) { static int msg_id = 0; static int local_pid = 0; static int local_name_len = 0; static const char *local_name = NULL; char *target = NULL; struct iovec *iov; pcmk__cpg_msg_t *msg = NULL; if (local_name == NULL) { local_name = pcmk__cluster_local_node_name(); } if ((local_name_len == 0) && (local_name != NULL)) { local_name_len = strlen(local_name); } if (data == NULL) { data = ""; } if (local_pid == 0) { local_pid = getpid(); } msg = pcmk__assert_alloc(1, sizeof(pcmk__cpg_msg_t)); msg_id++; msg->id = msg_id; msg->header.error = CS_OK; msg->host.type = dest; if (node != NULL) { if (node->name != NULL) { target = pcmk__str_copy(node->name); msg->host.size = strlen(node->name); memset(msg->host.uname, 0, MAX_NAME); memcpy(msg->host.uname, node->name, msg->host.size); } else { target = crm_strdup_printf("%" PRIu32, node->cluster_layer_id); } msg->host.id = node->cluster_layer_id; } else { target = pcmk__str_copy("all"); } msg->sender.id = 0; msg->sender.type = pcmk__cluster_parse_msg_type(crm_system_name); msg->sender.pid = local_pid; msg->sender.size = local_name_len; memset(msg->sender.uname, 0, MAX_NAME); if ((local_name != NULL) && (msg->sender.size != 0)) { memcpy(msg->sender.uname, local_name, msg->sender.size); } msg->size = 1 + strlen(data); msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size; if (msg->size < CRM_BZ2_THRESHOLD) { msg = pcmk__realloc(msg, msg->header.size); memcpy(msg->data, data, msg->size); } else { char *compressed = NULL; unsigned int new_size = 0; if (pcmk__compress(data, (unsigned int) msg->size, 0, &compressed, &new_size) == pcmk_rc_ok) { msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size; msg = pcmk__realloc(msg, msg->header.size); memcpy(msg->data, compressed, new_size); msg->is_compressed = TRUE; msg->compressed_size = new_size; } else { // cppcheck seems not to understand the abort logic in pcmk__realloc // cppcheck-suppress memleak msg = pcmk__realloc(msg, msg->header.size); memcpy(msg->data, data, msg->size); } free(compressed); } iov = pcmk__assert_alloc(1, sizeof(struct iovec)); iov->iov_base = msg; iov->iov_len = msg->header.size; if (msg->compressed_size > 0) { crm_trace("Queueing CPG message %u to %s " "(%llu bytes, %d bytes compressed payload): %.200s", msg->id, target, (unsigned long long) iov->iov_len, msg->compressed_size, data); } else { crm_trace("Queueing CPG message %u to %s " "(%llu bytes, %d bytes payload): %.200s", msg->id, target, (unsigned long long) iov->iov_len, msg->size, data); } free(target); cs_message_queue = g_list_append(cs_message_queue, iov); crm_cs_flush(&pcmk_cpg_handle); return true; } /*! * \internal * \brief Send an XML message via Corosync CPG * * \param[in] msg XML message to send * \param[in] node Cluster node to send message to * \param[in] dest Type of message to send * * \return TRUE on success, otherwise FALSE */ bool pcmk__cpg_send_xml(const xmlNode *msg, const pcmk__node_status_t *node, enum pcmk__cluster_msg dest) { bool rc = true; GString *data = g_string_sized_new(1024); pcmk__xml_string(msg, 0, data, 0); rc = send_cpg_text(data->str, node, dest); g_string_free(data, TRUE); return rc; }