Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F3686666
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
39 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/include/crm/common/servers_internal.h b/include/crm/common/servers_internal.h
index da8f65e0ad..bb4edc8739 100644
--- a/include/crm/common/servers_internal.h
+++ b/include/crm/common/servers_internal.h
@@ -1,26 +1,27 @@
/*
* Copyright 2024 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU Lesser General Public License
* version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
*/
#ifndef PCMK__CRM_COMMON_SERVERS_INTERNAL__H
#define PCMK__CRM_COMMON_SERVERS_INTERNAL__H
#include <crm/common/ipc.h> // enum pcmk_ipc_server
#ifdef __cplusplus
extern "C" {
#endif
const char *pcmk__server_log_name(enum pcmk_ipc_server server);
const char *pcmk__server_ipc_name(enum pcmk_ipc_server server);
+const char *pcmk__server_message_type(enum pcmk_ipc_server server);
#ifdef __cplusplus
}
#endif
#endif // PCMK__CRM_COMMON_SERVERS_INTERNAL__H
diff --git a/lib/cluster/cpg.c b/lib/cluster/cpg.c
index 42c369b8c6..dae41f6bd2 100644
--- a/lib/cluster/cpg.c
+++ b/lib/cluster/cpg.c
@@ -1,1060 +1,1049 @@
/*
* Copyright 2004-2024 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU Lesser General Public License
* version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#include <arpa/inet.h>
#include <inttypes.h> // PRIu32
#include <netdb.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <stdint.h> // uint32_t
#include <sys/socket.h>
#include <sys/types.h> // size_t
#include <sys/utsname.h>
#include <bzlib.h>
#include <corosync/corodefs.h>
#include <corosync/corotypes.h>
#include <corosync/hdb.h>
#include <corosync/cpg.h>
#include <qb/qbipc_common.h>
#include <qb/qbipcc.h>
#include <qb/qbutil.h>
#include <crm/cluster/internal.h>
#include <crm/common/ipc.h>
#include <crm/common/ipc_internal.h> // PCMK__SPECIAL_PID
#include <crm/common/mainloop.h>
#include <crm/common/xml.h>
#include "crmcluster_private.h"
/* @TODO Once we can update the public API to require pcmk_cluster_t* in more
* functions, we can ditch this in favor of cluster->cpg_handle.
*/
static cpg_handle_t pcmk_cpg_handle = 0;
// @TODO These could be moved to pcmk_cluster_t* at that time as well
static bool cpg_evicted = false;
static GList *cs_message_queue = NULL;
static int cs_message_timer = 0;
struct pcmk__cpg_host_s {
uint32_t id;
uint32_t pid;
enum pcmk_ipc_server type; // For logging only
uint32_t size;
char uname[MAX_NAME];
} __attribute__ ((packed));
typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
struct pcmk__cpg_msg_s {
struct qb_ipc_response_header header __attribute__ ((aligned(8)));
uint32_t id;
gboolean is_compressed;
pcmk__cpg_host_t host;
pcmk__cpg_host_t sender;
uint32_t size;
uint32_t compressed_size;
/* 584 bytes */
char data[0];
} __attribute__ ((packed));
typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
static void crm_cs_flush(gpointer data);
#define msg_data_len(msg) (msg->is_compressed?msg->compressed_size:msg->size)
#define cs_repeat(rc, counter, max, code) do { \
rc = code; \
if ((rc == CS_ERR_TRY_AGAIN) || (rc == CS_ERR_QUEUE_FULL)) { \
counter++; \
crm_debug("Retrying operation after %ds", counter); \
sleep(counter); \
} else { \
break; \
} \
} while (counter < max)
/*!
* \internal
* \brief Get the local Corosync node ID (via CPG)
*
* \param[in] handle CPG connection to use (or 0 to use new connection)
*
* \return Corosync ID of local node (or 0 if not known)
*/
uint32_t
pcmk__cpg_local_nodeid(cpg_handle_t handle)
{
cs_error_t rc = CS_OK;
int retries = 0;
static uint32_t local_nodeid = 0;
cpg_handle_t local_handle = handle;
cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
int fd = -1;
uid_t found_uid = 0;
gid_t found_gid = 0;
pid_t found_pid = 0;
int rv = 0;
if (local_nodeid != 0) {
return local_nodeid;
}
if (handle == 0) {
crm_trace("Creating connection");
cs_repeat(rc, retries, 5,
cpg_model_initialize(&local_handle, CPG_MODEL_V1,
(cpg_model_data_t *) &cpg_model_info,
NULL));
if (rc != CS_OK) {
crm_err("Could not connect to the CPG API: %s (%d)",
cs_strerror(rc), rc);
return 0;
}
rc = cpg_fd_get(local_handle, &fd);
if (rc != CS_OK) {
crm_err("Could not obtain the CPG API connection: %s (%d)",
cs_strerror(rc), rc);
goto bail;
}
// CPG provider run as root (at least in given user namespace)?
rv = crm_ipc_is_authentic_process(fd, (uid_t) 0, (gid_t) 0, &found_pid,
&found_uid, &found_gid);
if (rv == 0) {
crm_err("CPG provider is not authentic:"
" process %lld (uid: %lld, gid: %lld)",
(long long) PCMK__SPECIAL_PID_AS_0(found_pid),
(long long) found_uid, (long long) found_gid);
goto bail;
} else if (rv < 0) {
crm_err("Could not verify authenticity of CPG provider: %s (%d)",
strerror(-rv), -rv);
goto bail;
}
}
if (rc == CS_OK) {
retries = 0;
crm_trace("Performing lookup");
cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
}
if (rc != CS_OK) {
crm_err("Could not get local node id from the CPG API: %s (%d)",
pcmk__cs_err_str(rc), rc);
}
bail:
if (handle == 0) {
crm_trace("Closing connection");
cpg_finalize(local_handle);
}
crm_debug("Local nodeid is %u", local_nodeid);
return local_nodeid;
}
/*!
* \internal
* \brief Callback function for Corosync message queue timer
*
* \param[in] data CPG handle
*
* \return FALSE (to indicate to glib that timer should not be removed)
*/
static gboolean
crm_cs_flush_cb(gpointer data)
{
cs_message_timer = 0;
crm_cs_flush(data);
return FALSE;
}
// Send no more than this many CPG messages in one flush
#define CS_SEND_MAX 200
/*!
* \internal
* \brief Send messages in Corosync CPG message queue
*
* \param[in] data CPG handle
*/
static void
crm_cs_flush(gpointer data)
{
unsigned int sent = 0;
guint queue_len = 0;
cs_error_t rc = 0;
cpg_handle_t *handle = (cpg_handle_t *) data;
if (*handle == 0) {
crm_trace("Connection is dead");
return;
}
queue_len = g_list_length(cs_message_queue);
if (((queue_len % 1000) == 0) && (queue_len > 1)) {
crm_err("CPG queue has grown to %d", queue_len);
} else if (queue_len == CS_SEND_MAX) {
crm_warn("CPG queue has grown to %d", queue_len);
}
if (cs_message_timer != 0) {
/* There is already a timer, wait until it goes off */
crm_trace("Timer active %d", cs_message_timer);
return;
}
while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
struct iovec *iov = cs_message_queue->data;
rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
if (rc != CS_OK) {
break;
}
sent++;
crm_trace("CPG message sent, size=%llu",
(unsigned long long) iov->iov_len);
cs_message_queue = g_list_remove(cs_message_queue, iov);
free(iov->iov_base);
free(iov);
}
queue_len -= sent;
do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE,
"Sent %u CPG message%s (%d still queued): %s (rc=%d)",
sent, pcmk__plural_s(sent), queue_len, pcmk__cs_err_str(rc),
(int) rc);
if (cs_message_queue) {
uint32_t delay_ms = 100;
if (rc != CS_OK) {
/* Proportionally more if sending failed but cap at 1s */
delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
}
cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
}
}
/*!
* \internal
* \brief Dispatch function for CPG handle
*
* \param[in,out] user_data Cluster object
*
* \return 0 on success, -1 on error (per mainloop_io_t interface)
*/
static int
pcmk_cpg_dispatch(gpointer user_data)
{
cs_error_t rc = CS_OK;
pcmk_cluster_t *cluster = (pcmk_cluster_t *) user_data;
rc = cpg_dispatch(cluster->priv->cpg_handle, CS_DISPATCH_ONE);
if (rc != CS_OK) {
crm_err("Connection to the CPG API failed: %s (%d)",
pcmk__cs_err_str(rc), rc);
cpg_finalize(cluster->priv->cpg_handle);
cluster->priv->cpg_handle = 0;
return -1;
} else if (cpg_evicted) {
crm_err("Evicted from CPG membership");
return -1;
}
return 0;
}
static inline const char *
ais_dest(const pcmk__cpg_host_t *host)
{
return (host->size > 0)? host->uname : "<all>";
}
static inline const char *
msg_type2text(enum pcmk_ipc_server type)
{
- switch (type) {
- case pcmk_ipc_attrd:
- return "attrd";
- case pcmk_ipc_based:
- return "cib";
- case pcmk_ipc_controld:
- return "crmd";
- case pcmk_ipc_execd:
- return "lrmd";
- case pcmk_ipc_fenced:
- return "stonith-ng";
- default:
- return "unknown";
- }
+ const char *name = pcmk__server_message_type(type);
+
+ return pcmk__s(name, "unknown");
}
/*!
* \internal
* \brief Check whether a Corosync CPG message is valid
*
* \param[in] msg Corosync CPG message to check
*
* \return true if \p msg is valid, otherwise false
*/
static bool
check_message_sanity(const pcmk__cpg_msg_t *msg)
{
int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
if (payload_size < 1) {
crm_err("%sCPG message %d from %s invalid: "
"Claimed size of %d bytes is too small "
QB_XS " from %s[%u] to %s@%s",
(msg->is_compressed? "Compressed " : ""),
msg->id, ais_dest(&(msg->sender)),
(int) msg->header.size,
msg_type2text(msg->sender.type), msg->sender.pid,
msg_type2text(msg->host.type), ais_dest(&(msg->host)));
return false;
}
if (msg->header.error != CS_OK) {
crm_err("%sCPG message %d from %s invalid: "
"Sender indicated error %d "
QB_XS " from %s[%u] to %s@%s",
(msg->is_compressed? "Compressed " : ""),
msg->id, ais_dest(&(msg->sender)),
msg->header.error,
msg_type2text(msg->sender.type), msg->sender.pid,
msg_type2text(msg->host.type), ais_dest(&(msg->host)));
return false;
}
if (msg_data_len(msg) != payload_size) {
crm_err("%sCPG message %d from %s invalid: "
"Total size %d inconsistent with payload size %d "
QB_XS " from %s[%u] to %s@%s",
(msg->is_compressed? "Compressed " : ""),
msg->id, ais_dest(&(msg->sender)),
(int) msg->header.size, (int) msg_data_len(msg),
msg_type2text(msg->sender.type), msg->sender.pid,
msg_type2text(msg->host.type), ais_dest(&(msg->host)));
return false;
}
if (!msg->is_compressed &&
/* msg->size != (strlen(msg->data) + 1) would be a stronger check,
* but checking the last byte or two should be quick
*/
(((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
|| (msg->data[msg->size - 1] != '\0'))) {
crm_err("CPG message %d from %s invalid: "
"Payload does not end at byte %llu "
QB_XS " from %s[%u] to %s@%s",
msg->id, ais_dest(&(msg->sender)),
(unsigned long long) msg->size,
msg_type2text(msg->sender.type), msg->sender.pid,
msg_type2text(msg->host.type), ais_dest(&(msg->host)));
return false;
}
crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
(int) msg->header.size, (msg->is_compressed? "compressed " : ""),
msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
ais_dest(&(msg->sender)),
msg_type2text(msg->host.type), ais_dest(&(msg->host)));
return true;
}
/*!
* \internal
* \brief Extract text data from a Corosync CPG message
*
* \param[in] handle CPG connection (to get local node ID if not known)
* \param[in] sender_id Corosync ID of node that sent message
* \param[in] pid Process ID of message sender (for logging only)
* \param[in,out] content CPG message
* \param[out] from If not \c NULL, will be set to sender uname
* (valid for the lifetime of \p content)
*
* \return Newly allocated string with message data
*
* \note The caller is responsible for freeing the return value using \c free().
*/
char *
pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid,
void *content, const char **from)
{
char *data = NULL;
pcmk__cpg_msg_t *msg = content;
if (handle != 0) {
// Do filtering and field massaging
uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
const char *local_name = pcmk__cluster_local_node_name();
if ((msg->sender.id != 0) && (msg->sender.id != sender_id)) {
crm_err("Nodeid mismatch from %" PRIu32 ".%" PRIu32
": claimed nodeid=%" PRIu32,
sender_id, pid, msg->sender.id);
return NULL;
}
if ((msg->host.id != 0) && (local_nodeid != msg->host.id)) {
crm_trace("Not for us: %" PRIu32" != %" PRIu32,
msg->host.id, local_nodeid);
return NULL;
}
if ((msg->host.size > 0)
&& !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
return NULL;
}
msg->sender.id = sender_id;
if (msg->sender.size == 0) {
const pcmk__node_status_t *peer =
pcmk__get_node(sender_id, NULL, NULL,
pcmk__node_search_cluster_member);
if (peer->name == NULL) {
crm_err("No node name for peer with nodeid=%u", sender_id);
} else {
crm_notice("Fixing node name for peer with nodeid=%u",
sender_id);
msg->sender.size = strlen(peer->name);
memset(msg->sender.uname, 0, MAX_NAME);
memcpy(msg->sender.uname, peer->name, msg->sender.size);
}
}
}
crm_trace("Got new%s message (size=%d, %d, %d)",
msg->is_compressed ? " compressed" : "",
msg_data_len(msg), msg->size, msg->compressed_size);
if (from != NULL) {
*from = msg->sender.uname;
}
if (msg->is_compressed && (msg->size > 0)) {
int rc = BZ_OK;
char *uncompressed = NULL;
unsigned int new_size = msg->size + 1;
if (!check_message_sanity(msg)) {
goto badmsg;
}
crm_trace("Decompressing message data");
uncompressed = pcmk__assert_alloc(1, new_size);
rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data,
msg->compressed_size, 1, 0);
rc = pcmk__bzlib2rc(rc);
if (rc != pcmk_rc_ok) {
crm_err("Decompression failed: %s " QB_XS " rc=%d",
pcmk_rc_str(rc), rc);
free(uncompressed);
goto badmsg;
}
CRM_ASSERT(new_size == msg->size);
data = uncompressed;
} else if (!check_message_sanity(msg)) {
goto badmsg;
} else {
data = strdup(msg->data);
}
// Is this necessary?
pcmk__get_node(msg->sender.id, msg->sender.uname, NULL,
pcmk__node_search_cluster_member);
crm_trace("Payload: %.200s", data);
return data;
badmsg:
crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
" min=%d, total=%d, size=%d, bz2_size=%d",
msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
msg->sender.pid, (int)sizeof(pcmk__cpg_msg_t),
msg->header.size, msg->size, msg->compressed_size);
free(data);
return NULL;
}
/*!
* \internal
* \brief Compare cpg_address objects by node ID
*
* \param[in] first First cpg_address structure to compare
* \param[in] second Second cpg_address structure to compare
*
* \return Negative number if first's node ID is lower,
* positive number if first's node ID is greater,
* or 0 if both node IDs are equal
*/
static int
cmp_member_list_nodeid(const void *first, const void *second)
{
const struct cpg_address *const a = *((const struct cpg_address **) first),
*const b = *((const struct cpg_address **) second);
if (a->nodeid < b->nodeid) {
return -1;
} else if (a->nodeid > b->nodeid) {
return 1;
}
/* don't bother with "reason" nor "pid" */
return 0;
}
/*!
* \internal
* \brief Get a readable string equivalent of a cpg_reason_t value
*
* \param[in] reason CPG reason value
*
* \return Readable string suitable for logging
*/
static const char *
cpgreason2str(cpg_reason_t reason)
{
switch (reason) {
case CPG_REASON_JOIN: return " via cpg_join";
case CPG_REASON_LEAVE: return " via cpg_leave";
case CPG_REASON_NODEDOWN: return " via cluster exit";
case CPG_REASON_NODEUP: return " via cluster join";
case CPG_REASON_PROCDOWN: return " for unknown reason";
default: break;
}
return "";
}
/*!
* \internal
* \brief Get a log-friendly node name
*
* \param[in] peer Node to check
*
* \return Node's uname, or readable string if not known
*/
static inline const char *
peer_name(const pcmk__node_status_t *peer)
{
return (peer != NULL)? pcmk__s(peer->name, "peer node") : "unknown node";
}
/*!
* \internal
* \brief Process a CPG peer's leaving the cluster
*
* \param[in] cpg_group_name CPG group name (for logging)
* \param[in] event_counter Event number (for logging)
* \param[in] local_nodeid Node ID of local node
* \param[in] cpg_peer CPG peer that left
* \param[in] sorted_member_list List of remaining members, qsort()-ed by ID
* \param[in] member_list_entries Number of entries in \p sorted_member_list
*/
static void
node_left(const char *cpg_group_name, int event_counter,
uint32_t local_nodeid, const struct cpg_address *cpg_peer,
const struct cpg_address **sorted_member_list,
size_t member_list_entries)
{
pcmk__node_status_t *peer =
pcmk__search_node_caches(cpg_peer->nodeid, NULL,
pcmk__node_search_cluster_member);
const struct cpg_address **rival = NULL;
/* Most CPG-related Pacemaker code assumes that only one process on a node
* can be in the process group, but Corosync does not impose this
* limitation, and more than one can be a member in practice due to a
* daemon attempting to start while another instance is already running.
*
* Check for any such duplicate instances, because we don't want to process
* their leaving as if our actual peer left. If the peer that left still has
* an entry in sorted_member_list (with a different PID), we will ignore the
* leaving.
*
* @TODO Track CPG members' PIDs so we can tell exactly who left.
*/
if (peer != NULL) {
rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
sizeof(const struct cpg_address *),
cmp_member_list_nodeid);
}
if (rival == NULL) {
crm_info("Group %s event %d: %s (node %u pid %u) left%s",
cpg_group_name, event_counter, peer_name(peer),
cpg_peer->nodeid, cpg_peer->pid,
cpgreason2str(cpg_peer->reason));
if (peer != NULL) {
crm_update_peer_proc(__func__, peer, crm_proc_cpg,
PCMK_VALUE_OFFLINE);
}
} else if (cpg_peer->nodeid == local_nodeid) {
crm_warn("Group %s event %d: duplicate local pid %u left%s",
cpg_group_name, event_counter,
cpg_peer->pid, cpgreason2str(cpg_peer->reason));
} else {
crm_warn("Group %s event %d: "
"%s (node %u) duplicate pid %u left%s (%u remains)",
cpg_group_name, event_counter, peer_name(peer),
cpg_peer->nodeid, cpg_peer->pid,
cpgreason2str(cpg_peer->reason), (*rival)->pid);
}
}
/*!
* \internal
* \brief Handle a CPG configuration change event
*
* \param[in] handle CPG connection
* \param[in] group_name CPG group name
* \param[in] member_list List of current CPG members
* \param[in] member_list_entries Number of entries in \p member_list
* \param[in] left_list List of CPG members that left
* \param[in] left_list_entries Number of entries in \p left_list
* \param[in] joined_list List of CPG members that joined
* \param[in] joined_list_entries Number of entries in \p joined_list
*
* \note This is of type \c cpg_confchg_fn_t, intended to be used in a
* \c cpg_callbacks_t object.
*/
void
pcmk__cpg_confchg_cb(cpg_handle_t handle,
const struct cpg_name *group_name,
const struct cpg_address *member_list,
size_t member_list_entries,
const struct cpg_address *left_list,
size_t left_list_entries,
const struct cpg_address *joined_list,
size_t joined_list_entries)
{
static int counter = 0;
bool found = false;
uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
const struct cpg_address **sorted = NULL;
sorted = pcmk__assert_alloc(member_list_entries,
sizeof(const struct cpg_address *));
for (size_t iter = 0; iter < member_list_entries; iter++) {
sorted[iter] = member_list + iter;
}
// So that the cross-matching of multiply-subscribed nodes is then cheap
qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
cmp_member_list_nodeid);
for (int i = 0; i < left_list_entries; i++) {
node_left(group_name->value, counter, local_nodeid, &left_list[i],
sorted, member_list_entries);
}
free(sorted);
sorted = NULL;
for (int i = 0; i < joined_list_entries; i++) {
crm_info("Group %s event %d: node %u pid %u joined%s",
group_name->value, counter, joined_list[i].nodeid,
joined_list[i].pid, cpgreason2str(joined_list[i].reason));
}
for (int i = 0; i < member_list_entries; i++) {
pcmk__node_status_t *peer =
pcmk__get_node(member_list[i].nodeid, NULL, NULL,
pcmk__node_search_cluster_member);
if (member_list[i].nodeid == local_nodeid
&& member_list[i].pid != getpid()) {
// See the note in node_left()
crm_warn("Group %s event %d: detected duplicate local pid %u",
group_name->value, counter, member_list[i].pid);
continue;
}
crm_info("Group %s event %d: %s (node %u pid %u) is member",
group_name->value, counter, peer_name(peer),
member_list[i].nodeid, member_list[i].pid);
/* If the caller left auto-reaping enabled, this will also update the
* state to member.
*/
peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
PCMK_VALUE_ONLINE);
if (peer && peer->state && strcmp(peer->state, PCMK_VALUE_MEMBER)) {
/* The node is a CPG member, but we currently think it's not a
* cluster member. This is possible only if auto-reaping was
* disabled. The node may be joining, and we happened to get the CPG
* notification before the quorum notification; or the node may have
* just died, and we are processing its final messages; or a bug
* has affected the peer cache.
*/
time_t now = time(NULL);
if (peer->when_lost == 0) {
// Track when we first got into this contradictory state
peer->when_lost = now;
} else if (now > (peer->when_lost + 60)) {
// If it persists for more than a minute, update the state
crm_warn("Node %u is member of group %s but was believed "
"offline",
member_list[i].nodeid, group_name->value);
pcmk__update_peer_state(__func__, peer, PCMK_VALUE_MEMBER, 0);
}
}
if (local_nodeid == member_list[i].nodeid) {
found = true;
}
}
if (!found) {
crm_err("Local node was evicted from group %s", group_name->value);
cpg_evicted = true;
}
counter++;
}
/*!
* \brief Set the CPG deliver callback function for a cluster object
*
* \param[in,out] cluster Cluster object
* \param[in] fn Deliver callback function to set
*
* \return Standard Pacemaker return code
*/
int
pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
{
if (cluster == NULL) {
return EINVAL;
}
cluster->cpg.cpg_deliver_fn = fn;
return pcmk_rc_ok;
}
/*!
* \brief Set the CPG config change callback function for a cluster object
*
* \param[in,out] cluster Cluster object
* \param[in] fn Configuration change callback function to set
*
* \return Standard Pacemaker return code
*/
int
pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
{
if (cluster == NULL) {
return EINVAL;
}
cluster->cpg.cpg_confchg_fn = fn;
return pcmk_rc_ok;
}
/*!
* \brief Connect to Corosync CPG
*
* \param[in,out] cluster Initialized cluster object to connect
*
* \return Standard Pacemaker return code
*/
int
pcmk__cpg_connect(pcmk_cluster_t *cluster)
{
cs_error_t rc;
int fd = -1;
int retries = 0;
uint32_t id = 0;
pcmk__node_status_t *peer = NULL;
cpg_handle_t handle = 0;
const char *message_name = pcmk__message_name(crm_system_name);
uid_t found_uid = 0;
gid_t found_gid = 0;
pid_t found_pid = 0;
int rv;
struct mainloop_fd_callbacks cpg_fd_callbacks = {
.dispatch = pcmk_cpg_dispatch,
.destroy = cluster->destroy,
};
cpg_model_v1_data_t cpg_model_info = {
.model = CPG_MODEL_V1,
.cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
.cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
.cpg_totem_confchg_fn = NULL,
.flags = 0,
};
cpg_evicted = false;
cluster->priv->group.length = 0;
cluster->priv->group.value[0] = 0;
/* group.value is char[128] */
strncpy(cluster->priv->group.value, message_name, 127);
cluster->priv->group.value[127] = 0;
cluster->priv->group.length = QB_MIN(127,
strlen(cluster->priv->group.value));
cluster->priv->group.length++;
cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
if (rc != CS_OK) {
crm_err("Could not connect to the CPG API: %s (%d)",
cs_strerror(rc), rc);
goto bail;
}
rc = cpg_fd_get(handle, &fd);
if (rc != CS_OK) {
crm_err("Could not obtain the CPG API connection: %s (%d)",
cs_strerror(rc), rc);
goto bail;
}
/* CPG provider run as root (in given user namespace, anyway)? */
if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
&found_uid, &found_gid))) {
crm_err("CPG provider is not authentic:"
" process %lld (uid: %lld, gid: %lld)",
(long long) PCMK__SPECIAL_PID_AS_0(found_pid),
(long long) found_uid, (long long) found_gid);
rc = CS_ERR_ACCESS;
goto bail;
} else if (rv < 0) {
crm_err("Could not verify authenticity of CPG provider: %s (%d)",
strerror(-rv), -rv);
rc = CS_ERR_ACCESS;
goto bail;
}
id = pcmk__cpg_local_nodeid(handle);
if (id == 0) {
crm_err("Could not get local node id from the CPG API");
goto bail;
}
cluster->priv->node_id = id;
retries = 0;
cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->priv->group));
if (rc != CS_OK) {
crm_err("Could not join the CPG group '%s': %d", message_name, rc);
goto bail;
}
pcmk_cpg_handle = handle;
cluster->priv->cpg_handle = handle;
mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
bail:
if (rc != CS_OK) {
cpg_finalize(handle);
// @TODO Map rc to more specific Pacemaker return code
return ENOTCONN;
}
peer = pcmk__get_node(id, NULL, NULL, pcmk__node_search_cluster_member);
crm_update_peer_proc(__func__, peer, crm_proc_cpg, PCMK_VALUE_ONLINE);
return pcmk_rc_ok;
}
/*!
* \internal
* \brief Disconnect from Corosync CPG
*
* \param[in,out] cluster Cluster object to disconnect
*/
void
pcmk__cpg_disconnect(pcmk_cluster_t *cluster)
{
pcmk_cpg_handle = 0;
if (cluster->priv->cpg_handle != 0) {
crm_trace("Disconnecting CPG");
cpg_leave(cluster->priv->cpg_handle, &cluster->priv->group);
cpg_finalize(cluster->priv->cpg_handle);
cluster->priv->cpg_handle = 0;
} else {
crm_info("No CPG connection");
}
}
/*!
* \internal
* \brief Send string data via Corosync CPG
*
* \param[in] data Data to send
* \param[in] node Cluster node to send message to
* \param[in] dest Type of message to send
*
* \return \c true on success, or \c false otherwise
*/
static bool
send_cpg_text(const char *data, const pcmk__node_status_t *node,
enum pcmk_ipc_server dest)
{
static int msg_id = 0;
static int local_pid = 0;
static int local_name_len = 0;
static const char *local_name = NULL;
char *target = NULL;
struct iovec *iov;
pcmk__cpg_msg_t *msg = NULL;
if (local_name == NULL) {
local_name = pcmk__cluster_local_node_name();
}
if ((local_name_len == 0) && (local_name != NULL)) {
local_name_len = strlen(local_name);
}
if (data == NULL) {
data = "";
}
if (local_pid == 0) {
local_pid = getpid();
}
msg = pcmk__assert_alloc(1, sizeof(pcmk__cpg_msg_t));
msg_id++;
msg->id = msg_id;
msg->header.error = CS_OK;
msg->host.type = dest;
if (node != NULL) {
if (node->name != NULL) {
target = pcmk__str_copy(node->name);
msg->host.size = strlen(node->name);
memset(msg->host.uname, 0, MAX_NAME);
memcpy(msg->host.uname, node->name, msg->host.size);
} else {
target = crm_strdup_printf("%" PRIu32, node->cluster_layer_id);
}
msg->host.id = node->cluster_layer_id;
} else {
target = pcmk__str_copy("all");
}
msg->sender.id = 0;
msg->sender.type = pcmk__cluster_parse_msg_type(crm_system_name);
msg->sender.pid = local_pid;
msg->sender.size = local_name_len;
memset(msg->sender.uname, 0, MAX_NAME);
if ((local_name != NULL) && (msg->sender.size != 0)) {
memcpy(msg->sender.uname, local_name, msg->sender.size);
}
msg->size = 1 + strlen(data);
msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
if (msg->size < CRM_BZ2_THRESHOLD) {
msg = pcmk__realloc(msg, msg->header.size);
memcpy(msg->data, data, msg->size);
} else {
char *compressed = NULL;
unsigned int new_size = 0;
if (pcmk__compress(data, (unsigned int) msg->size, 0, &compressed,
&new_size) == pcmk_rc_ok) {
msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
msg = pcmk__realloc(msg, msg->header.size);
memcpy(msg->data, compressed, new_size);
msg->is_compressed = TRUE;
msg->compressed_size = new_size;
} else {
// cppcheck seems not to understand the abort logic in pcmk__realloc
// cppcheck-suppress memleak
msg = pcmk__realloc(msg, msg->header.size);
memcpy(msg->data, data, msg->size);
}
free(compressed);
}
iov = pcmk__assert_alloc(1, sizeof(struct iovec));
iov->iov_base = msg;
iov->iov_len = msg->header.size;
if (msg->compressed_size > 0) {
crm_trace("Queueing CPG message %u to %s "
"(%llu bytes, %d bytes compressed payload): %.200s",
msg->id, target, (unsigned long long) iov->iov_len,
msg->compressed_size, data);
} else {
crm_trace("Queueing CPG message %u to %s "
"(%llu bytes, %d bytes payload): %.200s",
msg->id, target, (unsigned long long) iov->iov_len,
msg->size, data);
}
free(target);
cs_message_queue = g_list_append(cs_message_queue, iov);
crm_cs_flush(&pcmk_cpg_handle);
return true;
}
/*!
* \internal
* \brief Send an XML message via Corosync CPG
*
* \param[in] msg XML message to send
* \param[in] node Cluster node to send message to
* \param[in] dest Type of message to send
*
* \return TRUE on success, otherwise FALSE
*/
bool
pcmk__cpg_send_xml(const xmlNode *msg, const pcmk__node_status_t *node,
enum pcmk_ipc_server dest)
{
bool rc = true;
GString *data = g_string_sized_new(1024);
pcmk__xml_string(msg, 0, data, 0);
rc = send_cpg_text(data->str, node, dest);
g_string_free(data, TRUE);
return rc;
}
diff --git a/lib/common/servers.c b/lib/common/servers.c
index f773a3f659..2e64c21841 100644
--- a/lib/common/servers.c
+++ b/lib/common/servers.c
@@ -1,127 +1,143 @@
/*
* Copyright 2024 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU Lesser General Public License
* version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#include <stdio.h> // NULL
#include <crm/crm.h>
/* Each Pacemaker subdaemon offers an IPC interface, and most exchange cluster
* messages as well. Particular names need to be used for logging, connecting
* IPC, and IPC/cluster message types.
*
* This array is indexed by enum pcmk_ipc_server and gathers all those names for
* easier mapping. Most members are lists with the first value listed being the
* "main" one returned if another value is mapped to it.
*
* @COMPAT Ideally, we'd use a single string (such as the server's
* crm_system_name) as the sole IPC name and sole message type for each server,
* making most of this unnecessary. However, backward compatiblity with older
* nodes involved in a rolling upgrade or Pacemaker Remote connection would
* be a nightmare: we'd have to add duplicate message attributes, struct
* members, and libqb IPC server endpoints for both the old and new names, and
* could drop the old names only after we no longer supported connections with
* older nodes.
*/
static struct {
const char *log_name; // Readable server name for use in logs
const char *system_names[2]; // crm_system_name values (subdaemon names)
const char *ipc_names[3]; // libqb IPC names used to contact server
const char *message_types[3]; // IPC/cluster message types sent to server
} server_info[] = {
[pcmk_ipc_unknown] = {
NULL,
{ NULL, NULL, },
{ NULL, NULL, NULL, },
{ NULL, NULL, NULL, },
},
[pcmk_ipc_attrd] = {
"attribute manager",
{ "pacemaker-attrd", NULL, },
{ PCMK__VALUE_ATTRD, NULL, NULL, },
{ PCMK__VALUE_ATTRD, NULL, NULL, },
},
[pcmk_ipc_based] = {
"CIB manager",
{ "pacemaker-based", NULL, },
{ PCMK__SERVER_BASED_RW, PCMK__SERVER_BASED_RO,
PCMK__SERVER_BASED_SHM, },
{ CRM_SYSTEM_CIB, NULL, NULL, },
},
[pcmk_ipc_controld] = {
"controller",
{ "pacemaker-controld", NULL, },
{ PCMK__VALUE_CRMD, NULL, NULL, },
{ PCMK__VALUE_CRMD, CRM_SYSTEM_DC, CRM_SYSTEM_TENGINE, },
},
[pcmk_ipc_execd] = {
"executor",
{ "pacemaker-execd", "pacemaker-remoted", },
{ PCMK__VALUE_LRMD, NULL, NULL, },
{ PCMK__VALUE_LRMD, NULL, NULL, },
},
[pcmk_ipc_fenced] = {
"fencer",
{ "pacemaker-fenced", NULL, },
{ PCMK__VALUE_STONITH_NG, NULL, NULL, },
{ PCMK__VALUE_STONITH_NG, NULL, NULL, },
},
[pcmk_ipc_pacemakerd] = {
"launcher",
{ "pacemakerd", NULL, },
{ CRM_SYSTEM_MCP, NULL, NULL, },
{ CRM_SYSTEM_MCP, NULL, NULL, },
},
[pcmk_ipc_schedulerd] = {
"scheduler",
{ "pacemaker-schedulerd", NULL, },
{ CRM_SYSTEM_PENGINE, NULL, NULL, },
{ CRM_SYSTEM_PENGINE, NULL, NULL, },
},
};
/*!
* \internal
* \brief Return a readable description of server for logging
*
* \param[in] server Server to get log name for
*
* \return Log name for server (or NULL if invalid)
*/
const char *
pcmk__server_log_name(enum pcmk_ipc_server server)
{
CRM_CHECK((server > 0) && (server < PCMK__NELEM(server_info)),
return NULL);
return server_info[server].log_name;
}
/*!
* \internal
* \brief Return the (primary) IPC endpoint name for a server
*
* \param[in] server Server to get IPC endpoint for
*
* \return IPC endpoint for server (or NULL if invalid)
*/
const char *
pcmk__server_ipc_name(enum pcmk_ipc_server server)
{
CRM_CHECK((server > 0) && (server < PCMK__NELEM(server_info)),
return NULL);
return server_info[server].ipc_names[0];
}
+
+/*!
+ * \internal
+ * \brief Return the (primary) message type for a server
+ *
+ * \param[in] server Server to get message type for
+ *
+ * \return Message type for server (or NULL if invalid)
+ */
+const char *
+pcmk__server_message_type(enum pcmk_ipc_server server)
+{
+ CRM_CHECK((server > 0) && (server < PCMK__NELEM(server_info)),
+ return NULL);
+ return server_info[server].message_types[0];
+}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Mon, Apr 21, 2:40 PM (1 d, 14 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1664929
Default Alt Text
(39 KB)
Attached To
Mode
rP Pacemaker
Attached
Detach File
Event Timeline
Log In to Comment