Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/daemons/attrd/attrd_corosync.c b/daemons/attrd/attrd_corosync.c
index ef205e6e13..086ec03d95 100644
--- a/daemons/attrd/attrd_corosync.c
+++ b/daemons/attrd/attrd_corosync.c
@@ -1,620 +1,620 @@
/*
* Copyright 2013-2023 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU General Public License version 2
* or later (GPLv2+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <crm/cluster.h>
#include <crm/cluster/internal.h>
#include <crm/common/logging.h>
#include <crm/common/results.h>
#include <crm/common/strings_internal.h>
#include <crm/msg_xml.h>
#include "pacemaker-attrd.h"
/* Daemon-wide exit status, declared extern here (its definition is not in this
* file). attrd_cpg_destroy() sets it to CRM_EX_DISCONNECT when the cluster
* connection is lost while not shutting down.
*/
extern crm_exit_t attrd_exit_status;
/*!
 * \internal
 * \brief Build a confirmation message for a given call ID
 *
 * \param[in] callid  Call ID of the message being confirmed
 *
 * \return Newly created XML message (attrd_peer_message() frees it with
 *         free_xml() after sending)
 */
static xmlNode *
attrd_confirmation(int callid)
{
    xmlNode *confirm = create_xml_node(NULL, __func__);

    crm_xml_add(confirm, F_TYPE, T_ATTRD);
    crm_xml_add(confirm, F_ORIG, get_local_node_name());
    crm_xml_add(confirm, PCMK__XA_TASK, PCMK__ATTRD_CMD_CONFIRM);
    crm_xml_add_int(confirm, XML_LRM_ATTR_CALLID, callid);

    return confirm;
}
/*!
 * \internal
 * \brief Process one XML message received from a cluster peer
 *
 * Messages carrying F_CRM_TASK are passed to the election handler. Any other
 * message is dropped while shutting down; otherwise it is dispatched via
 * attrd_handle_request(), and a confirmation is sent back if the sender asked
 * for one.
 *
 * \param[in,out] peer  Peer that sent the message
 * \param[in,out] xml   Message XML
 */
static void
attrd_peer_message(crm_node_t *peer, xmlNode *xml)
{
    if (crm_element_value(xml, F_CRM_TASK) != NULL) {
        attrd_handle_election_op(peer, xml);
        return;
    }

    /* While shutting down we keep answering election ops (handled above)
     * because our vote may still be needed as a cluster member. Every other
     * kind of message is ignored.
     */
    if (attrd_shutting_down()) {
        return;
    }

    pcmk__request_t request = {
        .ipc_client     = NULL,
        .ipc_id         = 0,
        .ipc_flags      = 0,
        .peer           = peer->uname,
        .xml            = xml,
        .call_options   = 0,
        .result         = PCMK__UNKNOWN_RESULT,
    };

    request.op = crm_element_value_copy(request.xml, PCMK__XA_TASK);
    CRM_CHECK(request.op != NULL, return);

    attrd_handle_request(&request);

    /* The request has been handled; if the originator wants a confirmation
     * (and this message is not itself a confirmation), reply now.
     */
    if (pcmk__xe_attr_is_true(xml, PCMK__XA_CONFIRM)
        && !pcmk__str_eq(request.op, PCMK__ATTRD_CMD_CONFIRM, pcmk__str_none)) {

        int callid = 0;
        xmlNode *reply = NULL;

        // Echo the call ID so the originator can tell what is being confirmed
        crm_element_value_int(xml, XML_LRM_ATTR_CALLID, &callid);
        reply = attrd_confirmation(callid);

        /* The reply is delivered to this same function on the originating
         * peer, which then has to deal with a PCMK__XA_CONFIRM type message.
         */
        crm_debug("Sending %s a confirmation", peer->uname);
        attrd_send_message(peer, reply, false);
        free_xml(reply);
    }

    pcmk__reset_request(&request);
}
/*!
 * \internal
 * \brief CPG delivery callback: decode a message and hand it to attrd
 *
 * \param[in] handle     CPG handle the message arrived on
 * \param[in] groupName  CPG group name (unused here)
 * \param[in] nodeid     ID of the sending node
 * \param[in] pid        Process ID of the sender
 * \param[in] msg        Raw message
 * \param[in] msg_len    Raw message length (unused here)
 */
static void
attrd_cpg_dispatch(cpg_handle_t handle,
                   const struct cpg_name *groupName,
                   uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
{
    uint32_t msg_class = 0;
    const char *sender = NULL;
    xmlNode *parsed = NULL;
    char *payload = pcmk_message_common_cs(handle, nodeid, pid, msg,
                                           &msg_class, &sender);

    if (payload == NULL) {
        return;
    }

    // Only cluster-class messages are parsed as XML
    if (msg_class == crm_class_cluster) {
        parsed = string2xml(payload);
    }

    if (parsed != NULL) {
        crm_node_t *peer = crm_get_peer(nodeid, sender);

        attrd_peer_message(peer, parsed);
    } else {
        crm_err("Bad message of class %d received from %s[%u]: '%.120s'", msg_class, sender, nodeid, payload);
    }

    free_xml(parsed);
    free(payload);
}
/*!
 * \internal
 * \brief Callback run when the cluster-layer connection goes away
 *
 * An expected disconnection (during shutdown) is only logged. An unexpected
 * one records CRM_EX_DISCONNECT as the exit status and starts a shutdown.
 *
 * \param[in] unused  Ignored
 */
static void
attrd_cpg_destroy(gpointer unused)
{
    if (!attrd_shutting_down()) {
        crm_crit("Lost connection to cluster layer, shutting down");
        attrd_exit_status = CRM_EX_DISCONNECT;
        attrd_shutdown(0);
        return;
    }

    crm_info("Corosync disconnection complete");
}
/*!
 * \internal
 * \brief Broadcast the local node's own value for an attribute
 *
 * Used when a peer's sync response carries a value for the local node that
 * differs from what the local node holds: the local value is sent to all peers
 * as a sync response so that a node's values for itself take precedence and
 * every peer ends up consistent.
 *
 * \param[in] a  Attribute whose local value should be broadcast
 *
 * \return Local node's value entry for \p a, as found in a->values
 */
static attribute_value_t *
broadcast_local_value(const attribute_t *a)
{
    attribute_value_t *local = g_hash_table_lookup(a->values,
                                                   attrd_cluster->uname);
    xmlNode *msg = create_xml_node(NULL, __func__);

    crm_xml_add(msg, PCMK__XA_TASK, PCMK__ATTRD_CMD_SYNC_RESPONSE);
    attrd_add_value_xml(msg, a, local, false);

    // A NULL peer means the message goes to everyone
    attrd_send_message(NULL, msg, false);
    free_xml(msg);

    return local;
}
/*!
* \internal
* \brief Ensure a Pacemaker Remote node is in the correct peer cache
*
* Any entry for this name in the cluster node cache that has no UUID is
* reaped, then the function asserts that crm_remote_peer_get() returns an
* entry for the name.
*
* \param[in] node_name Name of Pacemaker Remote node to check
*/
static void
cache_remote_node(const char *node_name)
{
/* If we previously assumed this node was an unseen cluster node,
* remove its entry from the cluster peer cache.
*/
- crm_node_t *dup = pcmk__search_cluster_node_cache(0, node_name);
+ crm_node_t *dup = pcmk__search_cluster_node_cache(0, node_name, NULL);
if (dup && (dup->uuid == NULL)) {
reap_crm_member(0, node_name);
}
// Ensure node is in the remote peer cache
// (presumably crm_remote_peer_get() adds a missing entry -- confirm at its definition)
CRM_ASSERT(crm_remote_peer_get(node_name) != NULL);
}
/* Node state string for log messages: falls back to "in unknown state" when
* pcmk__s() treats the given state as unset.
*/
#define state_text(state) pcmk__s((state), "in unknown state")
/*!
 * \internal
 * \brief Find a host's entry in a table of values, creating it if absent
 *
 * If the XML marks the host as a Pacemaker Remote node, the host is also
 * put in the remote peer cache, whether or not an entry already existed.
 *
 * \param[in,out] values  Hash table of values, keyed by node name
 * \param[in]     host    Name of peer to look up
 * \param[in]     xml     XML describing the attribute
 *
 * \return Pointer to new or existing hash table entry
 */
static attribute_value_t *
attrd_lookup_or_create_value(GHashTable *values, const char *host,
                             const xmlNode *xml)
{
    int is_remote = 0;
    attribute_value_t *entry = g_hash_table_lookup(values, host);

    crm_element_value_int(xml, PCMK__XA_ATTR_IS_REMOTE, &is_remote);
    if (is_remote) {
        cache_remote_node(host);
    }

    if (entry != NULL) {
        return entry;
    }

    // No entry yet: allocate one; the table key is the entry's own name copy
    entry = calloc(1, sizeof(attribute_value_t));
    CRM_ASSERT(entry != NULL);

    pcmk__str_update(&entry->nodename, host);
    entry->is_remote = is_remote;
    g_hash_table_replace(values, entry->nodename, entry);

    return entry;
}
/*!
 * \internal
 * \brief Peer status callback registered with the cluster layer
 *
 * Logs the change, syncs attributes to newly joined cluster members when this
 * node has won the election, drops the values of lost nodes, and keeps the
 * election/protocol bookkeeping and remote peer cache up to date.
 *
 * \param[in]     kind  What changed about the peer
 * \param[in,out] peer  Peer that changed
 * \param[in]     data  For node state changes, the previous state string
 */
static void
attrd_peer_change_cb(enum crm_status_type kind, crm_node_t *peer, const void *data)
{
    const bool is_remote = pcmk_is_set(peer->flags, crm_remote_node);
    const char *node_type = is_remote? "Remote" : "Cluster";
    bool gone = false;

    switch (kind) {
        case crm_status_uname:
            crm_debug("%s node %s is now %s",
                      node_type, peer->uname, state_text(peer->state));
            break;

        case crm_status_processes:
            gone = !pcmk_is_set(peer->processes, crm_get_cluster_proc());
            crm_debug("Node %s is %s a peer",
                      peer->uname, (gone? "no longer" : "now"));
            break;

        case crm_status_nstate:
            crm_debug("%s node %s is now %s (was %s)",
                      node_type, peer->uname, state_text(peer->state),
                      state_text(data));

            if (!pcmk__str_eq(peer->state, CRM_NODE_MEMBER, pcmk__str_casei)) {
                // The node is lost, so forget every value it had
                attrd_peer_remove(peer->uname, false, "loss");
                gone = true;

            } else if (attrd_election_won() && !is_remote) {
                /* We are the writer: give the new member the full attribute
                 * list. Remote nodes are skipped because they do not run
                 * their own attrd.
                 */
                attrd_peer_sync(peer, NULL);
            }
            break;
    }

    if (gone && !is_remote) {
        // A departed cluster node must not count in an election in progress
        attrd_remove_voter(peer);
        attrd_remove_peer_protocol_ver(peer->uname);
        attrd_do_not_expect_from_peer(peer->uname);

    } else if (!gone && is_remote) {
        // A remote node that is up belongs in the remote node cache
        cache_remote_node(peer->uname);
    }
}
/*!
 * \internal
 * \brief React to learning a cluster node's ID from an attribute update
 *
 * Looks the peer up by the ID and name just learned and, if this node has won
 * the election, triggers a write of attributes.
 *
 * \param[in] v     Value entry whose nodeid was just set
 * \param[in] host  Name of the node the value belongs to
 */
static void
record_peer_nodeid(attribute_value_t *v, const char *host)
{
    crm_node_t *learned = crm_get_peer(v->nodeid, host);

    crm_trace("Learned %s has node id %s", learned->uname, learned->uuid);

    if (!attrd_election_won()) {
        return;
    }
    attrd_write_attributes(false, false);
}
/*!
 * \internal
 * \brief Apply a peer's update of one attribute to a single host's value
 *
 * \param[in,out] a               Attribute being updated
 * \param[in]     peer            Peer that sent the update
 * \param[in]     xml             Update XML
 * \param[in]     attr            Attribute name
 * \param[in]     value           New value (may be NULL, logged as "(unset)")
 * \param[in]     host            Name of the node whose value is updated
 * \param[in]     filter          If true, a differing value for the local node
 *                                is not applied; the local value is broadcast
 *                                to all peers instead
 * \param[in]     is_force_write  If 1 and the attribute has a dampening timer,
 *                                an unchanged value still sets the attribute's
 *                                force_write flag
 */
static void
update_attr_on_host(attribute_t *a, const crm_node_t *peer, const xmlNode *xml,
                    const char *attr, const char *value, const char *host,
                    bool filter, int is_force_write)
{
    attribute_value_t *v = NULL;

    v = attrd_lookup_or_create_value(a->values, host, xml);

    if (filter && !pcmk__str_eq(v->current, value, pcmk__str_casei)
        && pcmk__str_eq(host, attrd_cluster->uname, pcmk__str_casei)) {

        /* Either side of the comparison may be NULL here, so never hand the
         * raw pointers to a %s conversion.
         */
        crm_notice("%s[%s]: local value '%s' takes priority over '%s' from %s",
                   attr, host, pcmk__s(v->current, "(unset)"),
                   pcmk__s(value, "(unset)"), peer->uname);
        v = broadcast_local_value(a);

    } else if (!pcmk__str_eq(v->current, value, pcmk__str_casei)) {
        crm_notice("Setting %s[%s]%s%s: %s -> %s "
                   CRM_XS " from %s with %s write delay",
                   attr, host, a->set_type ? " in " : "",
                   pcmk__s(a->set_type, ""), pcmk__s(v->current, "(unset)"),
                   pcmk__s(value, "(unset)"), peer->uname,
                   (a->timeout_ms == 0)? "no" : pcmk__readable_interval(a->timeout_ms));
        pcmk__str_update(&v->current, value);
        a->changed = true;

        // Track whether the local node is asking to be shut down
        if (pcmk__str_eq(host, attrd_cluster->uname, pcmk__str_casei)
            && pcmk__str_eq(attr, XML_CIB_ATTR_SHUTDOWN, pcmk__str_none)) {

            if (!pcmk__str_eq(value, "0", pcmk__str_null_matches)) {
                attrd_set_requesting_shutdown();
            } else {
                attrd_clear_requesting_shutdown();
            }
        }

        // Write out new value or start dampening timer
        if (a->timeout_ms && a->timer) {
            crm_trace("Delayed write out (%dms) for %s", a->timeout_ms, attr);
            mainloop_timer_start(a->timer);
        } else {
            attrd_write_or_elect_attribute(a);
        }

    } else {
        if (is_force_write == 1 && a->timeout_ms && a->timer) {
            /* Save forced writing and set change flag. */
            /* The actual attribute is written by Writer after election. */
            crm_trace("Unchanged %s[%s] from %s is %s(Set the forced write flag)",
                      attr, host, peer->uname, pcmk__s(value, "(unset)"));
            a->force_write = TRUE;
        } else {
            crm_trace("Unchanged %s[%s] from %s is %s", attr, host,
                      peer->uname, pcmk__s(value, "(unset)"));
        }
    }

    /* Set the seen flag for attribute processing held only in the own node. */
    v->seen = TRUE;

    /* If this is a cluster node whose node ID we are learning, remember it */
    if ((v->nodeid == 0) && (v->is_remote == FALSE)
        && (crm_element_value_int(xml, PCMK__XA_ATTR_NODE_ID,
                                  (int*)&v->nodeid) == 0) && (v->nodeid > 0)) {
        record_peer_nodeid(v, host);
    }
}
/*!
 * \internal
 * \brief Apply a single attribute update received from a peer
 *
 * The update is applied to the host named in the XML or, when no host is
 * named, to every host that already has a value for the attribute. Updates of
 * the protocol attribute also feed the minimum protocol version tracking.
 *
 * \param[in]     peer    Peer that sent the update
 * \param[in,out] xml     Update XML (its node ID is removed when the update
 *                        targets all hosts)
 * \param[in]     filter  Passed through to update_attr_on_host()
 */
static void
attrd_peer_update_one(const crm_node_t *peer, xmlNode *xml, bool filter)
{
    const char *attr = crm_element_value(xml, PCMK__XA_ATTR_NAME);
    const char *value = crm_element_value(xml, PCMK__XA_ATTR_VALUE);
    const char *host = crm_element_value(xml, PCMK__XA_ATTR_NODE_NAME);
    int is_force_write = 0;
    attribute_t *a = NULL;

    if (attr == NULL) {
        crm_warn("Could not update attribute: peer did not specify name");
        return;
    }

    crm_element_value_int(xml, PCMK__XA_ATTR_FORCE, &is_force_write);

    a = attrd_populate_attribute(xml, attr);
    if (a == NULL) {
        return;
    }

    if (host != NULL) {
        // A specific host was named: update just that one
        update_attr_on_host(a, peer, xml, attr, value, host, filter, is_force_write);

    } else {
        // No host named: walk every host known for this attribute
        GHashTableIter vIter;
        const char *known_host = NULL;

        crm_debug("Setting %s for all hosts to %s", attr, value);
        xml_remove_prop(xml, PCMK__XA_ATTR_NODE_ID);

        g_hash_table_iter_init(&vIter, a->values);
        while (g_hash_table_iter_next(&vIter, (gpointer *) &known_host, NULL)) {
            update_attr_on_host(a, peer, xml, attr, value, known_host, filter, is_force_write);
        }
    }

    /* An attrd instance broadcasting its protocol version may have lowered
     * the cluster-wide minimum, so let the tracker re-check.
     */
    if (pcmk__str_eq(attr, CRM_ATTR_PROTOCOL, pcmk__str_none)) {
        attrd_update_minimum_protocol_ver(peer->uname, value);
    }
}
static void
broadcast_unseen_local_values(void)
{
GHashTableIter aIter;
GHashTableIter vIter;
attribute_t *a = NULL;
attribute_value_t *v = NULL;
xmlNode *sync = NULL;
g_hash_table_iter_init(&aIter, attributes);
while (g_hash_table_iter_next(&aIter, NULL, (gpointer *) & a)) {
g_hash_table_iter_init(&vIter, a->values);
while (g_hash_table_iter_next(&vIter, NULL, (gpointer *) & v)) {
if (!(v->seen) && pcmk__str_eq(v->nodename, attrd_cluster->uname,
pcmk__str_casei)) {
if (sync == NULL) {
sync = create_xml_node(NULL, __func__);
crm_xml_add(sync, PCMK__XA_TASK, PCMK__ATTRD_CMD_SYNC_RESPONSE);
}
attrd_add_value_xml(sync, a, v, a->timeout_ms && a->timer);
}
}
}
if (sync != NULL) {
crm_debug("Broadcasting local-only values");
attrd_send_message(NULL, sync, false);
free_xml(sync);
}
}
/*!
 * \internal
 * \brief Create the cluster object, register callbacks, and connect
 *
 * \return pcmk_ok on success, or -ENOTCONN if the connection fails
 */
int
attrd_cluster_connect(void)
{
    attrd_cluster = pcmk_cluster_new();

    // Callbacks must be in place before connecting
    attrd_cluster->destroy = attrd_cpg_destroy;
    attrd_cluster->cpg.cpg_deliver_fn = attrd_cpg_dispatch;
    attrd_cluster->cpg.cpg_confchg_fn = pcmk_cpg_membership;
    crm_set_status_callback(&attrd_peer_change_cb);

    if (crm_cluster_connect(attrd_cluster) != FALSE) {
        return pcmk_ok;
    }

    crm_err("Cluster connection failed");
    return -ENOTCONN;
}
/*!
 * \internal
 * \brief Handle a peer's request to clear failure attributes
 *
 * The request is rewritten in place into an update with no value, then applied
 * once for every known attribute whose name matches the failure regex built
 * from the requested resource, operation, and interval.
 *
 * \param[in,out] request  Request to process (request->xml is modified)
 */
void
attrd_peer_clear_failure(pcmk__request_t *request)
{
    xmlNode *xml = request->xml;
    const char *rsc = crm_element_value(xml, PCMK__XA_ATTR_RESOURCE);
    const char *host = crm_element_value(xml, PCMK__XA_ATTR_NODE_NAME);
    const char *op = crm_element_value(xml, PCMK__XA_ATTR_OPERATION);
    const char *interval_spec = crm_element_value(xml, PCMK__XA_ATTR_INTERVAL);
    guint interval_ms = crm_parse_interval_spec(interval_spec);
    crm_node_t *peer = crm_get_peer(0, request->peer);
    char *attr_name = NULL;
    GHashTableIter iter;
    regex_t regex;

    if (attrd_failure_regex(&regex, rsc, op, interval_ms) != pcmk_ok) {
        crm_info("Ignoring invalid request to clear failures for %s",
                 pcmk__s(rsc, "all resources"));
        return;
    }

    // From here on the message is treated as an ordinary update
    crm_xml_add(xml, PCMK__XA_TASK, PCMK__ATTRD_CMD_UPDATE);

    // An update without a value deletes, so strip any value that was given
    if (crm_element_value(xml, PCMK__XA_ATTR_VALUE)) {
        crm_xml_replace(xml, PCMK__XA_ATTR_VALUE, NULL);
    }

    g_hash_table_iter_init(&iter, attributes);
    while (g_hash_table_iter_next(&iter, (gpointer *) &attr_name, NULL)) {
        if (regexec(&regex, attr_name, 0, NULL, 0) != 0) {
            continue;
        }
        crm_trace("Matched %s when clearing %s",
                  attr_name, pcmk__s(rsc, "all resources"));
        crm_xml_add(xml, PCMK__XA_ATTR_NAME, attr_name);
        attrd_peer_update(peer, xml, host, false);
    }

    regfree(&regex);
}
/*!
 * \internal
 * \brief Load attributes from a peer sync response
 *
 * \param[in]     peer      Peer that sent the sync response
 * \param[in]     peer_won  Whether peer is the attribute writer
 * \param[in,out] xml       Sync response XML, one child per attribute update
 */
void
attrd_peer_sync_response(const crm_node_t *peer, bool peer_won, xmlNode *xml)
{
    xmlNode *child = NULL;

    crm_info("Processing " PCMK__ATTRD_CMD_SYNC_RESPONSE " from %s",
             peer->uname);

    /* When the response comes from the writer, start with every "seen" flag
     * cleared; whatever is still unseen afterwards is something the local
     * node has but the writer does not.
     */
    if (peer_won) {
        attrd_clear_value_seen();
    }

    // Apply each attribute update contained in the response, with filtering
    child = pcmk__xml_first_child(xml);
    while (child != NULL) {
        const char *host = crm_element_value(child, PCMK__XA_ATTR_NODE_NAME);

        attrd_peer_update(peer, child, host, true);
        child = pcmk__xml_next(child);
    }

    // Tell all peers about values the writer turned out not to know
    if (peer_won) {
        broadcast_unseen_local_values();
    }
}
/*!
 * \internal
 * \brief Remove all attributes and optionally peer cache entries for a node
 *
 * \param[in] host     Name of node to purge (nothing is done if NULL)
 * \param[in] uncache  If true, remove node from peer caches
 * \param[in] source   Who requested removal (only used for logging)
 */
void
attrd_peer_remove(const char *host, bool uncache, const char *source)
{
    GHashTableIter attr_iter;
    attribute_t *a = NULL;

    CRM_CHECK(host != NULL, return);
    crm_notice("Removing all %s attributes for peer %s", host, source);

    // Drop the host's value from every attribute that has one
    g_hash_table_iter_init(&attr_iter, attributes);
    while (g_hash_table_iter_next(&attr_iter, NULL, (gpointer *) &a)) {
        if (!g_hash_table_remove(a->values, host)) {
            continue;
        }
        crm_debug("Removed %s[%s] for peer %s", a->id, host, source);
    }

    if (!uncache) {
        return;
    }
    crm_remote_peer_cache_remove(host);
    reap_crm_member(0, host);
}
/*!
 * \internal
 * \brief Send every known attribute value as one sync response
 *
 * \param[in] peer  Peer to send to, or NULL to send to everyone
 * \param[in] xml   Not used by this function
 */
void
attrd_peer_sync(crm_node_t *peer, xmlNode *xml)
{
    const char *target = peer? peer->uname : "everyone";
    GHashTableIter attr_iter;
    GHashTableIter value_iter;
    attribute_t *a = NULL;
    attribute_value_t *v = NULL;
    xmlNode *msg = create_xml_node(NULL, __func__);

    crm_xml_add(msg, PCMK__XA_TASK, PCMK__ATTRD_CMD_SYNC_RESPONSE);

    // Add every value of every attribute to the message
    g_hash_table_iter_init(&attr_iter, attributes);
    while (g_hash_table_iter_next(&attr_iter, NULL, (gpointer *) &a)) {

        g_hash_table_iter_init(&value_iter, a->values);
        while (g_hash_table_iter_next(&value_iter, NULL, (gpointer *) &v)) {
            crm_debug("Syncing %s[%s] = %s to %s", a->id, v->nodename, v->current, target);
            attrd_add_value_xml(msg, a, v, false);
        }
    }

    crm_debug("Syncing values to %s", target);
    attrd_send_message(peer, msg, false);
    free_xml(msg);
}
/*!
 * \internal
 * \brief Apply an update message from a peer, single or multi-operation
 *
 * If the message has children, each XML_ATTR_OP child first receives a copy
 * of the parent's attributes and is then applied on its own. Otherwise the
 * message itself is applied. Clients waiting on the local sync point are
 * acknowledged afterwards if any applied update asked for a sync point.
 *
 * \param[in]     peer    Peer that sent the update
 * \param[in,out] xml     Update XML
 * \param[in]     host    Not used by this function
 * \param[in]     filter  Passed through to attrd_peer_update_one()
 */
void
attrd_peer_update(const crm_node_t *peer, xmlNode *xml, const char *host,
                  bool filter)
{
    bool handle_sync_point = false;

    if (!xml_has_children(xml)) {
        attrd_peer_update_one(peer, xml, filter);

        if (attrd_request_has_sync_point(xml)) {
            handle_sync_point = true;
        }

    } else {
        xmlNode *child = first_named_child(xml, XML_ATTR_OP);

        while (child != NULL) {
            attrd_copy_xml_attributes(xml, child);
            attrd_peer_update_one(peer, child, filter);

            if (attrd_request_has_sync_point(child)) {
                handle_sync_point = true;
            }
            child = crm_next_same_xml(child);
        }
    }

    // The update requested a sync point, so release local-sync-point waiters
    if (handle_sync_point) {
        crm_trace("Hit local sync point for attribute update");
        attrd_ack_waitlist_clients(attrd_sync_point_local, xml);
    }
}
diff --git a/daemons/attrd/attrd_ipc.c b/daemons/attrd/attrd_ipc.c
index 9d3dfff23a..8b7040b57f 100644
--- a/daemons/attrd/attrd_ipc.c
+++ b/daemons/attrd/attrd_ipc.c
@@ -1,628 +1,629 @@
/*
* Copyright 2004-2023 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU General Public License version 2
* or later (GPLv2+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/types.h>
#include <crm/cluster.h>
#include <crm/cluster/internal.h>
#include <crm/msg_xml.h>
#include <crm/common/acl_internal.h>
#include <crm/common/ipc_internal.h>
#include <crm/common/logging.h>
#include <crm/common/results.h>
#include <crm/common/strings_internal.h>
#include <crm/common/util.h>
#include "pacemaker-attrd.h"
/* IPC server handle for this daemon: filled in by attrd_init_ipc() and
* destroyed (then reset to NULL) by attrd_ipc_fini().
*/
static qb_ipcs_service_t *ipcs = NULL;
/*!
 * \internal
 * \brief Build the XML reply to a client query
 *
 * \param[in] attr  Name of requested attribute
 * \param[in] host  Name of requested host (or NULL for all hosts);
 *                  "localhost" is mapped to the local node's name
 *
 * \return New XML reply, or NULL if XML could not be created
 * \note Caller is responsible for freeing the resulting XML
 */
static xmlNode *build_query_reply(const char *attr, const char *host)
{
    xmlNode *reply = create_xml_node(NULL, __func__);
    xmlNode *host_value = NULL;
    attribute_t *a = NULL;
    attribute_value_t *v = NULL;
    GHashTableIter iter;

    if (reply == NULL) {
        return NULL;
    }
    crm_xml_add(reply, F_TYPE, T_ATTRD);
    crm_xml_add(reply, F_SUBTYPE, PCMK__ATTRD_CMD_QUERY);
    crm_xml_add(reply, PCMK__XA_ATTR_VERSION, ATTRD_PROTOCOL_VERSION);

    // An unknown attribute yields a reply with no name and no values
    a = g_hash_table_lookup(attributes, attr);
    if (a == NULL) {
        return reply;
    }
    crm_xml_add(reply, PCMK__XA_ATTR_NAME, attr);

    // Callers may say "localhost" to mean the local node
    if (pcmk__str_eq(host, "localhost", pcmk__str_casei)) {
        host = attrd_cluster->uname;
        crm_trace("Mapped localhost to %s", host);
    }

    if (host != NULL) {
        // One specific node was requested; it is listed even without a value
        v = g_hash_table_lookup(a->values, host);
        host_value = create_xml_node(reply, XML_CIB_TAG_NODE);
        if (host_value == NULL) {
            free_xml(reply);
            return NULL;
        }
        pcmk__xe_add_node(host_value, host, 0);
        crm_xml_add(host_value, PCMK__XA_ATTR_VALUE,
                    (v? v->current : NULL));
        return reply;
    }

    // No node requested: list the value of every node that has one
    g_hash_table_iter_init(&iter, a->values);
    while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &v)) {
        host_value = create_xml_node(reply, XML_CIB_TAG_NODE);
        if (host_value == NULL) {
            free_xml(reply);
            return NULL;
        }
        pcmk__xe_add_node(host_value, v->nodename, 0);
        crm_xml_add(host_value, PCMK__XA_ATTR_VALUE, v->current);
    }
    return reply;
}
/*!
 * \internal
 * \brief Handle a client request to clear failure attributes
 *
 * With a minimum protocol version of at least 2, the request is broadcast
 * unchanged to all peers. Otherwise it is rewritten in place into a
 * regex-based update and handed to attrd_client_update().
 *
 * \param[in,out] request  Client request (request->xml may be modified)
 *
 * \return NULL when the request was broadcast, otherwise whatever
 *         attrd_client_update() returns
 */
xmlNode *
attrd_client_clear_failure(pcmk__request_t *request)
{
    xmlNode *xml = request->xml;
    const char *rsc = NULL;
    const char *op = NULL;
    const char *interval_spec = NULL;

    if (minimum_protocol_version >= 2) {
        /* Every peer, this node included, receives the message in
         * attrd_peer_message().
         */
        attrd_send_message(NULL, xml, false);
        pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL);
        return NULL;
    }

    rsc = crm_element_value(xml, PCMK__XA_ATTR_RESOURCE);
    op = crm_element_value(xml, PCMK__XA_ATTR_OPERATION);
    interval_spec = crm_element_value(xml, PCMK__XA_ATTR_INTERVAL);

    // Turn the request into an update ...
    crm_xml_add(xml, PCMK__XA_TASK, PCMK__ATTRD_CMD_UPDATE);

    // ... whose pattern selects the failure attributes to clear
    if (rsc == NULL) {
        crm_xml_add(xml, PCMK__XA_ATTR_PATTERN, ATTRD_RE_CLEAR_ALL);

    } else {
        char *pattern = NULL;

        if (op != NULL) {
            guint interval_ms = crm_parse_interval_spec(interval_spec);

            pattern = crm_strdup_printf(ATTRD_RE_CLEAR_OP,
                                        rsc, op, interval_ms);
        } else {
            pattern = crm_strdup_printf(ATTRD_RE_CLEAR_ONE, rsc);
        }
        crm_xml_add(xml, PCMK__XA_ATTR_PATTERN, pattern);
        free(pattern);
    }

    // Neither a name nor a value may remain, so the regex deletes matches
    if (crm_element_value(xml, PCMK__XA_ATTR_NAME)) {
        crm_xml_replace(xml, PCMK__XA_ATTR_NAME, NULL);
    }
    if (crm_element_value(xml, PCMK__XA_ATTR_VALUE)) {
        crm_xml_replace(xml, PCMK__XA_ATTR_VALUE, NULL);
    }

    return attrd_client_update(request);
}
/*!
* \internal
* \brief Handle a client request to remove all values for a peer
*
* The peer is identified by the node name in the request or, if no name is
* given, by a positive node ID that is resolved to a name (from the cluster
* node cache, else by asking the cluster layer). The request is acknowledged
* immediately and, if a name is available, broadcast to all peers.
*
* \param[in,out] request Client request (a resolved name is added to its XML)
*
* \return NULL always
*/
xmlNode *
attrd_client_peer_remove(pcmk__request_t *request)
{
xmlNode *xml = request->xml;
// Host and ID are not used in combination, rather host has precedence
const char *host = crm_element_value(xml, PCMK__XA_ATTR_NODE_NAME);
// Owns the name returned by get_node_name(), if that path is taken
char *host_alloc = NULL;
attrd_send_ack(request->ipc_client, request->ipc_id, request->ipc_flags);
if (host == NULL) {
int nodeid = 0;
crm_element_value_int(xml, PCMK__XA_ATTR_NODE_ID, &nodeid);
if (nodeid > 0) {
- crm_node_t *node = pcmk__search_cluster_node_cache(nodeid, NULL);
+ crm_node_t *node = pcmk__search_cluster_node_cache(nodeid, NULL,
+ NULL);
if (node && node->uname) {
// Use cached name if available
host = node->uname;
} else {
/* Otherwise ask cluster layer. This must assign the function-scope
* host_alloc (not a shadowing local), or the free() below never
* releases the name.
*/
host_alloc = get_node_name(nodeid);
host = host_alloc;
}
pcmk__xe_add_node(xml, host, 0);
}
}
if (host) {
crm_info("Client %s is requesting all values for %s be removed",
pcmk__client_name(request->ipc_client), host);
attrd_send_message(NULL, xml, false); /* ends up at attrd_peer_message() */
} else {
crm_info("Ignoring request by client %s to remove all peer values without specifying peer",
pcmk__client_name(request->ipc_client));
}
// host may point into host_alloc, so release it only after the last use
free(host_alloc);
pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL);
return NULL;
}
/*!
 * \internal
 * \brief Handle a client query for an attribute's value(s)
 *
 * \param[in,out] request  Client request; its result is set here
 *
 * \return Reply XML on success, or NULL if the query names no attribute or
 *         the reply could not be built
 */
xmlNode *
attrd_client_query(pcmk__request_t *request)
{
    xmlNode *query = request->xml;
    xmlNode *reply = NULL;
    const char *attr = NULL;
    const char *host = NULL;

    crm_debug("Query arrived from %s", pcmk__client_name(request->ipc_client));

    // A query without an attribute name cannot be answered
    attr = crm_element_value(query, PCMK__XA_ATTR_NAME);
    if (attr == NULL) {
        pcmk__format_result(&request->result, CRM_EX_ERROR, PCMK_EXEC_ERROR,
                            "Ignoring malformed query from %s (no attribute name given)",
                            pcmk__client_name(request->ipc_client));
        return NULL;
    }

    host = crm_element_value(query, PCMK__XA_ATTR_NODE_NAME);
    reply = build_query_reply(attr, host);
    if (reply == NULL) {
        pcmk__format_result(&request->result, CRM_EX_ERROR, PCMK_EXEC_ERROR,
                            "Could not respond to query from %s: could not create XML reply",
                            pcmk__client_name(request->ipc_client));
        return NULL;
    }

    pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL);
    request->ipc_client->request_id = 0;
    return reply;
}
/*!
 * \internal
 * \brief Handle a client request to refresh (re-write) all attributes
 *
 * The client is acknowledged before the write is triggered.
 *
 * \param[in,out] request  Client request; its result is set to success
 *
 * \return NULL always
 */
xmlNode *
attrd_client_refresh(pcmk__request_t *request)
{
    crm_info("Updating all attributes");

    attrd_send_ack(request->ipc_client, request->ipc_id, request->ipc_flags);
    attrd_write_attributes(true, true);

    pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL);
    return NULL;
}
/*!
 * \internal
 * \brief Default an update's target to the local node when none is given
 *
 * \param[in,out] xml  Update XML; the local node's name and ID are added if
 *                     it has no node name
 */
static void
handle_missing_host(xmlNode *xml)
{
    if (crm_element_value(xml, PCMK__XA_ATTR_NODE_NAME) != NULL) {
        return;
    }

    crm_trace("Inferring host");
    pcmk__xe_add_node(xml, attrd_cluster->uname, attrd_cluster->nodeid);
}
/*!
 * \internal
 * \brief Expand a regex update into one child operation per matching attribute
 *
 * Does nothing when an attribute name is given. Otherwise every known
 * attribute whose name matches \p regex gets a new XML_ATTR_OP child under
 * \p xml, carrying the parent's attributes with the pattern removed and the
 * matching name added.
 *
 * \param[in,out] xml    Update XML to add children to
 * \param[in]     attr   Attribute name from the request (may be NULL)
 * \param[in]     value  Value from the request (only logged)
 * \param[in]     regex  Pattern from the request (may be NULL)
 *
 * \return pcmk_rc_ok if a name was given or at least one attribute matched,
 *         EINVAL if the pattern does not compile, pcmk_rc_op_unsatisfied if
 *         nothing matched, or pcmk_rc_bad_nvpair if neither name nor pattern
 *         was given
 */
static int
expand_regexes(xmlNode *xml, const char *attr, const char *value, const char *regex)
{
    bool matched = false;
    const char *name = NULL;
    GHashTableIter attr_iter;
    regex_t r_patt;

    if (attr != NULL) {
        return pcmk_rc_ok;
    }
    if (regex == NULL) {
        return pcmk_rc_bad_nvpair;
    }

    crm_debug("Setting %s to %s", regex, value);
    if (regcomp(&r_patt, regex, REG_EXTENDED|REG_NOSUB)) {
        return EINVAL;
    }

    g_hash_table_iter_init(&attr_iter, attributes);
    while (g_hash_table_iter_next(&attr_iter, (gpointer *) &name, NULL)) {
        xmlNode *child = NULL;

        if (regexec(&r_patt, name, 0, NULL, 0) != 0) {
            continue;
        }

        child = create_xml_node(xml, XML_ATTR_OP);
        crm_trace("Matched %s with %s", name, regex);
        matched = true;

        // The child is the parent's twin, with the name replacing the regex
        attrd_copy_xml_attributes(xml, child);
        crm_xml_replace(child, PCMK__XA_ATTR_PATTERN, NULL);
        crm_xml_add(child, PCMK__XA_ATTR_NAME, name);
    }

    regfree(&r_patt);

    /* A valid regex that matched nothing is not an error, but the caller
     * should stop doing regex-related processing.
     */
    return matched? pcmk_rc_ok : pcmk_rc_op_unsatisfied;
}
/*!
 * \internal
 * \brief Run regex expansion for a client update and record any failure
 *
 * \param[in,out] request  Client request; its result is set when the pattern
 *                         is invalid or when neither name nor pattern is given
 *
 * \return Return code of expand_regexes()
 */
static int
handle_regexes(pcmk__request_t *request)
{
    xmlNode *xml = request->xml;
    const char *attr = crm_element_value(xml, PCMK__XA_ATTR_NAME);
    const char *value = crm_element_value(xml, PCMK__XA_ATTR_VALUE);
    const char *regex = crm_element_value(xml, PCMK__XA_ATTR_PATTERN);
    const int rc = expand_regexes(xml, attr, value, regex);

    if (rc == EINVAL) {
        pcmk__format_result(&request->result, CRM_EX_ERROR, PCMK_EXEC_ERROR,
                            "Bad regex '%s' for update from client %s", regex,
                            pcmk__client_name(request->ipc_client));
        return rc;
    }

    if (rc == pcmk_rc_bad_nvpair) {
        crm_err("Update request did not specify attribute or regular expression");
        pcmk__format_result(&request->result, CRM_EX_ERROR, PCMK_EXEC_ERROR,
                            "Client %s update request did not specify attribute or regular expression",
                            pcmk__client_name(request->ipc_client));
    }
    return rc;
}
/*!
 * \internal
 * \brief Replace an expandable value in an update with its expanded integer
 *
 * \param[in,out] value  Value from the update; repointed at the XML's new
 *                       value if an expansion happened
 * \param[in,out] xml    Update XML; receives the expanded value
 * \param[in]     op     Requested operation
 * \param[in]     attr   Attribute name
 *
 * \return EINVAL if the attribute is unknown and the operation is
 *         PCMK__ATTRD_CMD_UPDATE_DELAY, otherwise pcmk_rc_ok
 */
static int
handle_value_expansion(const char **value, xmlNode *xml, const char *op,
                       const char *attr)
{
    attribute_t *a = g_hash_table_lookup(attributes, attr);
    attribute_value_t *v = NULL;
    int expanded = 0;

    if ((a == NULL)
        && pcmk__str_eq(op, PCMK__ATTRD_CMD_UPDATE_DELAY, pcmk__str_none)) {
        return EINVAL;
    }

    if ((*value == NULL) || !attrd_value_needs_expansion(*value)) {
        return pcmk_rc_ok;
    }

    // Expansion is relative to the host's current value, if there is one
    if (a != NULL) {
        const char *host = crm_element_value(xml, PCMK__XA_ATTR_NODE_NAME);

        v = g_hash_table_lookup(a->values, host);
    }

    expanded = attrd_expand_value(*value, (v? v->current : NULL));
    crm_info("Expanded %s=%s to %d", attr, *value, expanded);
    crm_xml_add_int(xml, PCMK__XA_ATTR_VALUE, expanded);

    // The old string was freed when the value was replaced, so fetch it anew
    *value = crm_element_value(xml, PCMK__XA_ATTR_VALUE);
    return pcmk_rc_ok;
}
/*!
 * \internal
 * \brief Broadcast an update to all peers, requesting confirmation if needed
 *
 * \param[in,out] request  Client request the update came from
 * \param[in]     xml      Update XML to broadcast
 */
static void
send_update_msg_to_cluster(pcmk__request_t *request, xmlNode *xml)
{
    /* A client waiting on the cluster-wide sync point gets its ACK only after
     * this attrd has broadcast the update and every peer has confirmed it.
     *
     * In all other cases no confirmation is needed: a client waiting on the
     * local sync point is ACKed in attrd_peer_update, and a client not
     * waiting at all was ACKed in handle_update_request immediately before
     * this function was called.
     */
    const bool wait_for_cluster = pcmk__str_eq(attrd_request_sync_point(xml),
                                               PCMK__VALUE_CLUSTER,
                                               pcmk__str_none);

    if (wait_for_cluster) {
        attrd_expect_confirmations(request, attrd_cluster_sync_point_update);
    }

    // Either way, the message ends up at attrd_peer_message()
    attrd_send_message(NULL, xml, wait_for_cluster);
}
/*!
 * \internal
 * \brief Per-child callback: process one child operation as its own update
 *
 * \param[in,out] child  Child operation XML
 * \param[in,out] data   The pcmk__request_t being processed; its xml member
 *                       is pointed at \p child
 *
 * \return pcmk_rc_ok always
 */
static int
send_child_update(xmlNode *child, void *data)
{
    pcmk__request_t *req = (pcmk__request_t *) data;

    /* attrd_client_update takes care of calling pcmk__set_result, so that is
     * not repeated here.
     */
    req->xml = child;
    attrd_client_update(req);

    return pcmk_rc_ok;
}
/*!
* \internal
* \brief Handle a client request to update one or more attributes
*
* A message with children is either broadcast as a whole (when all peers
* support multi-operation messages) or split into one recursive call per
* child. A message without children has any regex expanded into children
* (followed by a recursive call), gets a default host, has its value expanded,
* and is then broadcast.
*
* \param[in,out] request Client request (its XML is modified in place)
*
* \return NULL always
*/
xmlNode *
attrd_client_update(pcmk__request_t *request)
{
xmlNode *xml = request->xml;
const char *attr, *value, *regex;
/* If the message has children, that means it is a message from a newer
* client that supports sending multiple operations at a time. There are
* two ways we can handle that.
*/
if (xml_has_children(xml)) {
if (ATTRD_SUPPORTS_MULTI_MESSAGE(minimum_protocol_version)) {
/* First, if all peers support a certain protocol version, we can
* just broadcast the big message and they'll handle it. However,
* we also need to apply all the transformations in this function
* to the children since they don't happen anywhere else.
*/
for (xmlNode *child = first_named_child(xml, XML_ATTR_OP); child != NULL;
child = crm_next_same_xml(child)) {
attr = crm_element_value(child, PCMK__XA_ATTR_NAME);
value = crm_element_value(child, PCMK__XA_ATTR_VALUE);
handle_missing_host(child);
// A failing child aborts the whole message: nothing is broadcast
if (handle_value_expansion(&value, child, request->op, attr) == EINVAL) {
pcmk__format_result(&request->result, CRM_EX_NOSUCH, PCMK_EXEC_ERROR,
"Attribute %s does not exist", attr);
return NULL;
}
}
send_update_msg_to_cluster(request, xml);
pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL);
} else {
/* Save the original xml node pointer so it can be restored after iterating
* over all the children.
*/
xmlNode *orig_xml = request->xml;
/* Second, if they do not support that protocol version, split it
* up into individual messages and call attrd_client_update on
* each one.
*/
pcmk__xe_foreach_child(xml, XML_ATTR_OP, send_child_update, request);
request->xml = orig_xml;
}
return NULL;
}
attr = crm_element_value(xml, PCMK__XA_ATTR_NAME);
value = crm_element_value(xml, PCMK__XA_ATTR_VALUE);
regex = crm_element_value(xml, PCMK__XA_ATTR_PATTERN);
// NOTE(review): for a regex that matches nothing (pcmk_rc_op_unsatisfied),
// handle_regexes() sets no result -- confirm callers cope with that
if (handle_regexes(request) != pcmk_rc_ok) {
/* Error handling was already dealt with in handle_regexes, so just return. */
return NULL;
} else if (regex) {
/* Recursively call attrd_client_update on the new message with regexes
* expanded. If supported by the attribute daemon, this means that all
* matches can also be handled atomically.
*/
return attrd_client_update(request);
}
handle_missing_host(xml);
if (handle_value_expansion(&value, xml, request->op, attr) == EINVAL) {
pcmk__format_result(&request->result, CRM_EX_NOSUCH, PCMK_EXEC_ERROR,
"Attribute %s does not exist", attr);
return NULL;
}
crm_debug("Broadcasting %s[%s]=%s%s", attr, crm_element_value(xml, PCMK__XA_ATTR_NODE_NAME),
value, (attrd_election_won()? " (writer)" : ""));
send_update_msg_to_cluster(request, xml);
pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL);
return NULL;
}
/*!
 * \internal
 * \brief Accept a new client IPC connection
 *
 * \param[in,out] c    New connection
 * \param[in]     uid  Client user id
 * \param[in]     gid  Client group id
 *
 * \return pcmk_ok on success, -EPERM if the daemon is shutting down, or -EIO
 *         if the client could not be registered
 */
static int32_t
attrd_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid)
{
    crm_trace("New client connection %p", c);

    // New clients are turned away once shutdown has begun
    if (attrd_shutting_down()) {
        crm_info("Ignoring new connection from pid %d during shutdown",
                 pcmk__client_pid(c));
        return -EPERM;
    }

    return (pcmk__new_client(c, uid, gid) != NULL)? pcmk_ok : -EIO;
}
/*!
 * \internal
 * \brief Clean up after a client IPC connection is closed
 *
 * A known client is taken off the sync point waitlist, is no longer waited
 * on for peer confirmations, and is freed.
 *
 * \param[in] c  Connection that was closed
 *
 * \return FALSE (i.e. do not re-run this callback)
 */
static int32_t
attrd_ipc_closed(qb_ipcs_connection_t *c)
{
    pcmk__client_t *client = pcmk__find_client(c);

    if (client == NULL) {
        crm_trace("Ignoring request to clean up unknown connection %p", c);
        return FALSE;
    }

    crm_trace("Cleaning up closed client connection %p", c);

    // Nothing may keep referring to the client once it is freed
    attrd_remove_client_from_waitlist(client);
    attrd_do_not_wait_for_client(client);
    pcmk__free_client(client);

    return FALSE;
}
/*!
 * \internal
 * \brief Destroy a client IPC connection
 *
 * \param[in,out] c  Connection to destroy
 *
 * \note A destroyed connection gets the same treatment as a closed one; this
 *       separate handler exists only because the return type differs.
 */
static void
attrd_ipc_destroy(qb_ipcs_connection_t *c)
{
    crm_trace("Destroying client connection %p", c);
    (void) attrd_ipc_closed(c);
}
/*!
 * \internal
 * \brief Process a message received from an IPC client
 *
 * Parses the raw IPC data into XML, wraps it in a request object, and passes
 * it to attrd_handle_request(). Unparseable data is answered with a
 * CRM_EX_PROTOCOL acknowledgement.
 *
 * \param[in,out] c     Connection the data arrived on
 * \param[in]     data  Raw IPC data
 * \param[in]     size  Unused
 *
 * \return Always 0
 */
static int32_t
attrd_ipc_dispatch(qb_ipcs_connection_t * c, void *data, size_t size)
{
    uint32_t id = 0;
    uint32_t flags = 0;
    pcmk__client_t *client = pcmk__find_client(c);
    xmlNode *xml = NULL;

    // Sanity-check, and parse XML from IPC data
    CRM_CHECK((c != NULL) && (client != NULL), return 0);
    if (data == NULL) {
        crm_debug("No IPC data from PID %d", pcmk__client_pid(c));
        return 0;
    }

    xml = pcmk__client_data2xml(client, data, &id, &flags);

    if (xml == NULL) {
        crm_debug("Unrecognizable IPC data from PID %d", pcmk__client_pid(c));
        pcmk__ipc_send_ack(client, id, flags, "ack", NULL, CRM_EX_PROTOCOL);
        return 0;

    } else {
        pcmk__request_t request = {
            .ipc_client     = client,
            .ipc_id         = id,
            .ipc_flags      = flags,
            .peer           = NULL,
            .xml            = xml,
            .call_options   = 0,
            .result         = PCMK__UNKNOWN_RESULT,
        };

        CRM_ASSERT(client->user != NULL);
        pcmk__update_acl_user(xml, PCMK__XA_ATTR_USER, client->user);

        request.op = crm_element_value_copy(request.xml, PCMK__XA_TASK);

        // A message without a task cannot be handled; free the parsed XML
        // before bailing out so it is not leaked
        CRM_CHECK(request.op != NULL, free_xml(xml); return 0);

        attrd_handle_request(&request);
        pcmk__reset_request(&request);
    }

    free_xml(xml);
    return 0;
}
// IPC server callback table; passed to pcmk__serve_attrd_ipc() by attrd_init_ipc()
static struct qb_ipcs_service_handlers ipc_callbacks = {
.connection_accept = attrd_ipc_accept,
// No handler is registered for connection creation
.connection_created = NULL,
.msg_process = attrd_ipc_dispatch,
.connection_closed = attrd_ipc_closed,
.connection_destroyed = attrd_ipc_destroy
};
/*!
 * \internal
 * \brief Tear down the attrd IPC server, if one exists
 *
 * Drops all clients, destroys the server, and resets the handle so that a
 * repeated call does nothing.
 */
void
attrd_ipc_fini(void)
{
    if (ipcs == NULL) {
        return;
    }

    pcmk__drop_all_clients(ipcs);
    qb_ipcs_destroy(ipcs);
    ipcs = NULL;
}
/*!
 * \internal
 * \brief Set up attrd IPC communication
 *
 * Hands the file-level IPC server handle and the ipc_callbacks table to
 * pcmk__serve_attrd_ipc().
 */
void
attrd_init_ipc(void)
{
pcmk__serve_attrd_ipc(&ipcs, &ipc_callbacks);
}
diff --git a/daemons/based/based_messages.c b/daemons/based/based_messages.c
index a1258eda05..5e221039b9 100644
--- a/daemons/based/based_messages.c
+++ b/daemons/based/based_messages.c
@@ -1,418 +1,418 @@
/*
* Copyright 2004-2023 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU General Public License version 2
* or later (GPLv2+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <fcntl.h>
#include <time.h>
#include <sys/param.h>
#include <sys/types.h>
#include <crm/crm.h>
#include <crm/cib/internal.h>
#include <crm/msg_xml.h>
#include <crm/common/xml.h>
#include <crm/common/ipc_internal.h>
#include <crm/common/xml_internal.h>
#include <crm/cluster/internal.h>
#include <pacemaker-based.h>
/* Maximum number of diffs to ignore while waiting for a resync */
#define MAX_DIFF_RETRY 5
// Whether this instance is in read/write mode (toggled by cib_process_readwrite())
bool based_is_primary = false;
// CIB XML that this file digests and sends to peers (see cib_process_ping() and sync_our_cib())
xmlNode *the_cib = NULL;
/*!
 * \brief Process a shutdown request, or the acknowledgement of our own
 *
 * A message without F_CIB_ISREPLY is a peer announcing its shutdown and is
 * only logged. A reply is honored (by terminating) only if this daemon has
 * itself begun shutting down.
 *
 * \param[in]  req     Request XML (only F_ORIG and F_CIB_ISREPLY are read)
 * \param[out] answer  Always set to NULL
 *
 * \return pcmk_ok on success, or -EINVAL if a peer acknowledged a shutdown
 *         that we never requested
 *
 * \note The remaining parameters are unused.
 */
int
cib_process_shutdown_req(const char *op, int options, const char *section, xmlNode * req,
                         xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                         xmlNode ** answer)
{
    const char *peer_name = crm_element_value(req, F_ORIG);

    *answer = NULL;

    if (crm_element_value(req, F_CIB_ISREPLY) != NULL) {
        if (cib_shutdown_flag == FALSE) {
            crm_err("Peer %s mistakenly thinks we wanted to shut down",
                    peer_name);
            return -EINVAL;
        }

        crm_info("Peer %s has acknowledged our shutdown request", peer_name);
        terminate_cib(__func__, 0);
        return pcmk_ok;
    }

    crm_info("Peer %s is requesting to shut down", peer_name);
    return pcmk_ok;
}
// @COMPAT: Remove when PCMK__CIB_REQUEST_NOOP is removed
/*
 * Handle a no-op request: trace the operation name, clear *answer, and
 * report success. No other parameter is used.
 */
int
cib_process_noop(const char *op, int options, const char *section, xmlNode *req,
xmlNode *input, xmlNode *existing_cib, xmlNode **result_cib,
xmlNode **answer)
{
crm_trace("Processing \"%s\" event", op);
*answer = NULL;
return pcmk_ok;
}
/*!
 * \brief Query or change whether this instance is in read/write mode
 *
 * \param[in] op  PCMK__CIB_REQUEST_IS_PRIMARY to query the current mode,
 *                PCMK__CIB_REQUEST_PRIMARY to enter read/write mode; any
 *                other operation puts the instance in read-only mode
 *
 * \return For a query, pcmk_ok if in read/write mode and -EPERM otherwise;
 *         pcmk_ok for all other operations
 *
 * \note The remaining parameters are unused.
 */
int
cib_process_readwrite(const char *op, int options, const char *section, xmlNode * req,
                      xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                      xmlNode ** answer)
{
    crm_trace("Processing \"%s\" event", op);

    if (pcmk__str_eq(op, PCMK__CIB_REQUEST_IS_PRIMARY, pcmk__str_none)) {
        // Pure query: report the mode without touching it
        return based_is_primary? pcmk_ok : -EPERM;
    }

    if (pcmk__str_eq(op, PCMK__CIB_REQUEST_PRIMARY, pcmk__str_none)) {
        if (based_is_primary) {
            crm_debug("We are still in R/W mode");
        } else {
            crm_info("We are now in R/W mode");
            based_is_primary = true;
        }
        return pcmk_ok;
    }

    // Anything else demotes us (silently, if already read-only)
    if (based_is_primary) {
        crm_info("We are now in R/O mode");
        based_is_primary = false;
    }
    return pcmk_ok;
}
/* Set to 1 when a sync is requested, incremented when a diff is ignored,
 * reset to 0 when a sync is received (or once more than MAX_DIFF_RETRY
 * diffs have been ignored; see cib_server_process_diff())
 */
static int sync_in_progress = 0;
void
send_sync_request(const char *host)
{
xmlNode *sync_me = create_xml_node(NULL, "sync-me");
crm_info("Requesting re-sync from %s", (host? host : "all peers"));
sync_in_progress = 1;
crm_xml_add(sync_me, F_TYPE, "cib");
crm_xml_add(sync_me, F_CIB_OPERATION, PCMK__CIB_REQUEST_SYNC_TO_ONE);
crm_xml_add(sync_me, F_CIB_DELEGATED,
stand_alone? "localhost" : crm_cluster->uname);
send_cluster_message(host ? crm_get_peer(0, host) : NULL, crm_msg_cib, sync_me, FALSE);
free_xml(sync_me);
}
/*
 * Answer a ping request with a digest of our CIB.
 *
 * Builds *answer as an XML_CRM_TAG_PING element carrying CRM_FEATURE_SET, the
 * versioned digest of the_cib, and the requester's ping ID (F_CIB_PING_ID
 * copied from req). When tracing is enabled the full CIB is attached as call
 * data; otherwise only a childless element carrying the CIB root's properties.
 *
 * Always returns pcmk_ok. options, section, input and result_cib are unused.
 */
int
cib_process_ping(const char *op, int options, const char *section, xmlNode * req, xmlNode * input,
xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer)
{
const char *host = crm_element_value(req, F_ORIG);
const char *seq = crm_element_value(req, F_CIB_PING_ID);
// Allocated here; released with free() at the end of the function
char *digest = calculate_xml_versioned_digest(the_cib, FALSE, TRUE, CRM_FEATURE_SET);
crm_trace("Processing \"%s\" event %s from %s", op, seq, host);
*answer = create_xml_node(NULL, XML_CRM_TAG_PING);
crm_xml_add(*answer, XML_ATTR_CRM_VERSION, CRM_FEATURE_SET);
crm_xml_add(*answer, XML_ATTR_DIGEST, digest);
crm_xml_add(*answer, F_CIB_PING_ID, seq);
// First block is used when tracing is enabled, second block otherwise
pcmk__if_tracing(
{
// Append additional detail so the receiver can log the differences
add_message_xml(*answer, F_CIB_CALLDATA, the_cib);
},
{
// Always include at least the version details
const char *tag = TYPE(the_cib);
xmlNode *shallow = create_xml_node(NULL, tag);
copy_in_properties(shallow, the_cib);
add_message_xml(*answer, F_CIB_CALLDATA, shallow);
free_xml(shallow);
}
);
// NOTE(review): the digest comes from the_cib, but the logged version
// numbers are read from existing_cib -- presumably the same document; confirm
crm_info("Reporting our current digest to %s: %s for %s.%s.%s",
host, digest,
crm_element_value(existing_cib, XML_ATTR_GENERATION_ADMIN),
crm_element_value(existing_cib, XML_ATTR_GENERATION),
crm_element_value(existing_cib, XML_ATTR_NUMUPDATES));
free(digest);
return pcmk_ok;
}
/*
 * Handle a request to sync our CIB to all peers: calls sync_our_cib() with
 * all=TRUE and returns its result. Only req is used.
 */
int
cib_process_sync(const char *op, int options, const char *section, xmlNode * req, xmlNode * input,
xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer)
{
return sync_our_cib(req, TRUE);
}
/*
 * Handle a CIB schema upgrade request.
 *
 * If req carries F_CIB_SCHEMA_MAX, the upgrade is performed here via
 * cib_process_upgrade(). Otherwise a scratch copy of existing_cib is
 * test-upgraded: if that yields a newer schema version, an upgrade request
 * carrying F_CIB_SCHEMA_MAX is broadcast (or processed directly in legacy
 * mode when we are primary); if not, the originating peer is sent a reply
 * carrying the failure code in F_CIB_UPGRADE_RC.
 *
 * Returns the result of cib_process_upgrade() when it is called, pcmk_ok
 * when an upgrade was broadcast, -pcmk_err_schema_unchanged when validation
 * succeeded without producing a newer version, or the error from
 * update_validation(). *answer is set to NULL on entry.
 */
int
cib_process_upgrade_server(const char *op, int options, const char *section, xmlNode * req, xmlNode * input,
xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer)
{
int rc = pcmk_ok;
*answer = NULL;
if(crm_element_value(req, F_CIB_SCHEMA_MAX)) {
/* The originator of an upgrade request sends it to the DC, without
* F_CIB_SCHEMA_MAX. If an upgrade is needed, the DC re-broadcasts the
* request with F_CIB_SCHEMA_MAX, and each node performs the upgrade
* (and notifies its local clients) here.
*/
return cib_process_upgrade(
op, options, section, req, input, existing_cib, result_cib, answer);
} else {
int new_version = 0;
int current_version = 0;
// Work on a copy so the test upgrade does not touch existing_cib
xmlNode *scratch = copy_xml(existing_cib);
const char *host = crm_element_value(req, F_ORIG);
const char *value = crm_element_value(existing_cib, XML_ATTR_VALIDATION);
const char *client_id = crm_element_value(req, F_CIB_CLIENTID);
const char *call_opts = crm_element_value(req, F_CIB_CALLOPTS);
const char *call_id = crm_element_value(req, F_CIB_CALLID);
crm_trace("Processing \"%s\" event", op);
if (value != NULL) {
current_version = get_schema_version(value);
}
rc = update_validation(&scratch, &new_version, 0, TRUE, TRUE);
if (new_version > current_version) {
xmlNode *up = create_xml_node(NULL, __func__);
// A newer version was reached, so any rc from update_validation() is discarded
rc = pcmk_ok;
crm_notice("Upgrade request from %s verified", host);
crm_xml_add(up, F_TYPE, "cib");
crm_xml_add(up, F_CIB_OPERATION, PCMK__CIB_REQUEST_UPGRADE);
crm_xml_add(up, F_CIB_SCHEMA_MAX, get_schema_name(new_version));
crm_xml_add(up, F_CIB_DELEGATED, host);
crm_xml_add(up, F_CIB_CLIENTID, client_id);
crm_xml_add(up, F_CIB_CALLOPTS, call_opts);
crm_xml_add(up, F_CIB_CALLID, call_id);
if (cib_legacy_mode() && based_is_primary) {
rc = cib_process_upgrade(
op, options, section, up, input, existing_cib, result_cib, answer);
} else {
send_cluster_message(NULL, crm_msg_cib, up, FALSE);
}
free_xml(up);
} else if(rc == pcmk_ok) {
// Validation succeeded but produced no newer schema version
rc = -pcmk_err_schema_unchanged;
}
if (rc != pcmk_ok) {
// Notify originating peer so it can notify its local clients
- crm_node_t *origin = pcmk__search_cluster_node_cache(0, host);
+ crm_node_t *origin = pcmk__search_cluster_node_cache(0, host, NULL);
crm_info("Rejecting upgrade request from %s: %s "
CRM_XS " rc=%d peer=%s", host, pcmk_strerror(rc), rc,
(origin? origin->uname : "lost"));
// If the originator is not in the node cache, no reply is sent
if (origin) {
xmlNode *up = create_xml_node(NULL, __func__);
crm_xml_add(up, F_TYPE, "cib");
crm_xml_add(up, F_CIB_OPERATION, PCMK__CIB_REQUEST_UPGRADE);
crm_xml_add(up, F_CIB_DELEGATED, host);
crm_xml_add(up, F_CIB_ISREPLY, host);
crm_xml_add(up, F_CIB_CLIENTID, client_id);
crm_xml_add(up, F_CIB_CALLOPTS, call_opts);
crm_xml_add(up, F_CIB_CALLID, call_id);
crm_xml_add_int(up, F_CIB_UPGRADE_RC, rc);
if (send_cluster_message(origin, crm_msg_cib, up, TRUE)
== FALSE) {
crm_warn("Could not send CIB upgrade result to %s", host);
}
free_xml(up);
}
}
free_xml(scratch);
}
return rc;
}
/*
 * Handle a request to sync our CIB to a single peer: calls sync_our_cib()
 * with all=FALSE and returns its result. Only req is used.
 */
int
cib_process_sync_one(const char *op, int options, const char *section, xmlNode * req,
xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
xmlNode ** answer)
{
return sync_our_cib(req, FALSE);
}
/*!
 * \brief Apply a CIB diff received from the cluster, resyncing on failure
 *
 * While a full resync is pending, a non-primary instance skips incoming diffs
 * (up to MAX_DIFF_RETRY of them). Otherwise the diff is applied with
 * cib_process_diff(); if that fails, a non-primary instance discards the
 * result and requests a full resync where appropriate.
 *
 * \return pcmk_ok on success; -pcmk_err_diff_resync if the diff was skipped
 *         or a resync is needed; -pcmk_err_diff_failed if a primary instance
 *         could not apply the diff; otherwise the cib_process_diff() error
 */
int
cib_server_process_diff(const char *op, int options, const char *section, xmlNode * req,
                        xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                        xmlNode ** answer)
{
    int rc = pcmk_ok;

    if (sync_in_progress > MAX_DIFF_RETRY) {
        /* Don't ignore diffs forever; the last request may have been lost.
         * If the diff fails, we'll ask for another full resync.
         */
        sync_in_progress = 0;
    }

    // Only a non-primary instance may skip a diff while awaiting a resync
    if (!based_is_primary && (sync_in_progress != 0)) {
        int add_admin_epoch = 0;
        int add_epoch = 0;
        int add_updates = 0;
        int del_admin_epoch = 0;
        int del_epoch = 0;
        int del_updates = 0;

        cib_diff_version_details(input,
                                 &add_admin_epoch, &add_epoch, &add_updates,
                                 &del_admin_epoch, &del_epoch, &del_updates);

        sync_in_progress++;
        crm_notice("Not applying diff %d.%d.%d -> %d.%d.%d (sync in progress)",
                   del_admin_epoch, del_epoch, del_updates,
                   add_admin_epoch, add_epoch, add_updates);
        return -pcmk_err_diff_resync;
    }

    rc = cib_process_diff(op, options, section, req, input, existing_cib,
                          result_cib, answer);
    crm_trace("result: %s (%d), %s", pcmk_strerror(rc), rc,
              (based_is_primary? "primary": "secondary"));

    if (rc == -pcmk_err_diff_resync) {
        if (based_is_primary) {
            // The primary never asks for a resync; report failure instead
            rc = -pcmk_err_diff_failed;
            if (options & cib_force_diff) {
                crm_warn("Not requesting full refresh in R/W mode");
            }

        } else {
            free_xml(*result_cib);
            *result_cib = NULL;
            send_sync_request(NULL);
        }

    } else if ((rc != pcmk_ok) && !based_is_primary && cib_legacy_mode()) {
        crm_warn("Requesting full CIB refresh because update failed: %s"
                 CRM_XS " rc=%d", pcmk_strerror(rc), rc);

        pcmk__output_set_log_level(logger_out, LOG_INFO);
        logger_out->message(logger_out, "xml-patchset", input);

        free_xml(*result_cib);
        *result_cib = NULL;
        send_sync_request(NULL);
    }

    return rc;
}
/*!
 * \brief Process a replace request, noting when a full CIB has been received
 *
 * Delegates to cib_process_replace(). If that succeeds and the input's
 * element name matches XML_TAG_CIB, any pending sync is marked complete.
 *
 * \return Result of cib_process_replace()
 */
int
cib_process_replace_svr(const char *op, int options, const char *section, xmlNode * req,
                        xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                        xmlNode ** answer)
{
    // Read the element name before the replace operation runs
    const char *root_name = crm_element_name(input);
    int rc = cib_process_replace(op, options, section, req, input,
                                 existing_cib, result_cib, answer);

    if (rc != pcmk_ok) {
        return rc;
    }

    if (pcmk__str_eq(root_name, XML_TAG_CIB, pcmk__str_casei)) {
        sync_in_progress = 0;
    }
    return rc;
}
// @COMPAT: Remove when PCMK__CIB_REQUEST_ABS_DELETE is removed
/*
 * Handler for the "absolute delete" operation: always returns -EINVAL
 * without using any argument.
 */
int
cib_process_delete_absolute(const char *op, int options, const char *section, xmlNode * req,
xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
xmlNode ** answer)
{
return -EINVAL;
}
/*!
 * \brief Send our full CIB to one peer or to all peers as a replace request
 *
 * \param[in] request  Sync request being answered (copied as the basis of
 *                     the outgoing message)
 * \param[in] all      If TRUE, broadcast to all peers; otherwise send only to
 *                     the request's originator (F_ORIG)
 *
 * \return pcmk_ok on success, -EINVAL if there is no CIB or the request could
 *         not be copied, or -ENOTCONN if the message could not be sent
 */
int
sync_our_cib(xmlNode * request, gboolean all)
{
    int rc = pcmk_ok;
    char *digest = NULL;
    xmlNode *reply = NULL;
    crm_node_t *target = NULL;
    const char *host = crm_element_value(request, F_ORIG);
    const char *op = crm_element_value(request, F_CIB_OPERATION);

    CRM_CHECK(the_cib != NULL, return -EINVAL);

    reply = cib_msg_copy(request);
    CRM_CHECK(reply != NULL, return -EINVAL);

    crm_debug("Syncing CIB to %s", all ? "all peers" : host);
    if (!all && (host == NULL)) {
        crm_log_xml_err(request, "bad sync");
    }

    /* Mark the message as a reply whenever the originator is known, even for
     * a broadcast: otherwise the originator's local client is never notified
     * (sync_from used to fail for that reason). This does not stop other
     * nodes from applying a broadcast.
     */
    if (host != NULL) {
        crm_xml_add(reply, F_CIB_ISREPLY, host);
    }
    if (all) {
        xml_remove_prop(reply, F_CIB_HOST);
    }

    crm_xml_add(reply, F_CIB_OPERATION, PCMK__CIB_REQUEST_REPLACE);
    crm_xml_add(reply, "original_" F_CIB_OPERATION, op);
    pcmk__xe_set_bool_attr(reply, F_CIB_GLOBAL_UPDATE, true);
    crm_xml_add(reply, XML_ATTR_CRM_VERSION, CRM_FEATURE_SET);

    digest = calculate_xml_versioned_digest(the_cib, FALSE, TRUE,
                                            CRM_FEATURE_SET);
    crm_xml_add(reply, XML_ATTR_DIGEST, digest);

    add_message_xml(reply, F_CIB_CALLDATA, the_cib);

    // A NULL target makes this a broadcast
    if (!all) {
        target = crm_get_peer(0, host);
    }
    if (!send_cluster_message(target, crm_msg_cib, reply, FALSE)) {
        rc = -ENOTCONN;
    }

    free_xml(reply);
    free(digest);
    return rc;
}
diff --git a/daemons/controld/controld_corosync.c b/daemons/controld/controld_corosync.c
index 4378b30b58..e7818a5e51 100644
--- a/daemons/controld/controld_corosync.c
+++ b/daemons/controld/controld_corosync.c
@@ -1,164 +1,165 @@
/*
* Copyright 2004-2022 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU General Public License version 2
* or later (GPLv2+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#include <sys/param.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <crm/crm.h>
#include <crm/cluster/internal.h>
#include <crm/common/xml.h>
#include <pacemaker-controld.h>
#if SUPPORT_COROSYNC
extern void post_cache_update(int seq);
/* A_HA_CONNECT */
/*!
 * \brief Handle a message delivered over the Corosync CPG connection
 *
 * Cluster-class messages are parsed as XML, stamped with their sender in
 * F_ORIG, and passed to crmd_ha_msg_filter(). Any other class is logged as
 * an error and dropped.
 */
static void
crmd_cs_dispatch(cpg_handle_t handle, const struct cpg_name *groupName,
                 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
{
    uint32_t kind = 0;
    const char *from = NULL;
    xmlNode *xml = NULL;
    crm_node_t *peer = NULL;
    char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind,
                                        &from);

    if (data == NULL) {
        return;
    }

    if (kind != crm_class_cluster) {
        crm_err("Invalid message class (%d): %.100s", kind, data);
        goto done;
    }

    xml = string2xml(data);
    if (xml == NULL) {
        crm_err("Could not parse message content (%d): %.100s", kind, data);
        goto done;
    }

    crm_xml_add(xml, F_ORIG, from);

    peer = crm_get_peer(0, from);
    if (!pcmk_is_set(peer->processes, crm_proc_cpg)) {
        /* If we can still talk to our peer process on that node,
         * then it must be part of the corosync membership
         */
        crm_warn("Receiving messages from a node we think is dead: %s[%d]",
                 peer->uname, peer->id);
        crm_update_peer_proc(__func__, peer, crm_proc_cpg,
                             ONLINESTATUS);
    }

    crmd_ha_msg_filter(xml);
    free_xml(xml);

done:
    free(data);
}
/*
 * Quorum notification callback (passed to pcmk__corosync_quorum_connect() in
 * crm_connect_corosync()): records the new quorum state, then calls
 * post_cache_update() with the sequence number. Always returns TRUE.
 */
static gboolean
crmd_quorum_callback(unsigned long long seq, gboolean quorate)
{
crm_update_quorum(quorate, FALSE);
// NOTE(review): seq is unsigned long long but post_cache_update() is declared taking int
post_cache_update(seq);
return TRUE;
}
/*!
 * \brief Handle the loss of the Corosync connection
 *
 * If the disconnect was expected (R_HA_DISCONNECTED is set), only log it;
 * otherwise treat it as fatal and exit.
 *
 * \param[in] user_data  Unused
 */
static void
crmd_cs_destroy(gpointer user_data)
{
    if (pcmk_is_set(controld_globals.fsa_input_register, R_HA_DISCONNECTED)) {
        crm_info("Corosync connection closed");
        return;
    }

    crm_crit("Lost connection to cluster layer, shutting down");
    crmd_exit(CRM_EX_DISCONNECT);
}
/*!
 * \brief Handle a Corosync notification of a CPG configuration change
 *
 * Sets the controld_dc_left global flag for the duration of the membership
 * processing if the current DC is among the nodes that left, then clears it.
 *
 * \param[in] handle CPG connection
 * \param[in] cpg_name CPG group name
 * \param[in] member_list List of current CPG members
 * \param[in] member_list_entries Number of entries in \p member_list
 * \param[in] left_list List of CPG members that left
 * \param[in] left_list_entries Number of entries in \p left_list
 * \param[in] joined_list List of CPG members that joined
 * \param[in] joined_list_entries Number of entries in \p joined_list
 */
static void
cpg_membership_callback(cpg_handle_t handle, const struct cpg_name *cpg_name,
const struct cpg_address *member_list,
size_t member_list_entries,
const struct cpg_address *left_list,
size_t left_list_entries,
const struct cpg_address *joined_list,
size_t joined_list_entries)
{
/* When nodes leave CPG, the DC clears their transient node attributes.
*
* However if there is no DC, or the DC is among the nodes that left, each
* remaining node needs to do the clearing, to ensure it gets done.
* Otherwise, the attributes would persist when the nodes rejoin, which
* could have serious consequences for unfencing, agents that use attributes
* for internal logic, etc.
*
* Here, we set a global boolean if the DC is among the nodes that left, for
* use by the peer callback.
*/
if (controld_globals.dc_name != NULL) {
crm_node_t *peer = NULL;
- peer = pcmk__search_cluster_node_cache(0, controld_globals.dc_name);
+ peer = pcmk__search_cluster_node_cache(0, controld_globals.dc_name,
+ NULL);
if (peer != NULL) {
// NOTE(review): i is int while left_list_entries is size_t (mixed-sign comparison)
for (int i = 0; i < left_list_entries; ++i) {
if (left_list[i].nodeid == peer->id) {
controld_set_global_flags(controld_dc_left);
break;
}
}
}
}
// Process the change normally, which will call the peer callback as needed
pcmk_cpg_membership(handle, cpg_name, member_list, member_list_entries,
left_list, left_list_entries,
joined_list, joined_list_entries);
// The flag is only meaningful during the call above, so always reset it
controld_clear_global_flags(controld_dc_left);
}
extern gboolean crm_connect_corosync(crm_cluster_t * cluster);

/*!
 * \brief Connect the controller to a Corosync cluster
 *
 * Installs the peer status callback and this file's CPG/destroy handlers on
 * \p cluster, connects to the cluster layer, and then to the quorum service.
 *
 * \param[in,out] cluster  Cluster connection object to configure and connect
 *
 * \return TRUE if the cluster is Corosync-based and the connection succeeded,
 *         otherwise FALSE
 */
gboolean
crm_connect_corosync(crm_cluster_t * cluster)
{
    if (!is_corosync_cluster()) {
        return FALSE;
    }

    crm_set_status_callback(&peer_update_callback);

    cluster->cpg.cpg_deliver_fn = crmd_cs_dispatch;
    cluster->cpg.cpg_confchg_fn = cpg_membership_callback;
    cluster->destroy = crmd_cs_destroy;

    if (!crm_cluster_connect(cluster)) {
        return FALSE;
    }

    pcmk__corosync_quorum_connect(crmd_quorum_callback,
                                  crmd_cs_destroy);
    return TRUE;
}
#endif
diff --git a/daemons/controld/controld_messages.c b/daemons/controld/controld_messages.c
index 54b27ec33d..553dee67cd 100644
--- a/daemons/controld/controld_messages.c
+++ b/daemons/controld/controld_messages.c
@@ -1,1307 +1,1307 @@
/*
* Copyright 2004-2023 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU General Public License version 2
* or later (GPLv2+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#include <sys/param.h>
#include <string.h>
#include <time.h>
#include <crm/crm.h>
#include <crm/msg_xml.h>
#include <crm/common/xml.h>
#include <crm/cluster/internal.h>
#include <crm/cib.h>
#include <crm/common/ipc_internal.h>
#include <pacemaker-controld.h>
extern void crm_shutdown(int nsig);
// Forward declarations of this file's static message handlers
static enum crmd_fsa_input handle_message(xmlNode *msg,
enum crmd_fsa_cause cause);
static void handle_response(xmlNode *stored_msg);
static enum crmd_fsa_input handle_request(xmlNode *stored_msg,
enum crmd_fsa_cause cause);
static enum crmd_fsa_input handle_shutdown_request(xmlNode *stored_msg);
static void send_msg_via_ipc(xmlNode * msg, const char *sys);
/* debug only, can wrap all it likes
 * (incremented once per input queued by register_fsa_input_adv())
 */
static int last_data_id = 0;
/*!
 * \brief Register an error input with the FSA, discarding pending actions
 *
 * Any actions currently pending are first re-queued (with the current input's
 * cause and data, if any), then the pending action set is cleared and the
 * error input is added to the front of the queue.
 *
 * \param[in] cause        Cause to record for the error input
 * \param[in] input        Error input to register
 * \param[in] cur_data     Input currently being processed (may be NULL)
 * \param[in] new_data     Data to attach to the error input
 * \param[in] raised_from  Name of the caller, for logging
 */
void
register_fsa_error_adv(enum crmd_fsa_cause cause, enum crmd_fsa_input input,
                       fsa_data_t * cur_data, void *new_data, const char *raised_from)
{
    if (controld_globals.fsa_actions != A_NOTHING) {
        // Preserve the pending actions as a queued entry of their own
        enum crmd_fsa_cause saved_cause = C_FSA_INTERNAL;
        void *saved_data = NULL;

        if (cur_data != NULL) {
            saved_cause = cur_data->fsa_cause;
            saved_data = cur_data->data;
        }
        register_fsa_input_adv(saved_cause, I_NULL, saved_data,
                               controld_globals.fsa_actions, TRUE, __func__);
    }

    // Start over with an empty action list
    crm_info("Resetting the current action list");
    fsa_dump_actions(controld_globals.fsa_actions, "Drop");
    controld_globals.fsa_actions = A_NOTHING;

    // Finally, queue the error itself
    register_fsa_input_adv(cause, input, new_data, A_NOTHING, TRUE,
                           raised_from);
}
/*!
 * \brief Add an input (and/or actions) to the controller's FSA message queue
 *
 * \param[in] cause         What caused the input
 * \param[in] input         FSA input to queue
 * \param[in] data          Optional payload; it is copied, and how depends
 *                          on \p cause
 * \param[in] with_actions  Actions to attach to the queued entry
 * \param[in] prepend       If TRUE, add to the front of the queue, otherwise
 *                          to the back
 * \param[in] raised_from   Name of the caller, for logging ("<unknown>" is
 *                          used if NULL)
 *
 * \note An I_WAIT_FOR_EVENT input stalls the FSA: without data, the actions
 *       are simply restored and nothing is queued; with data, all pending
 *       actions are folded into the new entry. The FSA is triggered for any
 *       other input.
 */
void
register_fsa_input_adv(enum crmd_fsa_cause cause, enum crmd_fsa_input input,
                       void *data, uint64_t with_actions,
                       gboolean prepend, const char *raised_from)
{
    unsigned old_len = g_list_length(controld_globals.fsa_message_queue);
    fsa_data_t *fsa_data = NULL;

    if (raised_from == NULL) {
        raised_from = "<unknown>";
    }

    if (input == I_NULL && with_actions == A_NOTHING /* && data == NULL */ ) {
        /* no point doing anything */
        crm_err("Cannot add entry to queue: no input and no action");
        return;
    }

    if (input == I_WAIT_FOR_EVENT) {
        controld_set_global_flags(controld_fsa_is_stalled);
        crm_debug("Stalling the FSA pending further input: source=%s cause=%s data=%p queue=%u",
                  raised_from, fsa_cause2string(cause), data, old_len);

        if (old_len > 0) {
            // Other inputs are already waiting, so go behind them
            fsa_dump_queue(LOG_TRACE);
            prepend = FALSE;
        }

        if (data == NULL) {
            controld_set_fsa_action_flags(with_actions);
            fsa_dump_actions(with_actions, "Restored");
            return;
        }

        /* Store everything in the new event and reset
         * controld_globals.fsa_actions
         */
        with_actions |= controld_globals.fsa_actions;
        controld_globals.fsa_actions = A_NOTHING;
    }

    last_data_id++;
    crm_trace("%s %s FSA input %d (%s) due to %s, %s data",
              raised_from, (prepend? "prepended" : "appended"), last_data_id,
              fsa_input2string(input), fsa_cause2string(cause),
              (data? "with" : "without"));

    fsa_data = calloc(1, sizeof(fsa_data_t));
    // Fail loudly on allocation failure instead of dereferencing NULL below
    CRM_ASSERT(fsa_data != NULL);

    fsa_data->id = last_data_id;
    fsa_data->fsa_input = input;
    fsa_data->fsa_cause = cause;
    fsa_data->origin = raised_from;
    fsa_data->data = NULL;
    fsa_data->data_type = fsa_dt_none;
    fsa_data->actions = with_actions;

    if (with_actions != A_NOTHING) {
        crm_trace("Adding actions %.16llx to input",
                  (unsigned long long) with_actions);
    }

    if (data != NULL) {
        // The cause determines what type the payload is assumed to be
        switch (cause) {
            case C_FSA_INTERNAL:
            case C_CRMD_STATUS_CALLBACK:
            case C_IPC_MESSAGE:
            case C_HA_MESSAGE:
                CRM_CHECK(((ha_msg_input_t *) data)->msg != NULL,
                          crm_err("Bogus data from %s", raised_from));
                crm_trace("Copying %s data from %s as cluster message data",
                          fsa_cause2string(cause), raised_from);
                fsa_data->data = copy_ha_msg_input(data);
                fsa_data->data_type = fsa_dt_ha_msg;
                break;

            case C_LRM_OP_CALLBACK:
                crm_trace("Copying %s data from %s as lrmd_event_data_t",
                          fsa_cause2string(cause), raised_from);
                fsa_data->data = lrmd_copy_event((lrmd_event_data_t *) data);
                fsa_data->data_type = fsa_dt_lrm;
                break;

            case C_TIMER_POPPED:
            case C_SHUTDOWN:
            case C_UNKNOWN:
            case C_STARTUP:
                crm_crit("Copying %s data (from %s) is not yet implemented",
                         fsa_cause2string(cause), raised_from);
                crmd_exit(CRM_EX_SOFTWARE);
                break;
        }
    }

    /* make sure to free it properly later */
    if (prepend) {
        controld_globals.fsa_message_queue
            = g_list_prepend(controld_globals.fsa_message_queue, fsa_data);
    } else {
        controld_globals.fsa_message_queue
            = g_list_append(controld_globals.fsa_message_queue, fsa_data);
    }

    crm_trace("FSA message queue length is %u",
              g_list_length(controld_globals.fsa_message_queue));

    /* fsa_dump_queue(LOG_TRACE); */

    if (old_len == g_list_length(controld_globals.fsa_message_queue)) {
        crm_err("Couldn't add message to the queue");
    }

    if (input != I_WAIT_FOR_EVENT) {
        controld_trigger_fsa();
    }
}
/*!
 * \brief Log every entry in the FSA message queue
 *
 * \param[in] log_level  Level at which to log each entry
 */
void
fsa_dump_queue(int log_level)
{
    int position = 0;
    GList *entry = controld_globals.fsa_message_queue;

    while (entry != NULL) {
        fsa_data_t *item = (fsa_data_t *) entry->data;

        do_crm_log_unlikely(log_level,
                            "queue[%d.%d]: input %s raised by %s(%p.%d)\t(cause=%s)",
                            position++, item->id,
                            fsa_input2string(item->fsa_input),
                            item->origin, item->data, item->data_type,
                            fsa_cause2string(item->fsa_cause));
        entry = entry->next;
    }
}
/*!
 * \brief Create a newly allocated copy of a cluster message input
 *
 * \param[in] orig  Input to copy (may be NULL or have a NULL message)
 *
 * \return New input whose msg is a copy of the original's message (or NULL
 *         if there was none) and whose xml is the F_CRM_DATA content looked
 *         up in that copy
 */
ha_msg_input_t *
copy_ha_msg_input(ha_msg_input_t * orig)
{
    ha_msg_input_t *result = calloc(1, sizeof(ha_msg_input_t));

    CRM_ASSERT(result != NULL);

    // calloc() already left result->msg NULL for the "nothing to copy" case
    if ((orig != NULL) && (orig->msg != NULL)) {
        result->msg = copy_xml(orig->msg);
    }
    result->xml = get_message_xml(result->msg, F_CRM_DATA);
    return result;
}
/*!
 * \brief Free an FSA input and any data attached to it
 *
 * \param[in,out] fsa_data  Input to free (NULL is ignored)
 *
 * \note Attached data is released according to its recorded data type. Data
 *       recorded with type fsa_dt_none cannot be freed safely, so that case
 *       is treated as a fatal software error.
 */
void
delete_fsa_input(fsa_data_t * fsa_data)
{
    if (fsa_data == NULL) {
        return;
    }

    crm_trace("About to free %s data", fsa_cause2string(fsa_data->fsa_cause));

    if (fsa_data->data != NULL) {
        if (fsa_data->data_type == fsa_dt_ha_msg) {
            delete_ha_msg_input(fsa_data->data);

        } else if (fsa_data->data_type == fsa_dt_xml) {
            free_xml((xmlNode *) fsa_data->data);

        } else if (fsa_data->data_type == fsa_dt_lrm) {
            lrmd_free_event((lrmd_event_data_t *) fsa_data->data);

        } else if (fsa_data->data_type == fsa_dt_none) {
            // Data is present but its type was never recorded
            crm_err("Don't know how to free %s data from %s",
                    fsa_cause2string(fsa_data->fsa_cause), fsa_data->origin);
            crmd_exit(CRM_EX_SOFTWARE);
        }

        crm_trace("%s data freed", fsa_cause2string(fsa_data->fsa_cause));
    }

    free(fsa_data);
}
/* returns the next message */
fsa_data_t *
get_message(void)
{
fsa_data_t *message
= (fsa_data_t *) controld_globals.fsa_message_queue->data;
controld_globals.fsa_message_queue
= g_list_remove(controld_globals.fsa_message_queue, message);
crm_trace("Processing input %d", message->id);
return message;
}
/*!
 * \brief Get the data attached to an FSA input, checking its type
 *
 * \param[in] fsa_data  FSA input to get data from
 * \param[in] a_type    Type the caller expects the data to have
 * \param[in] caller    Name of the caller, for logging
 *
 * \return The attached data, or NULL (after logging) if there is no input,
 *         no data, or the data has a different type than requested
 *
 * \note A type mismatch also trips CRM_ASSERT().
 */
void *
fsa_typed_data_adv(fsa_data_t * fsa_data, enum fsa_data_type a_type, const char *caller)
{
    if (fsa_data == NULL) {
        crm_err("%s: No FSA data available", caller);
        return NULL;
    }

    if (fsa_data->data == NULL) {
        crm_err("%s: No message data available. Origin: %s", caller, fsa_data->origin);
        return NULL;
    }

    if (fsa_data->data_type != a_type) {
        crm_crit("%s: Message data was the wrong type! %d vs. requested=%d.  Origin: %s",
                 caller, fsa_data->data_type, a_type, fsa_data->origin);
        CRM_ASSERT(fsa_data->data_type == a_type);
        return NULL;
    }

    return fsa_data->data;
}
/* A_MSG_ROUTE */
/*
 * FSA action handler: fetch the cluster-message payload of the current input
 * and pass it to route_message() along with the input's recorded cause.
 * action, cause, cur_state and current_input are not referenced directly.
 */
void
do_msg_route(long long action,
enum crmd_fsa_cause cause,
enum crmd_fsa_state cur_state,
enum crmd_fsa_input current_input, fsa_data_t * msg_data)
{
// NOTE(review): fsa_typed_data() presumably wraps fsa_typed_data_adv() (which
// can return NULL) using msg_data; input is dereferenced unchecked -- confirm
ha_msg_input_t *input = fsa_typed_data(fsa_dt_ha_msg);
route_message(msg_data->fsa_cause, input->msg);
}
/*!
 * \brief Relay a message elsewhere or process it locally
 *
 * The message is first offered to relay_message(). If that reports more work
 * to do, the message is handled locally, and any FSA input that results is
 * queued: immediately for the inputs listed in the switch below, and
 * deferred for everything else.
 *
 * \param[in] cause  Where the message came from (must be C_IPC_MESSAGE or
 *                   C_HA_MESSAGE)
 * \param[in] input  Message XML
 */
void
route_message(enum crmd_fsa_cause cause, xmlNode * input)
{
    /* Initialize every member: a pointer to this struct is handed to the
     * FSA registration calls below, so nothing should be left indeterminate.
     */
    ha_msg_input_t fsa_input = { .msg = input };
    enum crmd_fsa_input result = I_NULL;

    CRM_CHECK(cause == C_IPC_MESSAGE || cause == C_HA_MESSAGE, return);

    /* try passing the buck first */
    if (relay_message(input, cause == C_IPC_MESSAGE)) {
        return;
    }

    /* handle locally */
    result = handle_message(input, cause);

    /* done or process later? */
    switch (result) {
        case I_NULL:
        case I_CIB_OP:
        case I_ROUTER:
        case I_NODE_JOIN:
        case I_JOIN_REQUEST:
        case I_JOIN_RESULT:
            break;
        default:
            /* Deferring local processing of message */
            register_fsa_input_later(cause, result, &fsa_input);
            return;
    }

    if (result != I_NULL) {
        /* add to the front of the queue */
        register_fsa_input(cause, result, &fsa_input);
    }
}
/*
 * Relay a controller message to wherever it should be handled.
 *
 * msg                 Message XML to route
 * originated_locally  Whether the message came from a local source
 *                     (route_message() passes TRUE for IPC messages)
 *
 * Returns TRUE if the caller has nothing more to do with the message (it was
 * relayed, discarded, or invalid), or FALSE if the caller should process it
 * locally.
 */
gboolean
relay_message(xmlNode * msg, gboolean originated_locally)
{
// Message type passed to send_cluster_message(); replaced on Corosync clusters
int dest = 1;
bool is_for_dc = false;
bool is_for_dcib = false;
bool is_for_te = false;
bool is_for_crm = false;
bool is_for_cib = false;
bool is_local = false;
// NOTE(review): these lookups run before the msg == NULL check below, so
// crm_element_value() is presumably NULL-safe -- confirm
const char *host_to = crm_element_value(msg, F_CRM_HOST_TO);
const char *sys_to = crm_element_value(msg, F_CRM_SYS_TO);
const char *sys_from = crm_element_value(msg, F_CRM_SYS_FROM);
const char *type = crm_element_value(msg, F_TYPE);
const char *task = crm_element_value(msg, F_CRM_TASK);
const char *ref = crm_element_value(msg, XML_ATTR_REFERENCE);
if (ref == NULL) {
ref = "without reference ID";
}
// Messages rejected here return TRUE so the caller drops them
if (msg == NULL) {
crm_warn("Cannot route empty message");
return TRUE;
} else if (pcmk__str_eq(task, CRM_OP_HELLO, pcmk__str_casei)) {
crm_trace("No routing needed for hello message %s", ref);
return TRUE;
} else if (!pcmk__str_eq(type, T_CRM, pcmk__str_casei)) {
crm_warn("Received invalid message %s: type '%s' not '" T_CRM "'",
ref, pcmk__s(type, ""));
crm_log_xml_warn(msg, "[bad message type]");
return TRUE;
} else if (sys_to == NULL) {
crm_warn("Received invalid message %s: no subsystem", ref);
crm_log_xml_warn(msg, "[no subsystem]");
return TRUE;
}
// Classify the destination subsystem (case-insensitive)
is_for_dc = (strcasecmp(CRM_SYSTEM_DC, sys_to) == 0);
is_for_dcib = (strcasecmp(CRM_SYSTEM_DCIB, sys_to) == 0);
is_for_te = (strcasecmp(CRM_SYSTEM_TENGINE, sys_to) == 0);
is_for_cib = (strcasecmp(CRM_SYSTEM_CIB, sys_to) == 0);
is_for_crm = (strcasecmp(CRM_SYSTEM_CRMD, sys_to) == 0);
// Decide whether the message is meant for this node
is_local = false;
if (pcmk__str_empty(host_to)) {
if (is_for_dc || is_for_te) {
is_local = false;
} else if (is_for_crm) {
if (pcmk__strcase_any_of(task, CRM_OP_NODE_INFO,
PCMK__CONTROLD_CMD_NODES, NULL)) {
/* Node info requests do not specify a host, which is normally
* treated as "all hosts", because the whole point is that the
* client may not know the local node name. Always handle these
* requests locally.
*/
is_local = true;
} else {
is_local = !originated_locally;
}
} else {
is_local = true;
}
} else if (pcmk__str_eq(controld_globals.our_nodename, host_to,
pcmk__str_casei)) {
is_local = true;
} else if (is_for_crm && pcmk__str_eq(task, CRM_OP_LRM_DELETE, pcmk__str_casei)) {
xmlNode *msg_data = get_message_xml(msg, F_CRM_DATA);
const char *mode = crm_element_value(msg_data, PCMK__XA_MODE);
if (pcmk__str_eq(mode, XML_TAG_CIB, pcmk__str_casei)) {
// Local delete of an offline node's resource history
is_local = true;
}
}
// Route according to destination subsystem and locality
if (is_for_dc || is_for_dcib || is_for_te) {
if (AM_I_DC && is_for_te) {
crm_trace("Route message %s locally as transition request", ref);
send_msg_via_ipc(msg, sys_to);
} else if (AM_I_DC) {
crm_trace("Route message %s locally as DC request", ref);
return FALSE; // More to be done by caller
} else if (originated_locally && !pcmk__strcase_any_of(sys_from, CRM_SYSTEM_PENGINE,
CRM_SYSTEM_TENGINE, NULL)) {
if (is_corosync_cluster()) {
dest = text2msg_type(sys_to);
}
crm_trace("Relay message %s to DC", ref);
send_cluster_message(host_to ? crm_get_peer(0, host_to) : NULL, dest, msg, TRUE);
} else {
/* Neither the TE nor the scheduler should be sending messages
* to DCs on other nodes. By definition, if we are no longer the DC,
* then the scheduler's or TE's data should be discarded.
*/
crm_trace("Discard message %s because we are not DC", ref);
}
} else if (is_local && (is_for_crm || is_for_cib)) {
crm_trace("Route message %s locally as controller request", ref);
return FALSE; // More to be done by caller
} else if (is_local) {
crm_trace("Relay message %s locally to %s",
ref, (sys_to? sys_to : "unknown client"));
crm_log_xml_trace(msg, "[IPC relay]");
send_msg_via_ipc(msg, sys_to);
} else {
crm_node_t *node_to = NULL;
if (is_corosync_cluster()) {
dest = text2msg_type(sys_to);
// Unknown or out-of-range message types fall back to the controller's type
if (dest == crm_msg_none || dest > crm_msg_stonith_ng) {
dest = crm_msg_crmd;
}
}
if (host_to) {
- node_to = pcmk__search_cluster_node_cache(0, host_to);
+ node_to = pcmk__search_cluster_node_cache(0, host_to, NULL);
if (node_to == NULL) {
crm_warn("Cannot route message %s: Unknown node %s",
ref, host_to);
return TRUE;
}
crm_trace("Relay message %s to %s",
ref, (node_to->uname? node_to->uname : "peer"));
} else {
crm_trace("Broadcast message %s to all peers", ref);
}
send_cluster_message(host_to ? node_to : NULL, dest, msg, TRUE);
}
return TRUE; // No further processing of message is needed
}
/*!
 * \internal
 * \brief Check that a protocol version field of an IPC hello is valid
 *
 * \param[in] message_data  Hello message data
 * \param[in] field         Name of the version attribute to check
 * \param[in] client_name   Client name, for logging
 * \param[in] ref           Message reference, for logging (may be NULL)
 * \param[in] uuid          Client identifier, for logging
 *
 * \return true if \p field parses as a non-negative integer, otherwise false
 *         (after logging a warning)
 */
static bool
authorize_version(xmlNode *message_data, const char *field,
                  const char *client_name, const char *ref, const char *uuid)
{
    const char *version = crm_element_value(message_data, field);
    long long version_num = 0LL;
    bool valid = (pcmk__scan_ll(version, &version_num, -1LL) == pcmk_rc_ok)
                 && (version_num >= 0LL);

    if (valid) {
        return true;
    }

    crm_warn("Rejected IPC hello from %s: '%s' is not a valid protocol %s "
             CRM_XS " ref=%s uuid=%s",
             client_name, ((version == NULL)? "" : version),
             field, (ref? ref : "none"), uuid);
    return false;
}
/*!
 * \internal
 * \brief Check whether a client IPC message is acceptable
 *
 * If a given client IPC message is a hello, "authorize" it by ensuring it has
 * valid information such as a protocol version, and return false indicating
 * that nothing further needs to be done with the message. If the message is not
 * a hello, just return true to indicate it needs further processing.
 *
 * \param[in]     client_msg     XML of IPC message
 * \param[in,out] curr_client    If IPC is not proxied, client that sent message
 * \param[in]     proxy_session  If IPC is proxied, the session ID
 *
 * \return true if message needs further processing, false if it doesn't
 *
 * \note A rejected message causes \p curr_client (if any) to be disconnected.
 *       An accepted hello stores a copy of the client name in the client's
 *       userdata and triggers the FSA.
 */
bool
controld_authorize_ipc_message(const xmlNode *client_msg, pcmk__client_t *curr_client,
                               const char *proxy_session)
{
    xmlNode *message_data = NULL;
    const char *client_name = NULL;
    const char *op = crm_element_value(client_msg, F_CRM_TASK);
    const char *ref = crm_element_value(client_msg, XML_ATTR_REFERENCE);
    const char *uuid = (curr_client? curr_client->id : proxy_session);

    if (uuid == NULL) {
        crm_warn("IPC message from client rejected: No client identifier "
                 CRM_XS " ref=%s", (ref? ref : "none"));
        goto rejected;
    }

    if (!pcmk__str_eq(CRM_OP_HELLO, op, pcmk__str_casei)) {
        // Only hello messages need to be authorized
        return true;
    }

    message_data = get_message_xml(client_msg, F_CRM_DATA);

    client_name = crm_element_value(message_data, "client_name");
    if (pcmk__str_empty(client_name)) {
        /* The two string literals must be adjacent (no comma between them)
         * so that they form one format string that consumes ref and uuid.
         */
        crm_warn("IPC hello from client rejected: No client name "
                 CRM_XS " ref=%s uuid=%s", (ref? ref : "none"), uuid);
        goto rejected;
    }

    if (!authorize_version(message_data, "major_version", client_name, ref,
                           uuid)) {
        goto rejected;
    }
    if (!authorize_version(message_data, "minor_version", client_name, ref,
                           uuid)) {
        goto rejected;
    }

    crm_trace("Validated IPC hello from client %s", client_name);
    if (curr_client) {
        curr_client->userdata = strdup(client_name);
    }
    controld_trigger_fsa();
    return false;

rejected:
    if (curr_client) {
        qb_ipcs_disconnect(curr_client->ipcs);
    }
    return false;
}
/*!
 * \internal
 * \brief Dispatch a controller message to the request or response handler
 *
 * \param[in,out] msg    Message XML (passed on to the selected handler)
 * \param[in]     cause  What triggered processing (forwarded for requests)
 *
 * \return Next FSA input: whatever handle_request() returns for requests,
 *         otherwise I_NULL
 */
static enum crmd_fsa_input
handle_message(xmlNode *msg, enum crmd_fsa_cause cause)
{
    const char *type = NULL;

    CRM_CHECK(msg != NULL, return I_NULL);

    type = crm_element_value(msg, F_CRM_MSG_TYPE);

    if (pcmk__str_eq(type, XML_ATTR_REQUEST, pcmk__str_none)) {
        return handle_request(msg, cause);
    }

    if (pcmk__str_eq(type, XML_ATTR_RESPONSE, pcmk__str_none)) {
        handle_response(msg);
        return I_NULL;
    }

    // The attribute may be absent, so never hand a NULL pointer to "%s"
    crm_err("Unknown message type: %s", ((type != NULL)? type : "(null)"));
    return I_NULL;
}
/*!
 * \internal
 * \brief Handle a request to clear a resource's failures on a node
 *
 * Reads the resource ID, target node, and optional operation name and interval
 * from the message's data section, then clears matching failures via the
 * attribute manager, the CIB, and the executor state.
 *
 * \param[in] stored_msg  Request XML
 *
 * \return Next FSA input (always I_NULL)
 */
static enum crmd_fsa_input
handle_failcount_op(xmlNode * stored_msg)
{
    const char *rsc = NULL;
    const char *uname = NULL;
    const char *op = NULL;
    char *interval_spec = NULL;
    guint interval_ms = 0;
    gboolean is_remote_node = FALSE;
    xmlNode *xml_op = get_message_xml(stored_msg, F_CRM_DATA);

    if (xml_op) {
        xmlNode *xml_rsc = first_named_child(xml_op, XML_CIB_TAG_RESOURCE);
        xmlNode *xml_attrs = first_named_child(xml_op, XML_TAG_ATTRS);

        if (xml_rsc) {
            rsc = ID(xml_rsc);
        }
        if (xml_attrs) {
            op = crm_element_value(xml_attrs,
                                   CRM_META "_" XML_RSC_ATTR_CLEAR_OP);
            crm_element_value_ms(xml_attrs,
                                 CRM_META "_" XML_RSC_ATTR_CLEAR_INTERVAL,
                                 &interval_ms);
        }

        /* Only query the target when there is a data element to query, so a
         * message without a data section is simply reported as invalid below.
         */
        uname = crm_element_value(xml_op, XML_LRM_ATTR_TARGET);
    }

    if ((rsc == NULL) || (uname == NULL)) {
        crm_log_xml_warn(stored_msg, "invalid failcount op");
        return I_NULL;
    }

    // xml_op is necessarily non-NULL here, because rsc and uname came from it
    if (crm_element_value(xml_op, XML_LRM_ATTR_ROUTER_NODE)) {
        is_remote_node = TRUE;
    }

    crm_debug("Clearing failures for %s-interval %s on %s "
              "from attribute manager, CIB, and executor state",
              pcmk__readable_interval(interval_ms), rsc, uname);

    // A zero interval is passed to the attribute manager as "no interval"
    if (interval_ms) {
        interval_spec = crm_strdup_printf("%ums", interval_ms);
    }
    update_attrd_clear_failures(uname, rsc, op, interval_spec, is_remote_node);
    free(interval_spec);

    controld_cib_delete_last_failure(rsc, uname, op, interval_ms);
    lrm_clear_last_failure(rsc, uname, op, interval_ms);

    return I_NULL;
}
/*!
 * \internal
 * \brief Handle a CRM_OP_LRM_DELETE request
 *
 * Depending on the mode attribute in the message data, either re-address the
 * message to the executor subsystem for routing, or clear the resource's CIB
 * history locally and acknowledge the originator.
 *
 * \param[in,out] stored_msg  Request XML (F_CRM_SYS_TO is rewritten when the
 *                            request is to be relayed)
 *
 * \return I_ROUTER if the message must be relayed, otherwise I_NULL
 */
static enum crmd_fsa_input
handle_lrm_delete(xmlNode *stored_msg)
{
const char *mode = NULL;
xmlNode *msg_data = get_message_xml(stored_msg, F_CRM_DATA);
CRM_CHECK(msg_data != NULL, return I_NULL);
/* CRM_OP_LRM_DELETE has two distinct modes. The default behavior is to
* relay the operation to the affected node, which will unregister the
* resource from the local executor, clear the resource's history from the
* CIB, and do some bookkeeping in the controller.
*
* However, if the affected node is offline, the client will specify
* mode="cib" which means the controller receiving the operation should
* clear the resource's history from the CIB and nothing else. This is used
* to clear shutdown locks.
*/
mode = crm_element_value(msg_data, PCMK__XA_MODE);
// strcmp() is nonzero when mode differs from XML_TAG_CIB
if ((mode == NULL) || strcmp(mode, XML_TAG_CIB)) {
// Relay to affected node
crm_xml_add(stored_msg, F_CRM_SYS_TO, CRM_SYSTEM_LRMD);
return I_ROUTER;
} else {
// Delete CIB history locally (compare with do_lrm_delete())
const char *from_sys = NULL;
const char *user_name = NULL;
const char *rsc_id = NULL;
const char *node = NULL;
xmlNode *rsc_xml = NULL;
int rc = pcmk_rc_ok;
rsc_xml = first_named_child(msg_data, XML_CIB_TAG_RESOURCE);
CRM_CHECK(rsc_xml != NULL, return I_NULL);
rsc_id = ID(rsc_xml);
from_sys = crm_element_value(stored_msg, F_CRM_SYS_FROM);
node = crm_element_value(msg_data, XML_LRM_ATTR_TARGET);
user_name = pcmk__update_acl_user(stored_msg, F_CRM_USER, NULL);
crm_debug("Handling " CRM_OP_LRM_DELETE " for %s on %s locally%s%s "
"(clearing CIB resource history only)", rsc_id, node,
(user_name? " for user " : ""), (user_name? user_name : ""));
// First attempt the deletion as a synchronous dry run ...
rc = controld_delete_resource_history(rsc_id, node, user_name,
cib_dryrun|cib_sync_call);
// ... and perform the real deletion only if the dry run succeeded
if (rc == pcmk_rc_ok) {
rc = controld_delete_resource_history(rsc_id, node, user_name,
crmd_cib_smart_opt());
}
// Notify client and tengine. (Only notify tengine if mode = "cib" and CRM_OP_LRM_DELETE.)
if (from_sys) {
lrmd_event_data_t *op = NULL;
const char *from_host = crm_element_value(stored_msg,
F_CRM_HOST_FROM);
const char *transition;
/* The transition key is read from the message data unless the sender
* is the transition engine, in which case it is read from the message
* itself
*/
if (strcmp(from_sys, CRM_SYSTEM_TENGINE)) {
transition = crm_element_value(msg_data,
XML_ATTR_TRANSITION_KEY);
} else {
transition = crm_element_value(stored_msg,
XML_ATTR_TRANSITION_KEY);
}
crm_info("Notifying %s on %s that %s was%s deleted",
from_sys, (from_host? from_host : "local node"), rsc_id,
((rc == pcmk_rc_ok)? "" : " not"));
// Build a synthetic "delete" completion event carrying the result
op = lrmd_new_event(rsc_id, CRMD_ACTION_DELETE, 0);
op->type = lrmd_event_exec_complete;
// Fall back to FAKE_TE_ID when the request carried no transition key
op->user_data = strdup(transition? transition : FAKE_TE_ID);
op->params = pcmk__strkey_table(free, free);
g_hash_table_insert(op->params, strdup(XML_ATTR_CRM_VERSION),
strdup(CRM_FEATURE_SET));
controld_rc2event(op, rc);
controld_ack_event_directly(from_host, from_sys, NULL, op, rsc_id);
lrmd_free_event(op);
controld_trigger_delete_refresh(from_sys, rsc_id);
}
return I_NULL;
}
}
/*!
 * \brief Update the remote peer cache from a CRM_OP_REMOTE_STATE message
 *
 * The message's ID names the remote node, its XML_NODE_IN_CLUSTER attribute
 * says whether the node is up, and its optional PCMK__XA_CONN_HOST attribute
 * names the connection host. A missing connection host clears any recorded one.
 *
 * \param[in] msg  Message XML
 *
 * \return Next FSA input (always I_NULL)
 */
static enum crmd_fsa_input
handle_remote_state(const xmlNode *msg)
{
    const char *remote_uname = ID(msg);
    const char *host = NULL;
    crm_node_t *remote_peer = NULL;
    bool is_up = false;
    int rc = pcmk__xe_get_bool_attr(msg, XML_NODE_IN_CLUSTER, &is_up);

    CRM_CHECK(remote_uname && rc == pcmk_rc_ok, return I_NULL);

    remote_peer = crm_remote_peer_get(remote_uname);
    CRM_CHECK(remote_peer, return I_NULL);

    pcmk__update_peer_state(__func__, remote_peer,
                            (is_up? CRM_NODE_MEMBER : CRM_NODE_LOST), 0);

    host = crm_element_value(msg, PCMK__XA_CONN_HOST);
    if (host == NULL) {
        // No host given: forget whatever was recorded (free(NULL) is a no-op)
        free(remote_peer->conn_host);
        remote_peer->conn_host = NULL;
    } else {
        pcmk__str_update(&remote_peer->conn_host, host);
    }

    return I_NULL;
}
/*!
 * \brief Answer a CRM_OP_PING message with the controller's current state
 *
 * \param[in] msg  Message XML
 *
 * \return Next FSA input (always I_NULL)
 */
static enum crmd_fsa_input
handle_ping(const xmlNode *msg)
{
    xmlNode *reply = NULL;
    const char *state_text = NULL;
    xmlNode *ping = create_xml_node(NULL, XML_CRM_TAG_PING);

    // The subsystem that was pinged becomes the reply's origin
    crm_xml_add(ping, XML_PING_ATTR_SYSFROM,
                crm_element_value(msg, F_CRM_SYS_TO));

    // Report the controller's FSA state
    state_text = fsa_state2string(controld_globals.fsa_state);
    crm_xml_add(ping, XML_PING_ATTR_CRMDSTATE, state_text);
    crm_notice("Current ping state: %s", state_text); // CTS needs this

    // Report controller health
    // @TODO maybe do some checks to determine meaningful status
    crm_xml_add(ping, XML_PING_ATTR_STATUS, "ok");

    // Wrap the payload in a reply and send it
    reply = create_reply(msg, ping);
    free_xml(ping);
    if (reply == NULL) {
        return I_NULL;
    }
    (void) relay_message(reply, TRUE);
    free_xml(reply);

    return I_NULL;
}
/*!
 * \brief Answer a PCMK__CONTROLD_CMD_NODES message with the peer cache contents
 *
 * The reply holds one node element (ID, name, and state) per entry of
 * crm_peer_cache.
 *
 * \param[in] request  Message XML
 *
 * \return Next FSA input (always I_NULL)
 */
static enum crmd_fsa_input
handle_node_list(const xmlNode *request)
{
    GHashTableIter cache_iter;
    crm_node_t *peer = NULL;
    xmlNode *answer = NULL;
    xmlNode *nodes_xml = create_xml_node(NULL, XML_CIB_TAG_NODES);

    // One child element per cached cluster peer
    g_hash_table_iter_init(&cache_iter, crm_peer_cache);
    while (g_hash_table_iter_next(&cache_iter, NULL, (gpointer *) &peer)) {
        xmlNode *entry = create_xml_node(nodes_xml, XML_CIB_TAG_NODE);

        crm_xml_add_ll(entry, XML_ATTR_ID, (long long) peer->id); // uint32_t
        crm_xml_add(entry, XML_ATTR_UNAME, peer->uname);
        crm_xml_add(entry, XML_NODE_IN_CLUSTER, peer->state);
    }

    // Wrap the list in a reply and send it
    answer = create_reply(request, nodes_xml);
    free_xml(nodes_xml);
    if (answer == NULL) {
        return I_NULL;
    }
    (void) relay_message(answer, TRUE);
    free_xml(answer);

    return I_NULL;
}
/*!
 * \brief Answer a CRM_OP_NODE_INFO request
 *
 * The request may identify a node by ID and/or name; when it gives neither,
 * the local node is described. The reply always carries the quorum flag, and
 * carries node details only if the node is found in the node caches.
 *
 * \param[in] msg  Message XML
 *
 * \return Next FSA input (always I_NULL)
 */
static enum crmd_fsa_input
handle_node_info_request(const xmlNode *msg)
{
    int wanted_id = 0;
    const char *wanted_name = NULL;
    crm_node_t *found = NULL;
    xmlNode *answer = NULL;
    xmlNode *info = create_xml_node(NULL, XML_CIB_TAG_NODE);

    crm_xml_add(info, XML_PING_ATTR_SYSFROM, CRM_SYSTEM_CRMD);

    // Report whether the current partition has quorum
    pcmk__xe_set_bool_attr(info, XML_ATTR_HAVE_QUORUM,
                           pcmk_is_set(controld_globals.flags,
                                       controld_has_quorum));

    // Work out which node the client asked about (negative IDs count as unset)
    crm_element_value_int(msg, XML_ATTR_ID, &wanted_id);
    if (wanted_id < 0) {
        wanted_id = 0;
    }
    wanted_name = crm_element_value(msg, XML_ATTR_UNAME);
    if ((wanted_name == NULL) && (wanted_id == 0)) {
        // Nothing specified, so describe the local node
        wanted_name = controld_globals.our_nodename;
    }

    found = pcmk__search_node_caches(wanted_id, wanted_name, CRM_GET_PEER_ANY);
    if (found != NULL) {
        crm_xml_add(info, XML_ATTR_ID, found->uuid);
        crm_xml_add(info, XML_ATTR_UNAME, found->uname);
        crm_xml_add(info, XML_NODE_IS_PEER, found->state);
        pcmk__xe_set_bool_attr(info, XML_NODE_IS_REMOTE,
                               pcmk_is_set(found->flags, crm_remote_node));
    }

    // Wrap the details in a reply and send it
    answer = create_reply(msg, info);
    free_xml(info);
    if (answer == NULL) {
        return I_NULL;
    }
    (void) relay_message(answer, TRUE);
    free_xml(answer);

    return I_NULL;
}
/*!
 * \internal
 * \brief Exit the controller if the DC's feature set is incompatible with ours
 *
 * \param[in] msg  Message XML from the DC (its XML_ATTR_CRM_VERSION is checked)
 */
static void
verify_feature_set(xmlNode *msg)
{
    const char *dc_version = crm_element_value(msg, XML_ATTR_CRM_VERSION);

    if (dc_version == NULL) {
        /* All we really know is that the DC feature set is older than 3.1.0,
         * but that's also all that really matters.
         */
        dc_version = "3.0.14";
    }

    if (!feature_set_compatible(dc_version, CRM_FEATURE_SET)) {
        crm_err("Local feature set (%s) is incompatible with DC's (%s)",
                CRM_FEATURE_SET, dc_version);

        // Nothing is likely to improve without administrator involvement
        controld_set_fsa_input_flags(R_STAYDOWN);
        crmd_exit(CRM_EX_FATAL);
        return;
    }

    crm_trace("Local feature set (%s) is compatible with DC's (%s)",
              CRM_FEATURE_SET, dc_version);
}
/*!
 * \internal
 * \brief Handle a shutdown all-clear received while this node is DC
 *
 * \param[in] stored_msg  Message XML
 *
 * \return I_STOP if we requested the shutdown, I_TERMINATE if the confirmation
 *         came from the DC without a request, I_ELECTION if it came from
 *         another node while we are not stopping, otherwise I_NULL
 */
static enum crmd_fsa_input
handle_shutdown_self_ack(xmlNode *stored_msg)
{
    const char *host_from = crm_element_value(stored_msg, F_CRM_HOST_FROM);
    const char *sender = (host_from? host_from : "another node");

    // The expected case -- we initiated own shutdown sequence
    if (pcmk_is_set(controld_globals.fsa_input_register, R_SHUTDOWN)) {
        crm_info("Shutting down controller");
        return I_STOP;
    }

    // Must be logic error -- DC confirming its own unrequested shutdown
    if (pcmk__str_eq(host_from, controld_globals.dc_name, pcmk__str_casei)) {
        crm_err("Shutting down controller immediately due to "
                "unexpected shutdown confirmation");
        return I_TERMINATE;
    }

    // Shouldn't happen -- non-DC confirming unrequested shutdown
    if (controld_globals.fsa_state != S_STOPPING) {
        crm_err("Starting new DC election because %s is "
                "confirming shutdown we did not request",
                sender);
        return I_ELECTION;
    }

    // Shouldn't happen, but we are already stopping anyway
    crm_debug("Ignoring unexpected shutdown confirmation from %s",
              sender);
    return I_NULL;
}
/*!
 * \internal
 * \brief Handle a shutdown all-clear received while this node is not DC
 *
 * \param[in] stored_msg  Message XML
 *
 * \return I_STOP if the message came from the DC (or no DC is known),
 *         otherwise I_NULL
 */
static enum crmd_fsa_input
handle_shutdown_ack(xmlNode *stored_msg)
{
    const char *host_from = crm_element_value(stored_msg, F_CRM_HOST_FROM);

    if (host_from == NULL) {
        crm_warn("Ignoring shutdown request without origin specified");
        return I_NULL;
    }

    // Only the DC may tell us to stop (an unknown DC name matches any sender)
    if (!pcmk__str_eq(host_from, controld_globals.dc_name,
                      pcmk__str_null_matches|pcmk__str_casei)) {
        crm_warn("Ignoring shutdown request from %s because DC is %s",
                 host_from, controld_globals.dc_name);
        return I_NULL;
    }

    if (!pcmk_is_set(controld_globals.fsa_input_register, R_SHUTDOWN)) {
        // We did not ask for this, so stay down once stopped
        crm_err("Shutting down controller after unexpected "
                "shutdown request from %s", host_from);
        controld_set_fsa_input_flags(R_STAYDOWN);
    } else {
        crm_info("Shutting down controller after confirmation from %s",
                 host_from);
    }
    return I_STOP;
}
/*!
 * \internal
 * \brief Dispatch a controller request according to its F_CRM_TASK value
 *
 * \param[in,out] stored_msg  Request XML (F_CRM_SYS_TO is rewritten for the
 *                            operations that are routed to the executor)
 * \param[in]     cause       What triggered processing (here only compared
 *                            against C_IPC_MESSAGE)
 *
 * \return Next FSA input (I_NULL when nothing further is needed)
 */
static enum crmd_fsa_input
handle_request(xmlNode *stored_msg, enum crmd_fsa_cause cause)
{
xmlNode *msg = NULL;
const char *op = crm_element_value(stored_msg, F_CRM_TASK);
/* Optimize this for the DC - it has the most to do */
if (op == NULL) {
crm_log_xml_warn(stored_msg, "[request without " F_CRM_TASK "]");
return I_NULL;
}
/* Every node records that the sender is expected to go down; only the DC
 * continues processing the shutdown request below
 */
if (strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0) {
const char *from = crm_element_value(stored_msg, F_CRM_HOST_FROM);
- crm_node_t *node = pcmk__search_cluster_node_cache(0, from);
+ crm_node_t *node = pcmk__search_cluster_node_cache(0, from, NULL);
pcmk__update_peer_expected(__func__, node, CRMD_JOINSTATE_DOWN);
if(AM_I_DC == FALSE) {
return I_NULL; /* Done */
}
}
/*========== DC-Only Actions ==========*/
if (AM_I_DC) {
if (strcmp(op, CRM_OP_JOIN_ANNOUNCE) == 0) {
return I_NODE_JOIN;
} else if (strcmp(op, CRM_OP_JOIN_REQUEST) == 0) {
return I_JOIN_REQUEST;
} else if (strcmp(op, CRM_OP_JOIN_CONFIRM) == 0) {
return I_JOIN_RESULT;
} else if (strcmp(op, CRM_OP_SHUTDOWN) == 0) {
return handle_shutdown_self_ack(stored_msg);
} else if (strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0) {
// Another controller wants to shut down its node
return handle_shutdown_request(stored_msg);
}
}
/*========== common actions ==========*/
if (strcmp(op, CRM_OP_NOVOTE) == 0) {
ha_msg_input_t fsa_input;
fsa_input.msg = stored_msg;
register_fsa_input_adv(C_HA_MESSAGE, I_NULL, &fsa_input,
A_ELECTION_COUNT | A_ELECTION_CHECK, FALSE,
__func__);
} else if (strcmp(op, CRM_OP_REMOTE_STATE) == 0) {
/* a remote connection host is letting us know the node state */
return handle_remote_state(stored_msg);
} else if (strcmp(op, CRM_OP_THROTTLE) == 0) {
throttle_update(stored_msg);
// On the DC, re-trigger an existing transition graph that is not complete
if (AM_I_DC && (controld_globals.transition_graph != NULL)
&& !controld_globals.transition_graph->complete) {
crm_debug("The throttle changed. Trigger a graph.");
trigger_graph();
}
return I_NULL;
} else if (strcmp(op, CRM_OP_CLEAR_FAILCOUNT) == 0) {
return handle_failcount_op(stored_msg);
} else if (strcmp(op, CRM_OP_VOTE) == 0) {
/* count the vote and decide what to do after that */
ha_msg_input_t fsa_input;
fsa_input.msg = stored_msg;
register_fsa_input_adv(C_HA_MESSAGE, I_NULL, &fsa_input,
A_ELECTION_COUNT | A_ELECTION_CHECK, FALSE,
__func__);
/* Sometimes we _must_ go into S_ELECTION */
if (controld_globals.fsa_state == S_HALT) {
crm_debug("Forcing an election from S_HALT");
return I_ELECTION;
#if 0
} else if (AM_I_DC) {
/* This is the old way of doing things but what is gained? */
return I_ELECTION;
#endif
}
} else if (strcmp(op, CRM_OP_JOIN_OFFER) == 0) {
// May exit the controller if the DC's feature set is incompatible
verify_feature_set(stored_msg);
crm_debug("Raising I_JOIN_OFFER: join-%s", crm_element_value(stored_msg, F_CRM_JOIN_ID));
return I_JOIN_OFFER;
} else if (strcmp(op, CRM_OP_JOIN_ACKNAK) == 0) {
crm_debug("Raising I_JOIN_RESULT: join-%s", crm_element_value(stored_msg, F_CRM_JOIN_ID));
return I_JOIN_RESULT;
} else if (strcmp(op, CRM_OP_LRM_DELETE) == 0) {
return handle_lrm_delete(stored_msg);
} else if ((strcmp(op, CRM_OP_LRM_FAIL) == 0)
|| (strcmp(op, CRM_OP_LRM_REFRESH) == 0) // @COMPAT
|| (strcmp(op, CRM_OP_REPROBE) == 0)) {
// Re-address these operations to the executor subsystem and route them
crm_xml_add(stored_msg, F_CRM_SYS_TO, CRM_SYSTEM_LRMD);
return I_ROUTER;
} else if (strcmp(op, CRM_OP_NOOP) == 0) {
return I_NULL;
} else if (strcmp(op, CRM_OP_LOCAL_SHUTDOWN) == 0) {
crm_shutdown(SIGTERM);
/*return I_SHUTDOWN; */
return I_NULL;
} else if (strcmp(op, CRM_OP_PING) == 0) {
return handle_ping(stored_msg);
} else if (strcmp(op, CRM_OP_NODE_INFO) == 0) {
return handle_node_info_request(stored_msg);
} else if (strcmp(op, CRM_OP_RM_NODE_CACHE) == 0) {
int id = 0;
const char *name = NULL;
crm_element_value_int(stored_msg, XML_ATTR_ID, &id);
name = crm_element_value(stored_msg, XML_ATTR_UNAME);
if(cause == C_IPC_MESSAGE) {
/* A local client asked, so broadcast the request to all peers.
 * NOTE(review): the request created here is not given the id or name read
 * above -- confirm that peers can still identify the node to remove.
 */
msg = create_request(CRM_OP_RM_NODE_CACHE, NULL, NULL, CRM_SYSTEM_CRMD, CRM_SYSTEM_CRMD, NULL);
if (send_cluster_message(NULL, crm_msg_crmd, msg, TRUE) == FALSE) {
crm_err("Could not instruct peers to remove references to node %s/%u", name, id);
} else {
crm_notice("Instructing peers to remove references to node %s/%u", name, id);
}
free_xml(msg);
} else {
reap_crm_member(id, name);
/* If we're forgetting this node, also forget any failures to fence
* it, so we don't carry that over to any node added later with the
* same name.
*/
st_fail_count_reset(name);
}
} else if (strcmp(op, CRM_OP_MAINTENANCE_NODES) == 0) {
xmlNode *xml = get_message_xml(stored_msg, F_CRM_DATA);
remote_ra_process_maintenance_nodes(xml);
} else if (strcmp(op, PCMK__CONTROLD_CMD_NODES) == 0) {
return handle_node_list(stored_msg);
/*========== (NOT_DC)-Only Actions ==========*/
} else if (!AM_I_DC) {
// Any other unrecognized operation on a non-DC node is silently ignored
if (strcmp(op, CRM_OP_SHUTDOWN) == 0) {
return handle_shutdown_ack(stored_msg);
}
} else {
// Reached only when AM_I_DC is true, since the previous branch takes all non-DC cases
crm_err("Unexpected request (%s) sent to %s", op, AM_I_DC ? "the DC" : "non-DC node");
crm_log_xml_err(stored_msg, "Unexpected");
}
return I_NULL;
}
/*!
 * \internal
 * \brief Process a response message received by the controller
 *
 * On the DC, a scheduler calculation result is queued as I_PE_SUCCESS if its
 * reference matches the most recent request. Vote and shutdown responses are
 * ignored; anything else is logged as unexpected.
 *
 * \param[in,out] stored_msg  Response XML
 */
static void
handle_response(xmlNode *stored_msg)
{
    const char *op = crm_element_value(stored_msg, F_CRM_TASK);

    if (op == NULL) {
        crm_log_xml_err(stored_msg, "Bad message");
        return;
    }

    if (AM_I_DC && (strcmp(op, CRM_OP_PECALC) == 0)) {
        // Check whether scheduler answer been superseded by subsequent request
        const char *msg_ref = crm_element_value(stored_msg, XML_ATTR_REFERENCE);

        if (msg_ref == NULL) {
            crm_err("%s - Ignoring calculation with no reference", op);

        } else if (!pcmk__str_eq(msg_ref, controld_globals.fsa_pe_ref,
                                 pcmk__str_none)) {
            crm_info("%s calculation %s is obsolete", op, msg_ref);

        } else {
            ha_msg_input_t fsa_input;

            controld_stop_sched_timer();
            fsa_input.msg = stored_msg;
            register_fsa_input_later(C_IPC_MESSAGE, I_PE_SUCCESS, &fsa_input);
        }
        return;
    }

    if ((strcmp(op, CRM_OP_VOTE) == 0)
        || (strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0)
        || (strcmp(op, CRM_OP_SHUTDOWN) == 0)) {
        // Nothing needs to be done for these responses
        return;
    }

    {
        const char *host_from = crm_element_value(stored_msg, F_CRM_HOST_FROM);

        crm_err("Unexpected response (op=%s, src=%s) sent to the %s",
                op, host_from, AM_I_DC ? "DC" : "controller");
    }
}
/*!
 * \internal
 * \brief Record a node's shutdown request as a node attribute
 *
 * This is handled here to avoid potential version issues where the shutdown
 * message/procedure may have been changed in later versions. This way the DC
 * is always in control of the shutdown.
 *
 * \param[in] stored_msg  Request XML (a missing origin means the local node)
 *
 * \return Next FSA input (always I_NULL)
 */
static enum crmd_fsa_input
handle_shutdown_request(xmlNode * stored_msg)
{
    char *timestamp = NULL;
    const char *requester = crm_element_value(stored_msg, F_CRM_HOST_FROM);

    if (requester == NULL) {
        /* we're shutting down and the DC */
        requester = controld_globals.our_nodename;
    }

    crm_info("Creating shutdown request for %s (state=%s)", requester,
             fsa_state2string(controld_globals.fsa_state));
    crm_log_xml_trace(stored_msg, "message");

    // The attribute value is the current time
    timestamp = pcmk__ttoa(time(NULL));
    update_attrd(requester, XML_CIB_ATTR_SHUTDOWN, timestamp, NULL, FALSE);
    free(timestamp);

    /* will be picked up by the TE as long as its running */
    return I_NULL;
}
/*!
 * \internal
 * \brief Deliver a message to a local subsystem or IPC client
 *
 * The destination is tried, in order, as a connected IPC client ID, the
 * transition engine, the executor interface, and a proxy session.
 *
 * \param[in,out] msg  Message XML (the local node name is added as
 *                     F_CRM_HOST_FROM if no origin host is set)
 * \param[in]     sys  Destination subsystem name or client ID
 */
static void
send_msg_via_ipc(xmlNode * msg, const char *sys)
{
    pcmk__client_t *client = NULL;

    CRM_CHECK(sys != NULL, return);

    client = pcmk__find_client_by_id(sys);

    if (crm_element_value(msg, F_CRM_HOST_FROM) == NULL) {
        crm_xml_add(msg, F_CRM_HOST_FROM, controld_globals.our_nodename);
    }

    if (client != NULL) {
        /* Transient clients such as crmadmin */
        pcmk__ipc_send_xml(client, 0, msg, crm_ipc_server_event);
        return;
    }

    if (pcmk__str_eq(sys, CRM_SYSTEM_TENGINE, pcmk__str_none)) {
        xmlNode *payload = get_message_xml(msg, F_CRM_DATA);

        process_te_message(msg, payload);
        return;
    }

    if (pcmk__str_eq(sys, CRM_SYSTEM_LRMD, pcmk__str_none)) {
        // Hand the message to the executor interface as a synthetic FSA input
        ha_msg_input_t fsa_input;
        fsa_data_t fsa_data;

        fsa_input.msg = msg;
        fsa_input.xml = get_message_xml(msg, F_CRM_DATA);

        fsa_data.id = 0;
        fsa_data.actions = 0;
        fsa_data.data = &fsa_input;
        fsa_data.fsa_input = I_MESSAGE;
        fsa_data.fsa_cause = C_IPC_MESSAGE;
        fsa_data.origin = __func__;
        fsa_data.data_type = fsa_dt_ha_msg;

        do_lrm_invoke(A_LRM_INVOKE, C_IPC_MESSAGE, controld_globals.fsa_state,
                      I_MESSAGE, &fsa_data);
        return;
    }

    if (crmd_is_proxy_session(sys)) {
        crmd_proxy_send(sys, msg);
        return;
    }

    crm_info("Received invalid request: unknown subsystem '%s'", sys);
}
/*!
 * \internal
 * \brief Free a message input object along with the message XML it holds
 *
 * \param[in,out] orig  Object to free (may be NULL)
 */
void
delete_ha_msg_input(ha_msg_input_t * orig)
{
    if (orig != NULL) {
        free_xml(orig->msg);
        free(orig);
    }
}
/*!
 * \internal
 * \brief Tell all cluster peers that a Pacemaker Remote node changed state
 *
 * When the node is coming up, the local node is named in the message as its
 * connection host.
 *
 * \param[in] node_name  Node's name
 * \param[in] node_up    true if node is up, false if down
 */
void
broadcast_remote_state_message(const char *node_name, bool node_up)
{
    const char *direction = (node_up? "coming up" : "going down");
    xmlNode *notice = create_request(CRM_OP_REMOTE_STATE, NULL, NULL,
                                     CRM_SYSTEM_CRMD, CRM_SYSTEM_CRMD, NULL);

    crm_info("Notifying cluster of Pacemaker Remote node %s %s",
             node_name, direction);

    crm_xml_add(notice, XML_ATTR_ID, node_name);
    pcmk__xe_set_bool_attr(notice, XML_NODE_IN_CLUSTER, node_up);
    if (node_up) {
        crm_xml_add(notice, PCMK__XA_CONN_HOST, controld_globals.our_nodename);
    }

    // A NULL destination sends to every peer
    send_cluster_message(NULL, crm_msg_crmd, notice, TRUE);
    free_xml(notice);
}
diff --git a/include/crm/cluster/internal.h b/include/crm/cluster/internal.h
index 9bc57c617c..da0cc9da4a 100644
--- a/include/crm/cluster/internal.h
+++ b/include/crm/cluster/internal.h
@@ -1,133 +1,134 @@
/*
* Copyright 2004-2021 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU Lesser General Public License
* version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
*/
#ifndef CRM_CLUSTER_INTERNAL__H
# define CRM_CLUSTER_INTERNAL__H
# include <stdint.h> // uint32_t, uint64_t
# include <crm/cluster.h>
/* *INDENT-OFF* */
// Bit flags identifying cluster-related processes (each value is a distinct bit)
enum crm_proc_flag {
// Note that the "none" value is the lowest bit, not zero
crm_proc_none = 0x00000001,
// Cluster layers
crm_proc_cpg = 0x04000000,
// Daemons
crm_proc_execd = 0x00000010,
crm_proc_based = 0x00000100,
crm_proc_controld = 0x00000200,
crm_proc_attrd = 0x00001000,
crm_proc_schedulerd = 0x00010000,
crm_proc_fenced = 0x00100000,
};
/* *INDENT-ON* */
/*!
 * \internal
 * \brief Return the process bit corresponding to the current cluster stack
 *
 * \return \c crm_proc_cpg for a Corosync cluster, otherwise \c crm_proc_none
 *         (which is a nonzero flag value)
 */
static inline uint32_t
crm_get_cluster_proc(void)
{
    if (get_cluster_type() == pcmk_cluster_corosync) {
        return crm_proc_cpg;
    }
    return crm_proc_none;
}
/*!
* \internal
* \brief Get log-friendly string description of a Corosync return code
*
* \param[in] error Corosync return code
*
* \return Log-friendly string description corresponding to \p error, or the
*         generic "Corosync error" if \p error is not one of the codes listed
*         below or this was built without Corosync support
*/
static inline const char *
pcmk__cs_err_str(int error)
{
// The code-to-text table exists only in builds with Corosync support
# if SUPPORT_COROSYNC
switch (error) {
case CS_OK: return "OK";
case CS_ERR_LIBRARY: return "Library error";
case CS_ERR_VERSION: return "Version error";
case CS_ERR_INIT: return "Initialization error";
case CS_ERR_TIMEOUT: return "Timeout";
case CS_ERR_TRY_AGAIN: return "Try again";
case CS_ERR_INVALID_PARAM: return "Invalid parameter";
case CS_ERR_NO_MEMORY: return "No memory";
case CS_ERR_BAD_HANDLE: return "Bad handle";
case CS_ERR_BUSY: return "Busy";
case CS_ERR_ACCESS: return "Access error";
case CS_ERR_NOT_EXIST: return "Doesn't exist";
case CS_ERR_NAME_TOO_LONG: return "Name too long";
case CS_ERR_EXIST: return "Exists";
case CS_ERR_NO_SPACE: return "No space";
case CS_ERR_INTERRUPT: return "Interrupt";
case CS_ERR_NAME_NOT_FOUND: return "Name not found";
case CS_ERR_NO_RESOURCES: return "No resources";
case CS_ERR_NOT_SUPPORTED: return "Not supported";
case CS_ERR_BAD_OPERATION: return "Bad operation";
case CS_ERR_FAILED_OPERATION: return "Failed operation";
case CS_ERR_MESSAGE_ERROR: return "Message error";
case CS_ERR_QUEUE_FULL: return "Queue full";
case CS_ERR_QUEUE_NOT_AVAILABLE: return "Queue not available";
case CS_ERR_BAD_FLAGS: return "Bad flags";
case CS_ERR_TOO_BIG: return "Too big";
case CS_ERR_NO_SECTIONS: return "No sections";
}
# endif
// Fallback for any code without a case above
return "Corosync error";
}
# if SUPPORT_COROSYNC
#if 0
/* This is the new way to do it, but we still support all Corosync 2 versions,
* and this isn't always available. A better alternative here would be to check
* for support in the configure script and enable this conditionally.
*/
#define pcmk__init_cmap(handle) cmap_initialize_map((handle), CMAP_MAP_ICMAP)
#else
#define pcmk__init_cmap(handle) cmap_initialize(handle)
#endif
char *pcmk__corosync_cluster_name(void);
bool pcmk__corosync_add_nodes(xmlNode *xml_parent);
# endif
crm_node_t *crm_update_peer_proc(const char *source, crm_node_t * peer,
uint32_t flag, const char *status);
crm_node_t *pcmk__update_peer_state(const char *source, crm_node_t *node,
const char *state, uint64_t membership);
void pcmk__update_peer_expected(const char *source, crm_node_t *node,
const char *expected);
void pcmk__reap_unseen_nodes(uint64_t ring_id);
void pcmk__corosync_quorum_connect(gboolean (*dispatch)(unsigned long long,
gboolean),
void (*destroy) (gpointer));
crm_node_t *pcmk__search_node_caches(unsigned int id, const char *uname,
uint32_t flags);
-crm_node_t *pcmk__search_cluster_node_cache(unsigned int id, const char *uname);
+crm_node_t *pcmk__search_cluster_node_cache(unsigned int id, const char *uname,
+ const char *uuid);
void pcmk__refresh_node_caches_from_cib(xmlNode *cib);
crm_node_t *pcmk__search_known_node_cache(unsigned int id, const char *uname,
uint32_t flags);
#endif
diff --git a/lib/cluster/cluster.c b/lib/cluster/cluster.c
index 011e0532d3..fce7591eb8 100644
--- a/lib/cluster/cluster.c
+++ b/lib/cluster/cluster.c
@@ -1,405 +1,405 @@
/*
* Copyright 2004-2022 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU Lesser General Public License
* version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#include <dlfcn.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include <sys/param.h>
#include <sys/types.h>
#include <crm/crm.h>
#include <crm/msg_xml.h>
#include <crm/common/ipc.h>
#include <crm/cluster/internal.h>
#include "crmcluster_private.h"
CRM_TRACE_INIT_DATA(cluster);
/*!
 * \brief Get a node's UUID, looking it up and caching it if not yet known
 *
 * \param[in,out] peer  Node to check (its \c uuid member is set by the lookup)
 *
 * \return Node UUID of \p peer, or NULL if unknown
 */
const char *
crm_peer_uuid(crm_node_t *peer)
{
    char *looked_up = NULL;

    // Check simple cases first, to avoid any calls that might block
    if (peer == NULL) {
        return NULL;
    }
    if (peer->uuid != NULL) {
        return peer->uuid;
    }

    switch (get_cluster_type()) {
        case pcmk_cluster_corosync:
#if SUPPORT_COROSYNC
            looked_up = pcmk__corosync_uuid(peer);
#endif
            break;

        case pcmk_cluster_unknown:
        case pcmk_cluster_invalid:
            crm_err("Unsupported cluster type");
            break;
    }

    // Store the result (possibly still NULL) on the node for later calls
    peer->uuid = looked_up;
    return looked_up;
}
/*!
 * \brief Connect to the cluster layer
 *
 * \param[in,out] cluster  Initialized cluster object to connect
 *
 * \return TRUE on success, otherwise FALSE (including when the detected
 *         cluster type is not a supported Corosync cluster)
 */
gboolean
crm_cluster_connect(crm_cluster_t *cluster)
{
    enum cluster_type_e type = get_cluster_type();

    crm_notice("Connecting to %s cluster infrastructure",
               name_for_cluster_type(type));

#if SUPPORT_COROSYNC
    if (type == pcmk_cluster_corosync) {
        // The peer caches must exist before membership events arrive
        crm_peer_init();
        return pcmk__corosync_connect(cluster);
    }
#endif // SUPPORT_COROSYNC

    return FALSE;
}
/*!
 * \brief Disconnect from the cluster layer
 *
 * Does nothing beyond logging unless the detected cluster type is a supported
 * Corosync cluster.
 *
 * \param[in,out] cluster  Cluster object to disconnect
 */
void
crm_cluster_disconnect(crm_cluster_t *cluster)
{
    enum cluster_type_e type = get_cluster_type();

    crm_info("Disconnecting from %s cluster infrastructure",
             name_for_cluster_type(type));

#if SUPPORT_COROSYNC
    if (type == pcmk_cluster_corosync) {
        crm_peer_destroy();
        pcmk__corosync_disconnect(cluster);
    }
#endif // SUPPORT_COROSYNC
}
/*!
 * \brief Allocate a new, zero-filled \p crm_cluster_t object
 *
 * \return A newly allocated \p crm_cluster_t object (guaranteed not \p NULL)
 * \note The caller is responsible for freeing the return value using
 *       \p pcmk_cluster_free().
 */
crm_cluster_t *
pcmk_cluster_new(void)
{
    crm_cluster_t *cluster = NULL;

    cluster = calloc(1, sizeof *cluster);
    CRM_ASSERT(cluster != NULL);
    return cluster;
}
/*!
 * \brief Free a \p crm_cluster_t object and the strings it owns
 *
 * \param[in,out] cluster  Cluster object to free (may be NULL)
 */
void
pcmk_cluster_free(crm_cluster_t *cluster)
{
    if (cluster != NULL) {
        free(cluster->uuid);
        free(cluster->uname);
        free(cluster);
    }
}
/*!
 * \brief Send an XML message via the cluster messaging layer
 *
 * \param[in] node     Cluster node to send message to
 * \param[in] service  Message type to use in message host info
 * \param[in] data     XML message to send
 * \param[in] ordered  Ignored for currently supported messaging layers
 *
 * \return TRUE on success, otherwise FALSE (including when the cluster type
 *         has no supported messaging layer)
 */
gboolean
send_cluster_message(const crm_node_t *node, enum crm_ais_msg_types service,
                     xmlNode *data, gboolean ordered)
{
    gboolean sent = FALSE;

    switch (get_cluster_type()) {
        case pcmk_cluster_corosync:
#if SUPPORT_COROSYNC
            sent = pcmk__cpg_send_xml(data, node, service);
#endif
            break;

        default:
            break;
    }
    return sent;
}
/*!
 * \brief Get the local node's name
 *
 * The name is looked up on first use and remembered for later calls.
 *
 * \return Local node's name
 * \note This will fatally exit if local node name cannot be known.
 */
const char *
get_local_node_name(void)
{
    static char *name = NULL;

    if (name != NULL) {
        return name;
    }
    name = get_node_name(0);
    return name;
}
/*!
* \brief Get the node name corresponding to a cluster node ID
*
* \param[in] nodeid Node ID to check (or 0 for local node)
*
* \return Node name corresponding to \p nodeid, or NULL if \p nodeid is
*         nonzero and no name could be obtained
* \note This will fatally exit if \p nodeid is 0 and local node name cannot be
* known.
* \note The result is returned as a non-const string; presumably the caller
*       owns it and should free it -- TODO confirm against the lookup helpers.
*/
char *
get_node_name(uint32_t nodeid)
{
char *name = NULL;
enum cluster_type_e stack = get_cluster_type();
switch (stack) {
case pcmk_cluster_corosync:
// Without Corosync support, this case falls through to the error below
#if SUPPORT_COROSYNC
name = pcmk__corosync_name(0, nodeid);
break;
#endif // SUPPORT_COROSYNC
default:
crm_err("Unknown cluster type: %s (%d)", name_for_cluster_type(stack), stack);
}
// For the local node only, fall back to the host name
if ((name == NULL) && (nodeid == 0)) {
name = pcmk_hostname();
if (name == NULL) {
// @TODO Maybe let the caller decide what to do
crm_err("Could not obtain the local %s node name",
name_for_cluster_type(stack));
crm_exit(CRM_EX_FATAL);
}
crm_notice("Defaulting to uname -n for the local %s node name",
name_for_cluster_type(stack));
}
// Still unknown: only possible here for a nonzero node ID
if (name == NULL) {
crm_notice("Could not obtain a node name for %s node with id %u",
name_for_cluster_type(stack), nodeid);
}
return name;
}
/*!
* \brief Get the node name corresponding to a node UUID
*
* \param[in] uuid UUID of desired node
*
* \return name of desired node, or NULL if \p uuid is NULL or no matching
*         node with a known name is found
*
* \note This relies on the remote peer cache being populated with all
* remote nodes in the cluster, so callers should maintain that cache.
*/
const char *
crm_peer_uname(const char *uuid)
{
GHashTableIter iter;
crm_node_t *node = NULL;
CRM_CHECK(uuid != NULL, return NULL);
/* remote nodes have the same uname and uuid */
if (g_hash_table_lookup(crm_remote_peer_cache, uuid)) {
return uuid;
}
/* avoid blocking calls where possible */
// Scan the cluster peer cache for an entry whose UUID matches (ignoring case)
g_hash_table_iter_init(&iter, crm_peer_cache);
while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
if (pcmk__str_eq(node->uuid, uuid, pcmk__str_casei)) {
if (node->uname != NULL) {
return node->uname;
}
// Matching entry has no name yet, so stop scanning and try by node ID
break;
}
}
node = NULL;
if (is_corosync_cluster()) {
long long id;
// Here the UUID must parse as a node ID in the range 1..UINT32_MAX
if ((pcmk__scan_ll(uuid, &id, 0LL) != pcmk_rc_ok)
|| (id < 1LL) || (id > UINT32_MAX)) {
crm_err("Invalid Corosync node ID '%s'", uuid);
return NULL;
}
- node = pcmk__search_cluster_node_cache((uint32_t) id, NULL);
+ node = pcmk__search_cluster_node_cache((uint32_t) id, NULL, NULL);
if (node != NULL) {
crm_info("Setting uuid for node %s[%u] to %s",
node->uname, node->id, uuid);
/* NOTE(review): this assigns without freeing any previous node->uuid;
 * presumably it is NULL for nodes reached here -- confirm.
 */
node->uuid = strdup(uuid);
return node->uname;
}
return NULL;
}
return NULL;
}
/*!
 * \brief Set an XML attribute to a node's UUID
 *
 * \param[in,out] xml   XML element to add UUID to
 * \param[in]     attr  XML attribute name to set
 * \param[in,out] node  Node whose UUID should be used as attribute value
 *                      (the UUID is looked up via crm_peer_uuid())
 */
void
set_uuid(xmlNode *xml, const char *attr, crm_node_t *node)
{
    const char *node_uuid = crm_peer_uuid(node);

    crm_xml_add(xml, attr, node_uuid);
}
/*!
 * \brief Get a log-friendly string equivalent of a cluster type
 *
 * \param[in] type  Cluster type
 *
 * \return Log-friendly string corresponding to \p type ("invalid", after
 *         logging an error, for a value outside the enumeration)
 */
const char *
name_for_cluster_type(enum cluster_type_e type)
{
    const char *label = NULL;

    switch (type) {
        case pcmk_cluster_corosync:
            label = "corosync";
            break;
        case pcmk_cluster_unknown:
            label = "unknown";
            break;
        case pcmk_cluster_invalid:
            label = "invalid";
            break;
    }

    if (label == NULL) {
        // Not one of the enumerators
        crm_err("Invalid cluster type: %d", type);
        label = "invalid";
    }
    return label;
}
/*!
* \brief Get (and validate) the local cluster type
*
* The type is taken from the PCMK__ENV_CLUSTER_TYPE environment option if set;
* otherwise (when built with Corosync support) it is detected. Once a type
* other than pcmk_cluster_unknown has been determined, it is remembered in a
* static variable and returned directly by later calls.
*
* \return Local cluster type (pcmk_cluster_unknown if it could not be
* determined)
* \note This will fatally exit if the local cluster type is invalid.
*/
enum cluster_type_e
get_cluster_type(void)
{
bool detected = false;
const char *cluster = NULL;
static enum cluster_type_e cluster_type = pcmk_cluster_unknown;
/* Return the previous calculation, if any */
if (cluster_type != pcmk_cluster_unknown) {
return cluster_type;
}
cluster = pcmk__env_option(PCMK__ENV_CLUSTER_TYPE);
#if SUPPORT_COROSYNC
/* If nothing is defined in the environment, try corosync (if supported) */
if (cluster == NULL) {
crm_debug("Testing with Corosync");
cluster_type = pcmk__corosync_detect();
if (cluster_type != pcmk_cluster_unknown) {
detected = true;
goto done;
}
}
#endif
/* Either something was defined in the environment, or nothing was defined
* and detection did not find a cluster (or was not compiled in). Test
* whatever we have against what we support.
*/
crm_info("Verifying cluster type: '%s'",
((cluster == NULL)? "-unspecified-" : cluster));
if (cluster == NULL) {
// Nothing specified and nothing detected: cluster_type stays unknown
#if SUPPORT_COROSYNC
} else if (pcmk__str_eq(cluster, "corosync", pcmk__str_casei)) {
cluster_type = pcmk_cluster_corosync;
#endif
} else {
// The environment named a stack this build does not handle
cluster_type = pcmk_cluster_invalid;
goto done; /* Keep the compiler happy when no stacks are supported */
}
done:
if (cluster_type == pcmk_cluster_unknown) {
crm_notice("Could not determine the current cluster type");
} else if (cluster_type == pcmk_cluster_invalid) {
crm_notice("This installation does not support the '%s' cluster infrastructure: terminating.",
cluster);
crm_exit(CRM_EX_FATAL);
} else {
// "Detected" if found by probing, "Assuming" if taken from the environment
crm_info("%s an active '%s' cluster",
(detected? "Detected" : "Assuming"),
name_for_cluster_type(cluster_type));
}
return cluster_type;
}
/*!
* \brief Check whether the local cluster is a Corosync cluster
*
* \return TRUE if the local cluster is a Corosync cluster, otherwise FALSE
*/
gboolean
is_corosync_cluster(void)
{
return get_cluster_type() == pcmk_cluster_corosync;
}
diff --git a/lib/cluster/cpg.c b/lib/cluster/cpg.c
index 2af4a5059d..d79ea47cbb 100644
--- a/lib/cluster/cpg.c
+++ b/lib/cluster/cpg.c
@@ -1,1092 +1,1092 @@
/*
* Copyright 2004-2022 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU Lesser General Public License
* version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#include <bzlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <crm/common/ipc.h>
#include <crm/cluster/internal.h>
#include <crm/common/mainloop.h>
#include <sys/utsname.h>
#include <qb/qbipc_common.h>
#include <qb/qbipcc.h>
#include <qb/qbutil.h>
#include <corosync/corodefs.h>
#include <corosync/corotypes.h>
#include <corosync/hdb.h>
#include <corosync/cpg.h>
#include <crm/msg_xml.h>
#include <crm/common/ipc_internal.h> /* PCMK__SPECIAL_PID* */
#include "crmcluster_private.h"
/* @TODO Once we can update the public API to require crm_cluster_t* in more
* functions, we can ditch this in favor of cluster->cpg_handle.
*/
// Handle of the joined CPG connection (set on connect, reset to 0 on disconnect)
static cpg_handle_t pcmk_cpg_handle = 0;
// @TODO These could be moved to crm_cluster_t* at that time as well
// Set when a membership event shows the local node is no longer in the group
static bool cpg_evicted = false;
// Outgoing messages (struct iovec *) that have not yet been multicast
static GList *cs_message_queue = NULL;
// ID of the pending glib timeout that will retry flushing the queue, or 0 if none
static int cs_message_timer = 0;
/* Description of one endpoint (sender or destination) of a CPG message.
* The structure is packed because it is embedded in the message that is sent
* to other nodes.
*/
struct pcmk__cpg_host_s {
uint32_t id; // Corosync node ID (0 when not set)
uint32_t pid; // Process ID of the endpoint
gboolean local; // Reported as "local" by ais_dest() when nonzero
enum crm_ais_msg_types type; // Kind of daemon/client the endpoint is
uint32_t size; // Length of uname (0 means no name is set)
char uname[MAX_NAME]; // Node name, zero-padded
} __attribute__ ((packed));
typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
/* A CPG message: fixed-size header fields followed immediately by the
* (possibly bzip2-compressed) payload in data[]. header.size holds the size of
* the whole message, header.id the message class, and header.error CS_OK for
* messages built by send_cluster_text().
*/
struct pcmk__cpg_msg_s {
struct qb_ipc_response_header header __attribute__ ((aligned(8)));
uint32_t id; // Sender's running message counter
gboolean is_compressed; // Whether data[] holds compressed bytes
pcmk__cpg_host_t host; // Destination of the message
pcmk__cpg_host_t sender; // Origin of the message
uint32_t size; // Uncompressed payload size, including the terminating NUL
uint32_t compressed_size; // Size of data[] when is_compressed is set
/* 584 bytes */
char data[0]; // Payload; its length is msg_data_len() bytes
} __attribute__ ((packed));
typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
static void crm_cs_flush(gpointer data);
/* Number of payload bytes actually carried in a CPG message's data[]: the
 * compressed size for compressed messages, otherwise the plain size.
 *
 * The argument is parenthesized so that any pointer expression (for example
 * &local_msg) can be passed. It is still evaluated more than once, so it must
 * not have side effects.
 */
#define msg_data_len(msg) \
    ((msg)->is_compressed? (msg)->compressed_size : (msg)->size)
/* Run a Corosync call, retrying while it reports a temporary failure.
*
* rc: variable that receives the result of each attempt
* counter: integer variable counting failed attempts (callers reset it to 0
* before use); also the number of seconds slept before the next try
* max: stop retrying once counter reaches this value
* code: expression to evaluate on each attempt
*
* The call is repeated only when it returns CS_ERR_TRY_AGAIN or
* CS_ERR_QUEUE_FULL; any other result ends the loop immediately. The sleep is
* a blocking sleep() of counter seconds, so each retry waits longer.
*/
#define cs_repeat(rc, counter, max, code) do { \
rc = code; \
if ((rc == CS_ERR_TRY_AGAIN) || (rc == CS_ERR_QUEUE_FULL)) { \
counter++; \
crm_debug("Retrying operation after %ds", counter); \
sleep(counter); \
} else { \
break; \
} \
} while (counter < max)
/*!
 * \brief Leave the CPG group and close the cluster's CPG connection
 *
 * \param[in,out] cluster  Cluster whose CPG connection should be closed; its
 *                         cpg_handle is reset to 0 afterward
 */
void
cluster_disconnect_cpg(crm_cluster_t *cluster)
{
    // The file-level handle is forgotten whether or not a connection exists
    pcmk_cpg_handle = 0;

    if (cluster->cpg_handle == 0) {
        crm_info("No CPG connection");
        return;
    }

    crm_trace("Disconnecting CPG");
    cpg_leave(cluster->cpg_handle, &cluster->group);
    cpg_finalize(cluster->cpg_handle);
    cluster->cpg_handle = 0;
}
/*!
* \brief Get the local Corosync node ID (via CPG)
*
* The ID is remembered in a static variable once it has been obtained, so
* later calls return it without contacting Corosync.
*
* \param[in] handle CPG connection to use (or 0 to use new connection)
*
* \return Corosync ID of local node (or 0 if not known)
*/
uint32_t
get_local_nodeid(cpg_handle_t handle)
{
cs_error_t rc = CS_OK;
int retries = 0;
static uint32_t local_nodeid = 0;
cpg_handle_t local_handle = handle;
cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
int fd = -1;
uid_t found_uid = 0;
gid_t found_gid = 0;
pid_t found_pid = 0;
int rv;
// Use the cached value if a previous call already found the ID
if(local_nodeid != 0) {
return local_nodeid;
}
if(handle == 0) {
// No connection supplied: open a temporary one (closed again at "bail")
crm_trace("Creating connection");
cs_repeat(rc, retries, 5, cpg_model_initialize(&local_handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
if (rc != CS_OK) {
crm_err("Could not connect to the CPG API: %s (%d)",
cs_strerror(rc), rc);
return 0;
}
rc = cpg_fd_get(local_handle, &fd);
if (rc != CS_OK) {
crm_err("Could not obtain the CPG API connection: %s (%d)",
cs_strerror(rc), rc);
goto bail;
}
/* CPG provider run as root (in given user namespace, anyway)? */
// A result of 0 is treated as "not authentic", a negative one as an errno-style error
if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
&found_uid, &found_gid))) {
crm_err("CPG provider is not authentic:"
" process %lld (uid: %lld, gid: %lld)",
(long long) PCMK__SPECIAL_PID_AS_0(found_pid),
(long long) found_uid, (long long) found_gid);
goto bail;
} else if (rv < 0) {
crm_err("Could not verify authenticity of CPG provider: %s (%d)",
strerror(-rv), -rv);
goto bail;
}
}
if (rc == CS_OK) {
retries = 0;
crm_trace("Performing lookup");
cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
}
if (rc != CS_OK) {
crm_err("Could not get local node id from the CPG API: %s (%d)",
pcmk__cs_err_str(rc), rc);
}
bail:
// Only close the connection if it was created by this function
if(handle == 0) {
crm_trace("Closing connection");
cpg_finalize(local_handle);
}
crm_debug("Local nodeid is %u", local_nodeid);
return local_nodeid;
}
/*!
* \internal
* \brief Callback function for Corosync message queue timer
*
* \param[in] data Pointer to CPG handle (passed through to crm_cs_flush())
*
* \return FALSE (to indicate to glib that the timer should be removed rather
* than fire again; crm_cs_flush() schedules a new one if needed)
*/
static gboolean
crm_cs_flush_cb(gpointer data)
{
// Clear the timer ID first: crm_cs_flush() does nothing while it is nonzero
cs_message_timer = 0;
crm_cs_flush(data);
return FALSE;
}
// Send no more than this many CPG messages in one flush
#define CS_SEND_MAX 200
/*!
 * \internal
 * \brief Send messages in Corosync CPG message queue
 *
 * Multicast up to CS_SEND_MAX queued messages, stopping at the first send
 * failure. Sent messages are removed from the queue and freed. If anything
 * remains queued afterward, a glib timer is scheduled to try again.
 *
 * \param[in] data  Pointer to the CPG handle to send with
 */
static void
crm_cs_flush(gpointer data)
{
    unsigned int sent = 0;
    guint queue_len = 0;
    cs_error_t rc = CS_OK;  // So an empty queue is reported as success below
    cpg_handle_t *handle = (cpg_handle_t *) data;

    if (*handle == 0) {
        crm_trace("Connection is dead");
        return;
    }

    queue_len = g_list_length(cs_message_queue);
    if (((queue_len % 1000) == 0) && (queue_len > 1)) {
        // queue_len is unsigned, so it must be printed with %u
        crm_err("CPG queue has grown to %u", queue_len);

    } else if (queue_len == CS_SEND_MAX) {
        crm_warn("CPG queue has grown to %u", queue_len);
    }

    if (cs_message_timer != 0) {
        /* There is already a timer, wait until it goes off */
        crm_trace("Timer active %d", cs_message_timer);
        return;
    }

    while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
        struct iovec *iov = cs_message_queue->data;

        rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
        if (rc != CS_OK) {
            // Leave this message at the head of the queue for the next attempt
            break;
        }

        sent++;
        crm_trace("CPG message sent, size=%llu",
                  (unsigned long long) iov->iov_len);

        cs_message_queue = g_list_remove(cs_message_queue, iov);
        free(iov->iov_base);
        free(iov);
    }

    queue_len -= sent;
    do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE,
               "Sent %u CPG message%s (%u still queued): %s (rc=%d)",
               sent, pcmk__plural_s(sent), queue_len, pcmk__cs_err_str(rc),
               (int) rc);

    if (cs_message_queue) {
        uint32_t delay_ms = 100;

        if (rc != CS_OK) {
            /* Proportionally more if sending failed but cap at 1s */
            delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
        }
        cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
    }
}
/*!
 * \internal
 * \brief Process one pending event on a cluster's CPG connection
 *
 * \param[in,out] user_data  Cluster object (crm_cluster_t *) owning the handle
 *
 * \return 0 on success, -1 if dispatching failed (the handle is then finalized
 *         and reset) or if the local node has been evicted from the group
 *         (per mainloop_io_t interface)
 */
static int
pcmk_cpg_dispatch(gpointer user_data)
{
    crm_cluster_t *cluster = (crm_cluster_t *) user_data;
    const cs_error_t dispatch_rc = cpg_dispatch(cluster->cpg_handle,
                                                CS_DISPATCH_ONE);

    if (dispatch_rc != CS_OK) {
        crm_err("Connection to the CPG API failed: %s (%d)",
                pcmk__cs_err_str(dispatch_rc), dispatch_rc);
        cpg_finalize(cluster->cpg_handle);
        cluster->cpg_handle = 0;
        return -1;
    }

    if (cpg_evicted) {
        crm_err("Evicted from CPG membership");
        return -1;
    }

    return 0;
}
/*!
 * \internal
 * \brief Get a log-friendly description of a CPG message endpoint
 *
 * \param[in] host  Endpoint to describe
 *
 * \return "local" if the endpoint is flagged local, otherwise its node name
 *         if one is set, otherwise "<all>"
 */
static inline const char *
ais_dest(const pcmk__cpg_host_t *host)
{
    if (host->local) {
        return "local";
    }
    return (host->size > 0)? host->uname : "<all>";
}
/*!
 * \internal
 * \brief Get a log-friendly name for a CPG message type
 *
 * \param[in] type  Message type to describe
 *
 * \return Name corresponding to \p type, or "unknown" for crm_msg_none and
 *         for any value that is not a listed message type
 */
static inline const char *
msg_type2text(enum crm_ais_msg_types type)
{
    switch (type) {
        case crm_msg_ais:        return "ais";
        case crm_msg_cib:        return "cib";
        case crm_msg_crmd:       return "crmd";
        case crm_msg_pe:         return "pengine";
        case crm_msg_te:         return "tengine";
        case crm_msg_lrmd:       return "lrmd";
        case crm_msg_attrd:      return "attrd";
        case crm_msg_stonithd:   return "stonithd";
        case crm_msg_stonith_ng: return "stonith-ng";
        case crm_msg_none:       break;
    }
    return "unknown";
}
/*!
* \internal
* \brief Check whether a Corosync CPG message is valid
*
* A message is rejected (and an error logged) if its claimed total size leaves
* no room for a payload, if its header carries an error, if the payload length
* recorded in the message disagrees with the total size, or (for uncompressed
* messages) if the payload is not a string ending exactly at its last byte.
*
* \param[in] msg Corosync CPG message to check
*
* \return true if \p msg is valid, otherwise false
*/
static bool
check_message_sanity(const pcmk__cpg_msg_t *msg)
{
// Bytes remaining for the payload once the fixed-size part is subtracted
int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
if (payload_size < 1) {
crm_err("%sCPG message %d from %s invalid: "
"Claimed size of %d bytes is too small "
CRM_XS " from %s[%u] to %s@%s",
(msg->is_compressed? "Compressed " : ""),
msg->id, ais_dest(&(msg->sender)),
(int) msg->header.size,
msg_type2text(msg->sender.type), msg->sender.pid,
msg_type2text(msg->host.type), ais_dest(&(msg->host)));
return false;
}
if (msg->header.error != CS_OK) {
crm_err("%sCPG message %d from %s invalid: "
"Sender indicated error %d "
CRM_XS " from %s[%u] to %s@%s",
(msg->is_compressed? "Compressed " : ""),
msg->id, ais_dest(&(msg->sender)),
msg->header.error,
msg_type2text(msg->sender.type), msg->sender.pid,
msg_type2text(msg->host.type), ais_dest(&(msg->host)));
return false;
}
// The payload length stored in the message must match the space available
if (msg_data_len(msg) != payload_size) {
crm_err("%sCPG message %d from %s invalid: "
"Total size %d inconsistent with payload size %d "
CRM_XS " from %s[%u] to %s@%s",
(msg->is_compressed? "Compressed " : ""),
msg->id, ais_dest(&(msg->sender)),
(int) msg->header.size, (int) msg_data_len(msg),
msg_type2text(msg->sender.type), msg->sender.pid,
msg_type2text(msg->host.type), ais_dest(&(msg->host)));
return false;
}
// Reject if the second-to-last byte is NUL (string ends early) or the last is not
if (!msg->is_compressed &&
/* msg->size != (strlen(msg->data) + 1) would be a stronger check,
* but checking the last byte or two should be quick
*/
(((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
|| (msg->data[msg->size - 1] != '\0'))) {
crm_err("CPG message %d from %s invalid: "
"Payload does not end at byte %llu "
CRM_XS " from %s[%u] to %s@%s",
msg->id, ais_dest(&(msg->sender)),
(unsigned long long) msg->size,
msg_type2text(msg->sender.type), msg->sender.pid,
msg_type2text(msg->host.type), ais_dest(&(msg->host)));
return false;
}
crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
(int) msg->header.size, (msg->is_compressed? "compressed " : ""),
msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
ais_dest(&(msg->sender)),
msg_type2text(msg->host.type), ais_dest(&(msg->host)));
return true;
}
/*!
* \brief Extract text data from a Corosync CPG message
*
* \param[in] handle CPG connection (to get local node ID if not known);
* if 0, no filtering or sender fix-up is done
* \param[in] nodeid Corosync ID of node that sent message
* \param[in] pid Process ID of message sender (for logging only)
* \param[in,out] content CPG message
* \param[out] kind If not NULL, will be set to CPG header ID
* (which should be an enum crm_ais_msg_class value,
* currently always crm_class_cluster)
* \param[out] from If not NULL, will be set to sender uname
* (valid for the lifetime of \p content)
*
* \return Newly allocated string with message data, or NULL if the message is
* not addressed to the local node, claims a different sender node ID
* than \p nodeid, fails validation, or cannot be decompressed
* \note It is the caller's responsibility to free the return value with free().
* \note \p kind and \p from are set before the message is validated, so they
* may have been assigned even when NULL is returned.
*/
char *
pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
uint32_t *kind, const char **from)
{
char *data = NULL;
pcmk__cpg_msg_t *msg = (pcmk__cpg_msg_t *) content;
if(handle) {
// Do filtering and field massaging
uint32_t local_nodeid = get_local_nodeid(handle);
const char *local_name = get_local_node_name();
if (msg->sender.id > 0 && msg->sender.id != nodeid) {
crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
return NULL;
} else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
/* Not for us */
crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
return NULL;
} else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
/* Not for us */
crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
return NULL;
}
// Record the sender ID supplied by the caller in the message itself
msg->sender.id = nodeid;
if (msg->sender.size == 0) {
// Sender did not include its name; fill it in from the peer cache if known
crm_node_t *peer = crm_get_peer(nodeid, NULL);
if (peer == NULL) {
crm_err("Peer with nodeid=%u is unknown", nodeid);
} else if (peer->uname == NULL) {
crm_err("No uname for peer with nodeid=%u", nodeid);
} else {
crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
/* NOTE(review): the copy length is not bounded by MAX_NAME here --
* confirm that peer names are always shorter than the uname buffer
*/
msg->sender.size = strlen(peer->uname);
memset(msg->sender.uname, 0, MAX_NAME);
memcpy(msg->sender.uname, peer->uname, msg->sender.size);
}
}
}
crm_trace("Got new%s message (size=%d, %d, %d)",
msg->is_compressed ? " compressed" : "",
msg_data_len(msg), msg->size, msg->compressed_size);
if (kind != NULL) {
*kind = msg->header.id;
}
if (from != NULL) {
*from = msg->sender.uname;
}
if (msg->is_compressed && msg->size > 0) {
int rc = BZ_OK;
char *uncompressed = NULL;
// One byte more than the uncompressed size; calloc() leaves it zeroed
unsigned int new_size = msg->size + 1;
if (!check_message_sanity(msg)) {
goto badmsg;
}
crm_trace("Decompressing message data");
// NOTE(review): the calloc() result is passed on without a NULL check
uncompressed = calloc(1, new_size);
rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
if (rc != BZ_OK) {
crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
bz2_strerror(rc), rc);
free(uncompressed);
goto badmsg;
}
CRM_ASSERT(rc == BZ_OK);
// The decompressed length must equal the size the sender recorded
CRM_ASSERT(new_size == msg->size);
data = uncompressed;
} else if (!check_message_sanity(msg)) {
goto badmsg;
} else {
// Uncompressed payload was verified above to be NUL-terminated
data = strdup(msg->data);
}
// Is this necessary?
crm_get_peer(msg->sender.id, msg->sender.uname);
crm_trace("Payload: %.200s", data);
return data;
badmsg:
crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
" min=%d, total=%d, size=%d, bz2_size=%d",
msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
msg->sender.pid, (int)sizeof(pcmk__cpg_msg_t),
msg->header.size, msg->size, msg->compressed_size);
// data is still NULL on every path that reaches here, so this is a no-op
free(data);
return NULL;
}
/*!
 * \internal
 * \brief Order two cpg_address pointers by node ID (for qsort()/bsearch())
 *
 * \param[in] first   Address of the first pointer to a cpg_address structure
 * \param[in] second  Address of the second pointer to a cpg_address structure
 *
 * \return -1 if the first node ID is lower, 1 if it is greater, or 0 if both
 *         node IDs are equal (the "reason" and "pid" members are ignored)
 */
static int
cmp_member_list_nodeid(const void *first, const void *second)
{
    const struct cpg_address *const *lhs = first;
    const struct cpg_address *const *rhs = second;

    if ((*lhs)->nodeid == (*rhs)->nodeid) {
        return 0;
    }
    return ((*lhs)->nodeid < (*rhs)->nodeid)? -1 : 1;
}
/*!
 * \internal
 * \brief Describe a cpg_reason_t value as a phrase for log messages
 *
 * \param[in] reason  CPG reason value
 *
 * \return Phrase beginning with a space that can be appended to a log
 *         message, or an empty string for any other value
 */
static const char *
cpgreason2str(cpg_reason_t reason)
{
    if (reason == CPG_REASON_JOIN) {
        return " via cpg_join";
    }
    if (reason == CPG_REASON_LEAVE) {
        return " via cpg_leave";
    }
    if (reason == CPG_REASON_NODEDOWN) {
        return " via cluster exit";
    }
    if (reason == CPG_REASON_NODEUP) {
        return " via cluster join";
    }
    if (reason == CPG_REASON_PROCDOWN) {
        return " for unknown reason";
    }
    return "";
}
/*!
 * \internal
 * \brief Choose a name to show for a node in log messages
 *
 * \param[in] peer  Node to describe (may be NULL)
 *
 * \return "unknown node" if \p peer is NULL, "peer node" if it has no uname,
 *         otherwise the node's uname
 */
static inline const char *
peer_name(const crm_node_t *peer)
{
    if (peer == NULL) {
        return "unknown node";
    }
    return (peer->uname != NULL)? peer->uname : "peer node";
}
/*!
* \internal
* \brief Process a CPG peer's leaving the cluster
*
* If another process with the same node ID is still listed as a member, the
* departure is only logged as a duplicate leaving; otherwise the peer's CPG
* process status is marked offline (when the peer is in the node cache).
*
* \param[in] cpg_group_name CPG group name (for logging)
* \param[in] event_counter Event number (for logging)
* \param[in] local_nodeid Node ID of local node
* \param[in] cpg_peer CPG peer that left
* \param[in] sorted_member_list List of remaining members, qsort()-ed by ID
* \param[in] member_list_entries Number of entries in \p sorted_member_list
*/
static void
node_left(const char *cpg_group_name, int event_counter,
uint32_t local_nodeid, const struct cpg_address *cpg_peer,
const struct cpg_address **sorted_member_list,
size_t member_list_entries)
{
crm_node_t *peer = pcmk__search_cluster_node_cache(cpg_peer->nodeid,
- NULL);
+ NULL, NULL);
const struct cpg_address **rival = NULL;
/* Most CPG-related Pacemaker code assumes that only one process on a node
* can be in the process group, but Corosync does not impose this
* limitation, and more than one can be a member in practice due to a
* daemon attempting to start while another instance is already running.
*
* Check for any such duplicate instances, because we don't want to process
* their leaving as if our actual peer left. If the peer that left still has
* an entry in sorted_member_list (with a different PID), we will ignore the
* leaving.
*
* @TODO Track CPG members' PIDs so we can tell exactly who left.
*/
if (peer != NULL) {
// The list holds pointers, so the search key is the address of a pointer
rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
sizeof(const struct cpg_address *),
cmp_member_list_nodeid);
}
if (rival == NULL) {
// No other process from that node remains: this is a real departure
crm_info("Group %s event %d: %s (node %u pid %u) left%s",
cpg_group_name, event_counter, peer_name(peer),
cpg_peer->nodeid, cpg_peer->pid,
cpgreason2str(cpg_peer->reason));
if (peer != NULL) {
crm_update_peer_proc(__func__, peer, crm_proc_cpg,
OFFLINESTATUS);
}
} else if (cpg_peer->nodeid == local_nodeid) {
crm_warn("Group %s event %d: duplicate local pid %u left%s",
cpg_group_name, event_counter,
cpg_peer->pid, cpgreason2str(cpg_peer->reason));
} else {
crm_warn("Group %s event %d: "
"%s (node %u) duplicate pid %u left%s (%u remains)",
cpg_group_name, event_counter, peer_name(peer),
cpg_peer->nodeid, cpg_peer->pid,
cpgreason2str(cpg_peer->reason), (*rival)->pid);
}
}
/*!
* \brief Handle a CPG configuration change event
*
* Processes members that left, logs members that joined, marks every current
* member's CPG process as online, and records eviction if the local node is
* not among the current members.
*
* \param[in] handle CPG connection
* \param[in] groupName CPG group name
* \param[in] member_list List of current CPG members
* \param[in] member_list_entries Number of entries in \p member_list
* \param[in] left_list List of CPG members that left
* \param[in] left_list_entries Number of entries in \p left_list
* \param[in] joined_list List of CPG members that joined
* \param[in] joined_list_entries Number of entries in \p joined_list
*/
void
pcmk_cpg_membership(cpg_handle_t handle,
const struct cpg_name *groupName,
const struct cpg_address *member_list, size_t member_list_entries,
const struct cpg_address *left_list, size_t left_list_entries,
const struct cpg_address *joined_list, size_t joined_list_entries)
{
int i;
gboolean found = FALSE;
static int counter = 0; // Number of events handled so far (for logging)
uint32_t local_nodeid = get_local_nodeid(handle);
const struct cpg_address **sorted;
/* NOTE(review): if member_list_entries can be 0, malloc(0) may legitimately
* return NULL and trip the assertion below -- confirm whether an empty
* member list is possible
*/
sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
CRM_ASSERT(sorted != NULL);
// Build an array of pointers into member_list so it can be sorted by node ID
for (size_t iter = 0; iter < member_list_entries; iter++) {
sorted[iter] = member_list + iter;
}
/* so that the cross-matching multiply-subscribed nodes is then cheap */
qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
cmp_member_list_nodeid);
for (i = 0; i < left_list_entries; i++) {
node_left(groupName->value, counter, local_nodeid, &left_list[i],
sorted, member_list_entries);
}
free(sorted);
sorted = NULL;
for (i = 0; i < joined_list_entries; i++) {
crm_info("Group %s event %d: node %u pid %u joined%s",
groupName->value, counter, joined_list[i].nodeid,
joined_list[i].pid, cpgreason2str(joined_list[i].reason));
}
for (i = 0; i < member_list_entries; i++) {
crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
if (member_list[i].nodeid == local_nodeid
&& member_list[i].pid != getpid()) {
// See the note in node_left()
crm_warn("Group %s event %d: detected duplicate local pid %u",
groupName->value, counter, member_list[i].pid);
continue;
}
crm_info("Group %s event %d: %s (node %u pid %u) is member",
groupName->value, counter, peer_name(peer),
member_list[i].nodeid, member_list[i].pid);
/* If the caller left auto-reaping enabled, this will also update the
* state to member.
*/
peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
ONLINESTATUS);
if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
/* The node is a CPG member, but we currently think it's not a
* cluster member. This is possible only if auto-reaping was
* disabled. The node may be joining, and we happened to get the CPG
* notification before the quorum notification; or the node may have
* just died, and we are processing its final messages; or a bug
* has affected the peer cache.
*/
time_t now = time(NULL);
if (peer->when_lost == 0) {
// Track when we first got into this contradictory state
peer->when_lost = now;
} else if (now > (peer->when_lost + 60)) {
// If it persists for more than a minute, update the state
crm_warn("Node %u is member of group %s but was believed offline",
member_list[i].nodeid, groupName->value);
pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
}
}
if (local_nodeid == member_list[i].nodeid) {
found = TRUE;
}
}
if (!found) {
// pcmk_cpg_dispatch() checks this flag and reports failure once it is set
crm_err("Local node was evicted from group %s", groupName->value);
cpg_evicted = true;
}
counter++;
}
/*!
 * \brief Connect to Corosync CPG
 *
 * Initializes a CPG connection, verifies that the CPG provider runs as root,
 * determines the local node ID, joins the group named after the current
 * daemon, and registers the connection with the main loop.
 *
 * \param[in,out] cluster  Cluster object (on success, its nodeid and
 *                         cpg_handle members are set)
 *
 * \return TRUE on success, otherwise FALSE (the CPG handle is then finalized)
 */
gboolean
cluster_connect_cpg(crm_cluster_t *cluster)
{
    cs_error_t rc;
    int fd = -1;
    int retries = 0;
    uint32_t id = 0;
    crm_node_t *peer = NULL;
    cpg_handle_t handle = 0;
    const char *message_name = pcmk__message_name(crm_system_name);
    uid_t found_uid = 0;
    gid_t found_gid = 0;
    pid_t found_pid = 0;
    int rv;

    struct mainloop_fd_callbacks cpg_fd_callbacks = {
        .dispatch = pcmk_cpg_dispatch,
        .destroy = cluster->destroy,
    };

    cpg_model_v1_data_t cpg_model_info = {
        .model = CPG_MODEL_V1,
        .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
        .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
        .cpg_totem_confchg_fn = NULL,
        .flags = 0,
    };

    cpg_evicted = false;
    cluster->group.length = 0;
    cluster->group.value[0] = 0;

    /* group.value is char[128] */
    strncpy(cluster->group.value, message_name, 127);
    cluster->group.value[127] = 0;
    cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));

    cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
    if (rc != CS_OK) {
        crm_err("Could not connect to the CPG API: %s (%d)",
                cs_strerror(rc), rc);
        goto bail;
    }

    rc = cpg_fd_get(handle, &fd);
    if (rc != CS_OK) {
        crm_err("Could not obtain the CPG API connection: %s (%d)",
                cs_strerror(rc), rc);
        goto bail;
    }

    /* CPG provider run as root (in given user namespace, anyway)? */
    rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
                                      &found_uid, &found_gid);
    if (rv == 0) {
        crm_err("CPG provider is not authentic:"
                " process %lld (uid: %lld, gid: %lld)",
                (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
                (long long) found_uid, (long long) found_gid);
        rc = CS_ERR_ACCESS;
        goto bail;

    } else if (rv < 0) {
        crm_err("Could not verify authenticity of CPG provider: %s (%d)",
                strerror(-rv), -rv);
        rc = CS_ERR_ACCESS;
        goto bail;
    }

    id = get_local_nodeid(handle);
    if (id == 0) {
        crm_err("Could not get local node id from the CPG API");
        /* rc is still CS_OK from the calls above, so it must be set to an
         * error here; otherwise the cleanup below would be skipped and the
         * function would go on to report success with node ID 0.
         */
        rc = CS_ERR_LIBRARY;
        goto bail;
    }
    cluster->nodeid = id;

    retries = 0;
    cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->group));
    if (rc != CS_OK) {
        crm_err("Could not join the CPG group '%s': %d", message_name, rc);
        goto bail;
    }

    pcmk_cpg_handle = handle;
    cluster->cpg_handle = handle;
    mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);

  bail:
    if (rc != CS_OK) {
        cpg_finalize(handle);
        return FALSE;
    }

    // Connected and joined: record the local node's CPG process as online
    peer = crm_get_peer(id, NULL);
    crm_update_peer_proc(__func__, peer, crm_proc_cpg, ONLINESTATUS);
    return TRUE;
}
/*!
 * \internal
 * \brief Serialize an XML message and send it via Corosync CPG
 *
 * \param[in] msg   XML message to send
 * \param[in] node  Cluster node to send message to
 * \param[in] dest  Type of message to send
 *
 * \return Result of send_cluster_text() (TRUE on success, otherwise FALSE)
 */
gboolean
pcmk__cpg_send_xml(xmlNode *msg, const crm_node_t *node,
                   enum crm_ais_msg_types dest)
{
    char *xml_text = dump_xml_unformatted(msg);
    const gboolean sent = send_cluster_text(crm_class_cluster, xml_text,
                                            FALSE, node, dest);

    free(xml_text);
    return sent;
}
/*!
* \internal
* \brief Send string data via Corosync CPG
*
* Builds a CPG message around \p data (compressing the payload when it is at
* least CRM_BZ2_THRESHOLD bytes and compression succeeds), appends it to the
* outgoing queue, and attempts to flush the queue.
*
* \param[in] msg_class Message class (to set as CPG header ID)
* \param[in] data Data to send (NULL is treated as an empty string)
* \param[in] local What to set as host "local" value (in the code shown
* here, read only by ais_dest() for log messages)
* \param[in] node Cluster node to send message to (NULL for all nodes)
* \param[in] dest Type of message to send
*
* \return TRUE if the message was queued, FALSE if \p msg_class is not
* crm_class_cluster or \p dest is crm_msg_ais
* \note TRUE means only that the message was queued, not that it was sent.
*/
gboolean
send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
gboolean local, const crm_node_t *node,
enum crm_ais_msg_types dest)
{
// Values that are looked up once and then reused for every later message
static int msg_id = 0;
static int local_pid = 0;
static int local_name_len = 0;
static const char *local_name = NULL;
char *target = NULL; // Destination description, used only for logging
struct iovec *iov;
pcmk__cpg_msg_t *msg = NULL;
enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
switch (msg_class) {
case crm_class_cluster:
break;
default:
crm_err("Invalid message class: %d", msg_class);
return FALSE;
}
CRM_CHECK(dest != crm_msg_ais, return FALSE);
if (local_name == NULL) {
local_name = get_local_node_name();
}
if ((local_name_len == 0) && (local_name != NULL)) {
local_name_len = strlen(local_name);
}
if (data == NULL) {
data = "";
}
if (local_pid == 0) {
local_pid = getpid();
}
// Senders without a known message type are identified by their process ID
if (sender == crm_msg_none) {
sender = local_pid;
}
// NOTE(review): the calloc() result is dereferenced without a NULL check
msg = calloc(1, sizeof(pcmk__cpg_msg_t));
msg_id++;
msg->id = msg_id;
msg->header.id = msg_class;
msg->header.error = CS_OK;
msg->host.type = dest;
msg->host.local = local;
if (node) {
if (node->uname) {
target = strdup(node->uname);
/* NOTE(review): the copy length is not bounded by MAX_NAME here --
* confirm that node names are always shorter than the uname buffer
*/
msg->host.size = strlen(node->uname);
memset(msg->host.uname, 0, MAX_NAME);
memcpy(msg->host.uname, node->uname, msg->host.size);
} else {
target = crm_strdup_printf("%u", node->id);
}
msg->host.id = node->id;
} else {
// No destination node: host.id and host.size stay 0
target = strdup("all");
}
// The sender ID is left as 0 here; the receiving side fills it in
msg->sender.id = 0;
msg->sender.type = sender;
msg->sender.pid = local_pid;
msg->sender.size = local_name_len;
memset(msg->sender.uname, 0, MAX_NAME);
if ((local_name != NULL) && (msg->sender.size != 0)) {
memcpy(msg->sender.uname, local_name, msg->sender.size);
}
// The payload size includes the terminating NUL
msg->size = 1 + strlen(data);
msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
if (msg->size < CRM_BZ2_THRESHOLD) {
// Small payload: grow the message and append the text as is
msg = pcmk__realloc(msg, msg->header.size);
memcpy(msg->data, data, msg->size);
} else {
char *compressed = NULL;
unsigned int new_size = 0;
char *uncompressed = strdup(data);
if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0,
&compressed, &new_size) == pcmk_rc_ok) {
// Total size now reflects the compressed payload; msg->size keeps the original
msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
msg = pcmk__realloc(msg, msg->header.size);
memcpy(msg->data, compressed, new_size);
msg->is_compressed = TRUE;
msg->compressed_size = new_size;
} else {
// Compression failed: fall back to sending the text uncompressed
// cppcheck seems not to understand the abort logic in pcmk__realloc
// cppcheck-suppress memleak
msg = pcmk__realloc(msg, msg->header.size);
memcpy(msg->data, data, msg->size);
}
free(uncompressed);
free(compressed);
}
// The queue entry owns msg; crm_cs_flush() frees both after sending
iov = calloc(1, sizeof(struct iovec));
iov->iov_base = msg;
iov->iov_len = msg->header.size;
if (msg->compressed_size) {
crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
msg->id, target, (unsigned long long) iov->iov_len,
msg->compressed_size, data);
} else {
crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
msg->id, target, (unsigned long long) iov->iov_len,
msg->size, data);
}
free(target);
cs_message_queue = g_list_append(cs_message_queue, iov);
crm_cs_flush(&pcmk_cpg_handle);
return TRUE;
}
/*!
 * \brief Convert a subsystem name to its message type
 *
 * \param[in] text  Name to convert (passed through pcmk__message_name()
 *                  before matching)
 *
 * \return Message type matching \p text. A name that matches no known type is
 *         parsed as a number, which is returned if it is greater than
 *         crm_msg_stonith_ng; otherwise (or if \p text is NULL) crm_msg_none
 *         is returned.
 */
enum crm_ais_msg_types
text2msg_type(const char *text)
{
    int pid_type = crm_msg_none;

    CRM_CHECK(text != NULL, return crm_msg_none);

    text = pcmk__message_name(text);

    if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
        return crm_msg_ais;
    }
    if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
        return crm_msg_cib;
    }
    if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
        return crm_msg_crmd;
    }
    if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
        return crm_msg_te;
    }
    if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
        return crm_msg_pe;
    }
    if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
        return crm_msg_lrmd;
    }
    if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
        return crm_msg_stonithd;
    }
    if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
        return crm_msg_stonith_ng;
    }
    if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
        return crm_msg_attrd;
    }

    /* This will normally be a transient client rather than
     * a cluster daemon. Set the type to the pid of the client
     */
    if ((sscanf(text, "%d", &pid_type) != 1)
        || (pid_type <= crm_msg_stonith_ng)) {
        /* Ensure it's sane */
        return crm_msg_none;
    }
    return pid_type;
}
diff --git a/lib/cluster/membership.c b/lib/cluster/membership.c
index 122ee99fd6..d5db518cbd 100644
--- a/lib/cluster/membership.c
+++ b/lib/cluster/membership.c
@@ -1,1315 +1,1328 @@
/*
* Copyright 2004-2023 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU Lesser General Public License
* version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#ifndef _GNU_SOURCE
# define _GNU_SOURCE
#endif
#include <sys/param.h>
#include <sys/types.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <glib.h>
#include <crm/common/ipc.h>
#include <crm/common/xml_internal.h>
#include <crm/cluster/internal.h>
#include <crm/msg_xml.h>
#include <crm/stonith-ng.h>
#include "crmcluster_private.h"
/* The peer cache remembers cluster nodes that have been seen.
* This is managed mostly automatically by libcluster, based on
* cluster membership events.
*
* Because cluster nodes can have conflicting names or UUIDs,
* the hash table key is a uniquely generated ID.
*/
GHashTable *crm_peer_cache = NULL;
/*
* The remote peer cache tracks pacemaker_remote nodes. While the
* value has the same type as the peer cache's, it is tracked separately for
* three reasons: pacemaker_remote nodes can't have conflicting names or UUIDs,
* so the name (which is also the UUID) is used as the hash table key; there
* is no equivalent of membership events, so management is not automatic; and
* most users of the peer cache need to exclude pacemaker_remote nodes.
*
* That said, using a single cache would be more logical and less error-prone,
* so it would be a good idea to merge them one day.
*
* libcluster provides two avenues for populating the cache:
* crm_remote_peer_get() and crm_remote_peer_cache_remove() directly manage it,
* while crm_remote_peer_cache_refresh() populates it via the CIB.
*/
GHashTable *crm_remote_peer_cache = NULL;
/*
* The known node cache tracks cluster and remote nodes that have been seen in
* the CIB. It is useful mainly when a caller needs to know about a node that
* may no longer be in the membership, but doesn't want to add the node to the
* main peer cache tables.
*/
static GHashTable *known_node_cache = NULL;
unsigned long long crm_peer_seq = 0;
gboolean crm_have_quorum = FALSE;
static gboolean crm_autoreap = TRUE;
// Flag setting and clearing for crm_node_t:flags
/* Set bits in a peer's flags member.
 *
 * Passes the calling function name and line, LOG_TRACE, the label "Peer",
 * the peer's uname, the current flags, the bits to set, and the stringified
 * flag expression to pcmk__set_flags_as(), and stores the result back in
 * (peer)->flags. Note that the peer argument is evaluated more than once.
 */
#define set_peer_flags(peer, flags_to_set) do { \
(peer)->flags = pcmk__set_flags_as(__func__, __LINE__, LOG_TRACE, \
"Peer", (peer)->uname, \
(peer)->flags, (flags_to_set), \
#flags_to_set); \
} while (0)
/* Clear bits in a peer's flags member.
 *
 * Passes the calling function name and line, LOG_TRACE, the label "Peer",
 * the peer's uname, the current flags, the bits to clear, and the stringified
 * flag expression to pcmk__clear_flags_as(), and stores the result back in
 * (peer)->flags. Note that the peer argument is evaluated more than once.
 */
#define clear_peer_flags(peer, flags_to_clear) do { \
(peer)->flags = pcmk__clear_flags_as(__func__, __LINE__, \
LOG_TRACE, \
"Peer", (peer)->uname, \
(peer)->flags, (flags_to_clear), \
#flags_to_clear); \
} while (0)
static void update_peer_uname(crm_node_t *node, const char *uname);
/* Report how many entries the remote peer cache currently holds, or 0 if the
 * cache has not been created.
 */
int
crm_remote_peer_cache_size(void)
{
    return (crm_remote_peer_cache != NULL)?
           g_hash_table_size(crm_remote_peer_cache) : 0;
}
/*!
 * \brief Get a remote node peer cache entry, creating it if necessary
 *
 * \param[in] node_name  Name of remote node
 *
 * \return Cache entry for node on success, NULL (and set errno) otherwise
 *
 * \note errno is set to EINVAL if \p node_name is NULL, or ENOMEM if memory
 *       for a new entry cannot be allocated.
 * \note When creating a new entry, this will leave the node state undetermined,
 *       so the caller should also call pcmk__update_peer_state() if the state
 *       is known.
 */
crm_node_t *
crm_remote_peer_get(const char *node_name)
{
    crm_node_t *node;

    if (node_name == NULL) {
        // errno values are positive; callers compare against EINVAL
        errno = EINVAL;
        return NULL;
    }

    /* Return existing cache entry if one exists */
    node = g_hash_table_lookup(crm_remote_peer_cache, node_name);
    if (node) {
        return node;
    }

    /* Allocate a new entry */
    node = calloc(1, sizeof(crm_node_t));
    if (node == NULL) {
        errno = ENOMEM;
        return NULL;
    }

    /* Populate the essential information */
    set_peer_flags(node, crm_remote_node);
    node->uuid = strdup(node_name);
    if (node->uuid == NULL) {
        free(node);
        errno = ENOMEM;
        return NULL;
    }

    /* Add the new entry to the cache (the UUID string doubles as the key) */
    g_hash_table_replace(crm_remote_peer_cache, node->uuid, node);
    crm_trace("added %s to remote cache", node_name);

    /* Update the entry's uname, ensuring peer status callbacks are called */
    update_peer_uname(node, node_name);
    return node;
}
void
crm_remote_peer_cache_remove(const char *node_name)
{
if (g_hash_table_remove(crm_remote_peer_cache, node_name)) {
crm_trace("removed %s from remote peer cache", node_name);
}
}
/*!
 * \internal
 * \brief Map a CIB node status entry to a node state string
 *
 * \param[in] node_state  XML of node state
 *
 * \return CRM_NODE_LOST if XML_NODE_IN_CLUSTER is present and false in
 *         \p node_state, otherwise CRM_NODE_MEMBER
 * \note Unlike most boolean XML attributes, this one defaults to true, for
 *       backward compatibility with older controllers that don't set it.
 */
static const char *
remote_state_from_cib(const xmlNode *node_state)
{
    bool in_cluster = false;

    // Anything other than an explicit, successfully parsed "false" is a member
    if ((pcmk__xe_get_bool_attr(node_state, XML_NODE_IN_CLUSTER,
                                &in_cluster) != pcmk_rc_ok)
        || in_cluster) {
        return CRM_NODE_MEMBER;
    }
    return CRM_NODE_LOST;
}
/* User data handed to remote_cache_refresh_helper() for each xpath search
 * result while refreshing the remote peer cache
 */
struct refresh_data {
const char *field; /* XML attribute to check for node name */
gboolean has_state; /* whether to update node state based on XML */
};
/*!
 * \internal
 * \brief Handle a single xpath search result for a Pacemaker Remote node
 *
 * A node missing from the remote peer cache gets a new entry; a node already
 * cached and still marked dirty is marked clean. In both cases the node state
 * is updated when the search result carries one. Entries already marked clean
 * are left untouched.
 *
 * \param[in] result     XML search result
 * \param[in] user_data  struct refresh_data describing what to read from
 *                       \p result
 */
static void
remote_cache_refresh_helper(xmlNode *result, void *user_data)
{
    const struct refresh_data *criteria = user_data;
    const char *remote = crm_element_value(result, criteria->field);
    const char *new_state = NULL;
    crm_node_t *node = NULL;

    CRM_CHECK(remote != NULL, return);

    /* Only status-section results tell us the node's state */
    if (criteria->has_state) {
        new_state = remote_state_from_cib(result);
    }

    node = g_hash_table_lookup(crm_remote_peer_cache, remote);
    if (node != NULL) {
        if (!pcmk_is_set(node->flags, crm_node_dirty)) {
            /* Already refreshed during this pass */
            return;
        }
        clear_peer_flags(node, crm_node_dirty);

    } else {
        /* Not cached yet, so create an entry */
        node = crm_remote_peer_get(remote);
        CRM_ASSERT(node);
    }

    if (new_state != NULL) {
        pcmk__update_peer_state(__func__, node, new_state, 0);
    }
}
/* GHFunc: flag a cached node entry as dirty (key and user_data are unused) */
static void
mark_dirty(gpointer key, gpointer value, gpointer user_data)
{
    crm_node_t *entry = value;

    set_peer_flags(entry, crm_node_dirty);
}
/* GHRFunc: report whether a cached node entry is still flagged dirty
 * (key and user_data are unused)
 */
static gboolean
is_dirty(gpointer key, gpointer value, gpointer user_data)
{
    const crm_node_t *entry = value;

    return pcmk_is_set(entry->flags, crm_node_dirty);
}
/*!
 * \brief Repopulate the remote peer cache based on CIB XML
 *
 * \param[in] cib  CIB XML to parse
 */
void
crm_remote_peer_cache_refresh(xmlNode *cib)
{
    /* Guest and remote nodes in the status section carry a node state */
    struct refresh_data status_query = { .field = "id", .has_state = TRUE };

    /* Guest and remote nodes in the configuration section may have just been
     * added and not have a status entry yet. In that case, the cached node
     * state is left NULL, so that the peer status callback isn't called until
     * we're sure the node started successfully.
     */
    struct refresh_data guest_query = { .field = "value", .has_state = FALSE };
    struct refresh_data remote_query = { .field = "id", .has_state = FALSE };

    crm_peer_init();

    /* Mark every existing entry dirty rather than emptying the cache, so that
     * state changes can still be detected and entries missing from the CIB
     * can be removed afterward.
     */
    g_hash_table_foreach(crm_remote_peer_cache, mark_dirty, NULL);

    crm_foreach_xpath_result(cib, PCMK__XP_REMOTE_NODE_STATUS,
                             remote_cache_refresh_helper, &status_query);
    crm_foreach_xpath_result(cib, PCMK__XP_GUEST_NODE_CONFIG,
                             remote_cache_refresh_helper, &guest_query);
    crm_foreach_xpath_result(cib, PCMK__XP_REMOTE_NODE_CONFIG,
                             remote_cache_refresh_helper, &remote_query);

    /* Whatever is still dirty was not seen in the CIB */
    g_hash_table_foreach_remove(crm_remote_peer_cache, is_dirty, NULL);
}
/* Check whether a node counts as an active cluster member.
 *
 * NULL nodes are inactive, and remote nodes are never considered active
 * members (this guarantees they will never be considered for DC membership).
 * On a Corosync cluster the answer comes from crm_is_corosync_peer_active();
 * any other cluster type is logged as an error and treated as inactive.
 */
gboolean
crm_is_peer_active(const crm_node_t * node)
{
    if ((node == NULL) || pcmk_is_set(node->flags, crm_remote_node)) {
        return FALSE;
    }

#if SUPPORT_COROSYNC
    if (is_corosync_cluster()) {
        return crm_is_corosync_peer_active(node);
    }
#endif

    crm_err("Unhandled cluster type: %s", name_for_cluster_type(get_cluster_type()));
    return FALSE;
}
/* GHRFunc: decide whether a peer cache entry should be removed.
 *
 * user_data is a crm_node_t used as the search template. When its id is
 * nonzero, entries are matched by id; otherwise they are matched by uname
 * (case-insensitively). A matching entry is removed only if the node is not
 * active.
 */
static gboolean
crm_reap_dead_member(gpointer key, gpointer value, gpointer user_data)
{
    crm_node_t *node = value;
    const crm_node_t *search = user_data;
    gboolean matched = FALSE;

    if (search == NULL) {
        return FALSE;
    }

    if (search->id) {
        matched = (node->id == search->id);
    } else {
        matched = pcmk__str_eq(node->uname, search->uname, pcmk__str_casei);
    }

    if (!matched || crm_is_peer_active(node)) {
        return FALSE;
    }

    crm_info("Removing node with name %s and id %u from membership cache",
             (node->uname? node->uname : "unknown"), node->id);
    return TRUE;
}
/*!
 * \brief Remove all inactive peer cache entries matching a node ID or uname
 *
 * \param[in] id    ID of node to remove (or 0 to match by name instead)
 * \param[in] name  Uname of node to remove (or NULL to ignore)
 *
 * \return Number of cache entries removed
 */
guint
reap_crm_member(uint32_t id, const char *name)
{
    int matches = 0;
    crm_node_t search = { 0, };
    const char *uname_label = "";
    const char *uname_value = "";

    if (crm_peer_cache == NULL) {
        crm_trace("Membership cache not initialized, ignoring purge request");
        return 0;
    }

    /* Build the search template consumed by crm_reap_dead_member() */
    search.id = id;
    pcmk__str_update(&search.uname, name);

    matches = g_hash_table_foreach_remove(crm_peer_cache, crm_reap_dead_member,
                                          &search);

    if (search.uname != NULL) {
        uname_label = " and/or uname=";
        uname_value = search.uname;
    }

    if (matches == 0) {
        crm_info("No peers with id=%u%s%s to purge from the membership cache",
                 search.id, uname_label, uname_value);
    } else {
        crm_notice("Purged %d peer%s with id=%u%s%s from the membership cache",
                   matches, pcmk__plural_s(matches), search.id,
                   uname_label, uname_value);
    }

    free(search.uname);
    return matches;
}
/* GHFunc: increment the guint counter behind user_data when the cache entry
 * is an active peer
 */
static void
count_peer(gpointer key, gpointer value, gpointer user_data)
{
    guint *active = user_data;

    if (crm_is_peer_active((crm_node_t *) value)) {
        (*active)++;
    }
}
guint
crm_active_peers(void)
{
guint count = 0;
if (crm_peer_cache) {
g_hash_table_foreach(crm_peer_cache, count_peer, &count);
}
return count;
}
/* Value destructor for the node caches: free a crm_node_t together with the
 * strings it owns
 */
static void
destroy_crm_node(gpointer data)
{
    crm_node_t *peer = data;

    crm_trace("Destroying entry for node %u: %s", peer->id, peer->uname);

    free(peer->conn_host);
    free(peer->expected);
    free(peer->uuid);
    free(peer->state);
    free(peer->uname);
    free(peer);
}
void
crm_peer_init(void)
{
if (crm_peer_cache == NULL) {
crm_peer_cache = pcmk__strikey_table(free, destroy_crm_node);
}
if (crm_remote_peer_cache == NULL) {
crm_remote_peer_cache = pcmk__strikey_table(NULL, destroy_crm_node);
}
if (known_node_cache == NULL) {
known_node_cache = pcmk__strikey_table(free, destroy_crm_node);
}
}
/* Destroy the peer, remote peer, and known node caches (whichever exist) and
 * reset their pointers to NULL so that crm_peer_init() can recreate them.
 *
 * g_hash_table_size() returns an unsigned value, so the member counts are
 * logged with %u.
 */
void
crm_peer_destroy(void)
{
    if (crm_peer_cache != NULL) {
        crm_trace("Destroying peer cache with %u members",
                  g_hash_table_size(crm_peer_cache));
        g_hash_table_destroy(crm_peer_cache);
        crm_peer_cache = NULL;
    }

    if (crm_remote_peer_cache != NULL) {
        crm_trace("Destroying remote peer cache with %u members",
                  g_hash_table_size(crm_remote_peer_cache));
        g_hash_table_destroy(crm_remote_peer_cache);
        crm_remote_peer_cache = NULL;
    }

    if (known_node_cache != NULL) {
        crm_trace("Destroying known node cache with %u members",
                  g_hash_table_size(known_node_cache));
        g_hash_table_destroy(known_node_cache);
        known_node_cache = NULL;
    }
}
static void (*peer_status_callback)(enum crm_status_type, crm_node_t *,
const void *) = NULL;
/*!
 * \brief Register the client function to call after peer status changes
 *
 * \param[in] dispatch  Function to use as callback (replaces any previously
 *                      registered callback)
 *
 * \note Previously, client callbacks were responsible for peer cache
 *       management. This is no longer the case, and client callbacks should do
 *       only client-specific handling. Callbacks MUST NOT add or remove entries
 *       in the peer caches.
 */
void
crm_set_status_callback(void (*dispatch) (enum crm_status_type, crm_node_t *,
                                          const void *))
{
    peer_status_callback = dispatch;
}
/*!
 * \brief Enable or disable automatic reaping of lost nodes
 *
 * While enabled (the default), crm_update_peer_proc() also updates the peer
 * state to CRM_NODE_MEMBER or CRM_NODE_LOST, and pcmk__update_peer_state()
 * reaps peers whose state changes to anything other than CRM_NODE_MEMBER.
 * Callers should leave this enabled unless they plan to manage the cache
 * separately on their own.
 *
 * \param[in] autoreap  TRUE to enable automatic reaping, FALSE to disable
 */
void
crm_set_autoreap(gboolean autoreap)
{
    crm_autoreap = autoreap;
}
static void
dump_peer_hash(int level, const char *caller)
{
GHashTableIter iter;
const char *id = NULL;
crm_node_t *node = NULL;
g_hash_table_iter_init(&iter, crm_peer_cache);
while (g_hash_table_iter_next(&iter, (gpointer *) &id, (gpointer *) &node)) {
do_crm_log(level, "%s: Node %u/%s = %p - %s", caller, node->id, node->uname, node, id);
}
}
/* GHRFunc: match the table entry whose value pointer is user_data itself */
static gboolean
hash_find_by_data(gpointer key, gpointer value, gpointer user_data)
{
    if (user_data == value) {
        return TRUE;
    }
    return FALSE;
}
/*!
* \internal
* \brief Search caches for a node (cluster or Pacemaker Remote)
*
* \param[in] id If not 0, cluster node ID to search for
* \param[in] uname If not NULL, node name to search for
* \param[in] flags Bitmask of enum crm_get_peer_flags
*
* \return Node cache entry if found, otherwise NULL
*/
crm_node_t *
pcmk__search_node_caches(unsigned int id, const char *uname, uint32_t flags)
{
crm_node_t *node = NULL;
/* At least one search criterion is required */
CRM_ASSERT(id > 0 || uname != NULL);
/* Make sure the caches exist before looking anything up */
crm_peer_init();
/* The remote peer cache is searched by name only, so skip it without a name */
if ((uname != NULL) && pcmk_is_set(flags, CRM_GET_PEER_REMOTE)) {
node = g_hash_table_lookup(crm_remote_peer_cache, uname);
}
/* Fall back to the cluster node cache only if nothing was found above */
if ((node == NULL) && pcmk_is_set(flags, CRM_GET_PEER_CLUSTER)) {
- node = pcmk__search_cluster_node_cache(id, uname);
+ node = pcmk__search_cluster_node_cache(id, uname, NULL);
}
return node;
}
/*!
 * \brief Get a node cache entry (cluster or Pacemaker Remote)
 *
 * \param[in] id     If not 0, cluster node ID to search for
 * \param[in] uname  If not NULL, node name to search for
 * \param[in] flags  Bitmask of enum crm_get_peer_flags
 *
 * \return (Possibly newly created) node cache entry
 *
 * \note The remote peer cache is searched only when \p uname is not NULL,
 *       because that cache is looked up by name.
 */
crm_node_t *
crm_get_peer_full(unsigned int id, const char *uname, int flags)
{
    crm_node_t *node = NULL;

    CRM_ASSERT(id > 0 || uname != NULL);

    crm_peer_init();

    /* Never hand a NULL key to the remote cache lookup (callers may search by
     * ID alone); this matches the guard in pcmk__search_node_caches().
     */
    if ((uname != NULL) && pcmk_is_set(flags, CRM_GET_PEER_REMOTE)) {
        node = g_hash_table_lookup(crm_remote_peer_cache, uname);
    }

    if ((node == NULL) && pcmk_is_set(flags, CRM_GET_PEER_CLUSTER)) {
        node = crm_get_peer(id, uname);
    }
    return node;
}
/*!
* \internal
* \brief Search cluster node cache
*
* \param[in] id If not 0, cluster node ID to search for
* \param[in] uname If not NULL, node name to search for
+ * \param[in] uuid If not NULL while id is 0, node UUID instead of cluster
+ * node ID to search for
*
* \return Cluster node cache entry if found, otherwise NULL
*/
crm_node_t *
-pcmk__search_cluster_node_cache(unsigned int id, const char *uname)
+pcmk__search_cluster_node_cache(unsigned int id, const char *uname,
+ const char *uuid)
{
GHashTableIter iter;
crm_node_t *node = NULL;
crm_node_t *by_id = NULL;
crm_node_t *by_name = NULL;
CRM_ASSERT(id > 0 || uname != NULL);
crm_peer_init();
/* Pass 1: first cache entry whose uname matches, ignoring case */
if (uname != NULL) {
g_hash_table_iter_init(&iter, crm_peer_cache);
while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
if(node->uname && strcasecmp(node->uname, uname) == 0) {
crm_trace("Name match: %s = %p", node->uname, node);
by_name = node;
break;
}
}
}
/* Pass 2: first cache entry with a matching node ID; the UUID is compared
 * (ignoring case) only when no ID was given
 */
if (id > 0) {
g_hash_table_iter_init(&iter, crm_peer_cache);
while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
if(node->id == id) {
crm_trace("ID match: %u = %p", node->id, node);
by_id = node;
break;
}
}
+
+ } else if (uuid != NULL) {
+ g_hash_table_iter_init(&iter, crm_peer_cache);
+ while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
+ if (pcmk__str_eq(node->uuid, uuid, pcmk__str_casei)) {
+ crm_trace("UUID match: %s = %p", node->uuid, node);
+ by_id = node;
+ break;
+ }
+ }
}
/* Reconcile the two search results; by_id wins unless a branch below
 * overrides it
 */
node = by_id; /* Good default */
if(by_id == by_name) {
/* Nothing to do if they match (both NULL counts) */
crm_trace("Consistent: %p for %u/%s", by_id, id, uname);
} else if(by_id == NULL && by_name) {
crm_trace("Only one: %p for %u/%s", by_name, id, uname);
/* The name matched an entry that already has a different, nonzero ID */
if(id && by_name->id) {
dump_peer_hash(LOG_WARNING, __func__);
crm_crit("Node %u and %u share the same name '%s'",
id, by_name->id, uname);
node = NULL; /* Create a new one */
} else {
node = by_name;
}
} else if(by_name == NULL && by_id) {
crm_trace("Only one: %p for %u/%s", by_id, id, uname);
/* The ID matched an entry that is cached under a different name; the
 * entry is still returned
 */
if(uname && by_id->uname) {
dump_peer_hash(LOG_WARNING, __func__);
crm_crit("Node '%s' and '%s' share the same cluster nodeid %u: assuming '%s' is correct",
uname, by_id->uname, id, uname);
}
/* From here on, both searches found entries, and they are different ones */
} else if(uname && by_id->uname) {
if(pcmk__str_eq(uname, by_id->uname, pcmk__str_casei)) {
crm_notice("Node '%s' has changed its ID from %u to %u", by_id->uname, by_name->id, by_id->id);
/* Drop the by-name entry from the cache and keep the by-ID entry */
g_hash_table_foreach_remove(crm_peer_cache, hash_find_by_data, by_name);
} else {
crm_warn("Node '%s' and '%s' share the same cluster nodeid: %u %s", by_id->uname, by_name->uname, id, uname);
dump_peer_hash(LOG_INFO, __func__);
crm_abort(__FILE__, __func__, __LINE__, "member weirdness", TRUE,
TRUE);
}
} else if(id && by_name->id) {
crm_warn("Node %u and %u share the same name: '%s'", by_id->id, by_name->id, uname);
} else {
/* Simple merge */
/* Only corosync-based clusters use node IDs. The functions that call
* pcmk__update_peer_state() and crm_update_peer_proc() only know
* nodeid, so 'by_id' is authoritative when merging.
*/
dump_peer_hash(LOG_DEBUG, __func__);
crm_info("Merging %p into %p", by_name, by_id);
g_hash_table_foreach_remove(crm_peer_cache, hash_find_by_data, by_name);
}
return node;
}
#if SUPPORT_COROSYNC
/* Remove inactive peer cache entries that share the given node's uname
 * (ignoring case) but have a different, positive node ID.
 *
 * Nothing is removed when the node lacks an ID or uname, or when
 * pcmk__corosync_has_nodelist() reports false. Returns the number of entries
 * removed.
 */
static guint
remove_conflicting_peer(crm_node_t *node)
{
    int matches = 0;
    GHashTableIter iter;
    gpointer value = NULL;

    if ((node->id == 0) || (node->uname == NULL)
        || !pcmk__corosync_has_nodelist()) {
        return 0;
    }

    g_hash_table_iter_init(&iter, crm_peer_cache);
    while (g_hash_table_iter_next(&iter, NULL, &value)) {
        crm_node_t *existing_node = value;

        /* Skip entries that cannot conflict with this node */
        if ((existing_node->id <= 0)
            || (existing_node->id == node->id)
            || (existing_node->uname == NULL)
            || (strcasecmp(existing_node->uname, node->uname) != 0)) {
            continue;
        }

        /* Active peers are left in place */
        if (crm_is_peer_active(existing_node)) {
            continue;
        }

        crm_warn("Removing cached offline node %u/%s which has conflicting uname with %u",
                 existing_node->id, existing_node->uname, node->id);
        g_hash_table_iter_remove(&iter);
        matches++;
    }
    return matches;
}
#endif
/*!
* \brief Get a cluster node cache entry
*
* \param[in] id If not 0, cluster node ID to search for
* \param[in] uname If not NULL, node name to search for
*
* \return (Possibly newly created) cluster node cache entry
*/
/* coverity[-alloc] Memory is referenced in one or both hashtables */
crm_node_t *
crm_get_peer(unsigned int id, const char *uname)
{
crm_node_t *node = NULL;
char *uname_lookup = NULL;
/* At least one search criterion is required */
CRM_ASSERT(id > 0 || uname != NULL);
crm_peer_init();
- node = pcmk__search_cluster_node_cache(id, uname);
+ node = pcmk__search_cluster_node_cache(id, uname, NULL);
/* if uname wasn't provided, and find_peer did not turn up a uname based on id.
* we need to do a lookup of the node name using the id in the cluster membership. */
if ((node == NULL || node->uname == NULL) && (uname == NULL)) {
uname_lookup = get_node_name(id);
}
if (uname_lookup) {
/* uname_lookup is freed before returning, so uname must not be kept
 * beyond this function
 */
uname = uname_lookup;
crm_trace("Inferred a name of '%s' for node %u", uname, id);
/* try to turn up the node one more time now that we know the uname. */
if (node == NULL) {
- node = pcmk__search_cluster_node_cache(id, uname);
+ node = pcmk__search_cluster_node_cache(id, uname, NULL);
}
}
/* Still nothing cached: create an empty entry under a generated unique key */
if (node == NULL) {
char *uniqueid = crm_generate_uuid();
node = calloc(1, sizeof(crm_node_t));
CRM_ASSERT(node);
crm_info("Created entry %s/%p for node %s/%u (%d total)",
uniqueid, node, uname, id, 1 + g_hash_table_size(crm_peer_cache));
g_hash_table_replace(crm_peer_cache, uniqueid, node);
}
/* Fill in whichever of ID and uname the entry is still missing; values
 * already set on the entry are not overwritten
 */
if(id > 0 && uname && (node->id == 0 || node->uname == NULL)) {
crm_info("Node %u is now known as %s", id, uname);
}
if(id > 0 && node->id == 0) {
node->id = id;
}
if (uname && (node->uname == NULL)) {
update_peer_uname(node, uname);
}
/* The result of crm_peer_uuid() is only logged here */
if(node->uuid == NULL) {
const char *uuid = crm_peer_uuid(node);
if (uuid) {
crm_info("Node %u has uuid %s", id, uuid);
} else {
crm_info("Cannot obtain a UUID for node %u/%s", id, node->uname);
}
}
free(uname_lookup);
return node;
}
/*!
 * \internal
 * \brief Set a node's uname, notifying the peer status callback
 *
 * Nothing happens if the new name equals the current one (ignoring case).
 * A warning is logged if the new name contains an ASCII capital letter.
 *
 * \param[in,out] node   Node object to update
 * \param[in]     uname  New name to set
 *
 * \note This function should not be called within a peer cache iteration,
 *       because in some cases it can remove conflicting cache entries,
 *       which would invalidate the iterator.
 */
static void
update_peer_uname(crm_node_t *node, const char *uname)
{
    const char *scan = NULL;

    CRM_CHECK(uname != NULL,
              crm_err("Bug: can't update node name without name"); return);
    CRM_CHECK(node != NULL,
              crm_err("Bug: can't update node name to %s without node", uname);
              return);

    if (pcmk__str_eq(uname, node->uname, pcmk__str_casei)) {
        crm_debug("Node uname '%s' did not change", uname);
        return;
    }

    /* Stop at the first capital letter, or at the end of the name */
    scan = uname;
    while ((*scan != '\0') && ((*scan < 'A') || (*scan > 'Z'))) {
        scan++;
    }
    if (*scan != '\0') {
        crm_warn("Node names with capitals are discouraged, consider changing '%s'",
                 uname);
    }

    pcmk__str_update(&node->uname, uname);

    if (peer_status_callback != NULL) {
        peer_status_callback(crm_status_uname, node, NULL);
    }

#if SUPPORT_COROSYNC
    /* Only cluster (non-remote) nodes can conflict in the peer cache */
    if (is_corosync_cluster() && !pcmk_is_set(node->flags, crm_remote_node)) {
        remove_conflicting_peer(node);
    }
#endif
}
/*!
 * \internal
 * \brief Get log-friendly string equivalent of a process flag
 *
 * \param[in] proc  Process flag
 *
 * \return Log-friendly string equivalent of \p proc ("unknown" if \p proc is
 *         not one of the individual flags handled here)
 */
static inline const char *
proc2text(enum crm_proc_flag proc)
{
    switch (proc) {
        case crm_proc_none:
            return "none";

        case crm_proc_based:
            return "pacemaker-based";

        case crm_proc_controld:
            return "pacemaker-controld";

        case crm_proc_schedulerd:
            return "pacemaker-schedulerd";

        case crm_proc_execd:
            return "pacemaker-execd";

        case crm_proc_attrd:
            return "pacemaker-attrd";

        case crm_proc_fenced:
            return "pacemaker-fenced";

        case crm_proc_cpg:
            return "corosync-cpg";
    }
    return "unknown";
}
/*!
* \internal
* \brief Update a node's process information (and potentially state)
*
* \param[in] source Caller's function name (for log messages)
* \param[in,out] node Node object to update
* \param[in] flag Bitmask of new process information
* \param[in] status node status (online, offline, etc.)
*
* \return NULL if any node was reaped from peer caches, value of node otherwise
*
* \note If this function returns NULL, the supplied node object was likely
* freed and should not be used again. This function should not be
* called within a cache iteration if reaping is possible, otherwise
* reaping could invalidate the iterator.
*/
crm_node_t *
crm_update_peer_proc(const char *source, crm_node_t * node, uint32_t flag, const char *status)
{
/* Process bitmask before this update; passed to the status callback */
uint32_t last = 0;
gboolean changed = FALSE;
CRM_CHECK(node != NULL, crm_err("%s: Could not set %s to %s for NULL",
source, proc2text(flag), status);
return NULL);
/* Pacemaker doesn't spawn processes on remote nodes */
if (pcmk_is_set(node->flags, crm_remote_node)) {
return node;
}
last = node->processes;
/* A NULL status replaces the whole bitmask with flag; ONLINESTATUS sets the
 * bits in flag; any other status clears them
 */
if (status == NULL) {
node->processes = flag;
if (node->processes != last) {
changed = TRUE;
}
} else if (pcmk__str_eq(status, ONLINESTATUS, pcmk__str_casei)) {
if ((node->processes & flag) != flag) {
node->processes = pcmk__set_flags_as(__func__, __LINE__,
LOG_TRACE, "Peer process",
node->uname, node->processes,
flag, "processes");
changed = TRUE;
}
} else if (node->processes & flag) {
node->processes = pcmk__clear_flags_as(__func__, __LINE__,
LOG_TRACE, "Peer process",
node->uname, node->processes,
flag, "processes");
changed = TRUE;
}
if (changed) {
if (status == NULL && flag <= crm_proc_none) {
crm_info("%s: Node %s[%u] - all processes are now offline", source, node->uname,
node->id);
} else {
crm_info("%s: Node %s[%u] - %s is now %s", source, node->uname, node->id,
proc2text(flag), status);
}
/* Record when the node's cluster-layer process came online (0 if it is
 * not currently set)
 */
if (pcmk_is_set(node->processes, crm_get_cluster_proc())) {
node->when_online = time(NULL);
} else {
node->when_online = 0;
}
/* Call the client callback first, then update the peer state,
* in case the node will be reaped
*/
if (peer_status_callback != NULL) {
peer_status_callback(crm_status_processes, node, &last);
}
/* The client callback shouldn't touch the peer caches,
* but as a safety net, bail if the peer cache was destroyed.
*/
if (crm_peer_cache == NULL) {
return NULL;
}
/* With autoreap enabled, derive the peer state from the cluster-layer
 * process bit; the state update may reap the node and return NULL
 */
if (crm_autoreap) {
const char *peer_state = NULL;
if (pcmk_is_set(node->processes, crm_get_cluster_proc())) {
peer_state = CRM_NODE_MEMBER;
} else {
peer_state = CRM_NODE_LOST;
}
node = pcmk__update_peer_state(__func__, node, peer_state, 0);
}
} else {
crm_trace("%s: Node %s[%u] - %s is unchanged (%s)", source, node->uname, node->id,
proc2text(flag), status);
}
return node;
}
/*!
 * \internal
 * \brief Set the expected join state of a cluster node cache entry
 *
 * Remote nodes are ignored. The stored value changes only when \p expected is
 * not NULL and differs (ignoring case) from the current value.
 *
 * \param[in]     source    Caller's function name (for logging)
 * \param[in,out] node      Node to update
 * \param[in]     expected  Node's new join state
 */
void
pcmk__update_peer_expected(const char *source, crm_node_t *node,
                           const char *expected)
{
    char *last = NULL;

    CRM_CHECK(node != NULL, crm_err("%s: Could not set 'expected' to %s", source, expected);
              return);

    /* Remote nodes don't participate in joins */
    if (pcmk_is_set(node->flags, crm_remote_node)) {
        return;
    }

    if ((expected == NULL)
        || pcmk__str_eq(node->expected, expected, pcmk__str_casei)) {
        crm_trace("%s: Node %s[%u] - expected state is unchanged (%s)", source, node->uname,
                  node->id, expected);
        return;
    }

    /* Keep the old value alive until it has been logged */
    last = node->expected;
    node->expected = strdup(expected);
    crm_info("%s: Node %s[%u] - expected state is now %s (was %s)", source, node->uname, node->id,
             expected, last);
    free(last);
}
/*!
* \internal
* \brief Update a node's state and membership information
*
* \param[in] source Caller's function name (for log messages)
* \param[in,out] node Node object to update
* \param[in] state Node's new state
* \param[in] membership Node's new membership ID
* \param[in,out] iter If not NULL, pointer to node's peer cache iterator
*
* \return NULL if any node was reaped, value of node otherwise
*
* \note If this function returns NULL, the supplied node object was likely
* freed and should not be used again. This function may be called from
* within a peer cache iteration if the iterator is supplied.
*/
static crm_node_t *
update_peer_state_iter(const char *source, crm_node_t *node, const char *state,
uint64_t membership, GHashTableIter *iter)
{
gboolean is_member;
CRM_CHECK(node != NULL,
crm_err("Could not set state for unknown host to %s"
CRM_XS " source=%s", state, source);
return NULL);
is_member = pcmk__str_eq(state, CRM_NODE_MEMBER, pcmk__str_casei);
/* A member is no longer lost; record the membership ID it was seen in,
 * unless that ID is 0
 */
if (is_member) {
node->when_lost = 0;
if (membership) {
node->last_seen = membership;
}
}
/* Act only on a real state change (compared ignoring case) */
if (state && !pcmk__str_eq(node->state, state, pcmk__str_casei)) {
/* Previous state; freed only after the callback has seen it */
char *last = node->state;
if (is_member) {
node->when_member = time(NULL);
} else {
node->when_member = 0;
}
node->state = strdup(state);
crm_notice("Node %s state is now %s " CRM_XS
" nodeid=%u previous=%s source=%s", node->uname, state,
node->id, (last? last : "unknown"), source);
if (peer_status_callback != NULL) {
peer_status_callback(crm_status_nstate, node, last);
}
free(last);
if (crm_autoreap && !is_member
&& !pcmk_is_set(node->flags, crm_remote_node)) {
/* We only autoreap from the peer cache, not the remote peer cache,
* because the latter should be managed only by
* crm_remote_peer_cache_refresh().
*/
/* With an iterator, remove through it so the caller's iteration
 * stays valid; otherwise go through reap_crm_member()
 */
if(iter) {
crm_notice("Purged 1 peer with id=%u and/or uname=%s from the membership cache", node->id, node->uname);
g_hash_table_iter_remove(iter);
} else {
reap_crm_member(node->id, node->uname);
}
/* Tell the caller not to use the node again */
node = NULL;
}
} else {
crm_trace("Node %s state is unchanged (%s) " CRM_XS
" nodeid=%u source=%s", node->uname, state, node->id, source);
}
return node;
}
/*!
 * \brief Update a node's state and membership information
 *
 * This is update_peer_state_iter() without a peer cache iterator.
 *
 * \param[in]     source      Caller's function name (for log messages)
 * \param[in,out] node        Node object to update
 * \param[in]     state       Node's new state
 * \param[in]     membership  Node's new membership ID
 *
 * \return NULL if any node was reaped, value of node otherwise
 *
 * \note If this function returns NULL, the supplied node object was likely
 *       freed and should not be used again. This function should not be
 *       called within a cache iteration if reaping is possible,
 *       otherwise reaping could invalidate the iterator.
 */
crm_node_t *
pcmk__update_peer_state(const char *source, crm_node_t *node,
                        const char *state, uint64_t membership)
{
    GHashTableIter *no_iter = NULL;

    return update_peer_state_iter(source, node, state, membership, no_iter);
}
/*!
* \internal
* \brief Reap all nodes from cache whose membership information does not match
*
* \param[in] membership Membership ID of nodes to keep
*/
void
pcmk__reap_unseen_nodes(uint64_t membership)
{
GHashTableIter iter;
crm_node_t *node = NULL;
crm_trace("Reaping unseen nodes...");
g_hash_table_iter_init(&iter, crm_peer_cache);
while (g_hash_table_iter_next(&iter, NULL, (gpointer *)&node)) {
if (node->last_seen != membership) {
if (node->state) {
/*
* Calling update_peer_state_iter() allows us to
* remove the node from crm_peer_cache without
* invalidating our iterator
*/
update_peer_state_iter(__func__, node, CRM_NODE_LOST,
membership, &iter);
} else {
crm_info("State of node %s[%u] is still unknown",
node->uname, node->id);
}
}
}
}
static crm_node_t *
find_known_node(const char *id, const char *uname)
{
GHashTableIter iter;
crm_node_t *node = NULL;
crm_node_t *by_id = NULL;
crm_node_t *by_name = NULL;
if (uname) {
g_hash_table_iter_init(&iter, known_node_cache);
while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
if (node->uname && strcasecmp(node->uname, uname) == 0) {
crm_trace("Name match: %s = %p", node->uname, node);
by_name = node;
break;
}
}
}
if (id) {
g_hash_table_iter_init(&iter, known_node_cache);
while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
if(strcasecmp(node->uuid, id) == 0) {
crm_trace("ID match: %s= %p", id, node);
by_id = node;
break;
}
}
}
node = by_id; /* Good default */
if (by_id == by_name) {
/* Nothing to do if they match (both NULL counts) */
crm_trace("Consistent: %p for %s/%s", by_id, id, uname);
} else if (by_id == NULL && by_name) {
crm_trace("Only one: %p for %s/%s", by_name, id, uname);
if (id) {
node = NULL;
} else {
node = by_name;
}
} else if (by_name == NULL && by_id) {
crm_trace("Only one: %p for %s/%s", by_id, id, uname);
if (uname) {
node = NULL;
}
} else if (uname && by_id->uname
&& pcmk__str_eq(uname, by_id->uname, pcmk__str_casei)) {
/* Multiple nodes have the same uname in the CIB.
* Return by_id. */
} else if (id && by_name->uuid
&& pcmk__str_eq(id, by_name->uuid, pcmk__str_casei)) {
/* Multiple nodes have the same id in the CIB.
* Return by_name. */
node = by_name;
} else {
node = NULL;
}
if (node == NULL) {
crm_debug("Couldn't find node%s%s%s%s",
id? " " : "",
id? id : "",
uname? " with name " : "",
uname? uname : "");
}
return node;
}
/* Process one node entry found in the CIB for the known node cache.
 *
 * A node that find_known_node() does not find is added under a generated
 * unique key. A node that is found and still marked dirty gets its uname
 * refreshed and is marked clean. user_data is unused.
 */
static void
known_node_cache_refresh_helper(xmlNode *xml_node, void *user_data)
{
    const char *id = crm_element_value(xml_node, XML_ATTR_ID);
    const char *uname = crm_element_value(xml_node, XML_ATTR_UNAME);
    crm_node_t * node = NULL;
    char *uniqueid = NULL;

    CRM_CHECK(id != NULL && uname !=NULL, return);

    node = find_known_node(id, uname);
    if (node != NULL) {
        if (pcmk_is_set(node->flags, crm_node_dirty)) {
            pcmk__str_update(&node->uname, uname);

            /* Node is in cache and hasn't been updated already, so mark it
             * clean
             */
            clear_peer_flags(node, crm_node_dirty);
        }
        return;
    }

    /* Unknown node: build a fresh entry and add it to the cache */
    uniqueid = crm_generate_uuid();

    node = calloc(1, sizeof(crm_node_t));
    CRM_ASSERT(node != NULL);

    node->uname = strdup(uname);
    CRM_ASSERT(node->uname != NULL);

    node->uuid = strdup(id);
    CRM_ASSERT(node->uuid != NULL);

    g_hash_table_replace(known_node_cache, uniqueid, node);
}
/* Rebuild the known node cache from the CIB: mark every entry dirty, process
 * each PCMK__XP_MEMBER_NODE_CONFIG search result, then drop the entries that
 * are still dirty
 */
static void
refresh_known_node_cache(xmlNode *cib)
{
    crm_peer_init();

    g_hash_table_foreach(known_node_cache, mark_dirty, NULL);

    crm_foreach_xpath_result(cib, PCMK__XP_MEMBER_NODE_CONFIG,
                             known_node_cache_refresh_helper, NULL);

    /* Entries still dirty were not seen in the CIB */
    g_hash_table_foreach_remove(known_node_cache, is_dirty, NULL);
}
/* Refresh the remote peer cache, then the known node cache, from the given
 * CIB XML
 */
void
pcmk__refresh_node_caches_from_cib(xmlNode *cib)
{
    crm_remote_peer_cache_refresh(cib);
    refresh_known_node_cache(cib);
}
/*!
 * \internal
 * \brief Search the node caches, falling back to the known node cache
 *
 * The known node cache is consulted only when the peer caches yield nothing
 * and \p flags includes CRM_GET_PEER_CLUSTER.
 *
 * \param[in] id     If not 0, cluster node ID to search for
 * \param[in] uname  If not NULL, node name to search for
 * \param[in] flags  Bitmask of enum crm_get_peer_flags
 *
 * \return Known node cache entry if found, otherwise NULL
 */
crm_node_t *
pcmk__search_known_node_cache(unsigned int id, const char *uname,
                              uint32_t flags)
{
    crm_node_t *node = NULL;

    CRM_ASSERT(id > 0 || uname != NULL);

    node = pcmk__search_node_caches(id, uname, flags);

    if ((node == NULL) && (flags & CRM_GET_PEER_CLUSTER)) {
        /* The known node cache is searched by ID string, so convert */
        char *id_str = (id > 0)? crm_strdup_printf("%u", id) : NULL;

        node = find_known_node(id_str, uname);
        free(id_str);
    }
    return node;
}
// Deprecated functions kept only for backward API compatibility
// LCOV_EXCL_START
#include <crm/cluster/compat.h>
/* Deprecated: forward to stonith_api_kick() with a fixed timeout argument of
 * 120 and TRUE as the final argument; the third parameter is ignored
 */
int
crm_terminate_member(int nodeid, const char *uname, void *unused)
{
    const int timeout = 120;

    return stonith_api_kick(nodeid, uname, timeout, TRUE);
}
/* Deprecated: forward to stonith_api_kick() with a fixed timeout argument of
 * 120 and TRUE as the final argument; the connection parameter is ignored
 */
int
crm_terminate_member_no_mainloop(int nodeid, const char *uname, int *connection)
{
    const int timeout = 120;

    return stonith_api_kick(nodeid, uname, timeout, TRUE);
}
// LCOV_EXCL_STOP
// End deprecated API

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 3:23 PM (19 h, 44 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1018846
Default Alt Text
(197 KB)

Event Timeline