Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F1842310
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
197 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/daemons/attrd/attrd_corosync.c b/daemons/attrd/attrd_corosync.c
index ef205e6e13..086ec03d95 100644
--- a/daemons/attrd/attrd_corosync.c
+++ b/daemons/attrd/attrd_corosync.c
@@ -1,620 +1,620 @@
/*
* Copyright 2013-2023 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU General Public License version 2
* or later (GPLv2+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <crm/cluster.h>
#include <crm/cluster/internal.h>
#include <crm/common/logging.h>
#include <crm/common/results.h>
#include <crm/common/strings_internal.h>
#include <crm/msg_xml.h>
#include "pacemaker-attrd.h"
/* Daemon exit status, defined outside this file; attrd_cpg_destroy() sets it
 * to CRM_EX_DISCONNECT when the cluster connection is lost unexpectedly.
 */
extern crm_exit_t attrd_exit_status;
/*!
 * \internal
 * \brief Create a confirmation message to send back to a peer
 *
 * \param[in] callid  Call ID of the peer message being confirmed
 *
 * \return Newly allocated confirmation XML (the caller frees it with
 *         free_xml())
 */
static xmlNode *
attrd_confirmation(int callid)
{
    xmlNode *confirm = create_xml_node(NULL, __func__);

    // Identify the message as an attrd confirmation originating from us
    crm_xml_add(confirm, F_TYPE, T_ATTRD);
    crm_xml_add(confirm, F_ORIG, get_local_node_name());
    crm_xml_add(confirm, PCMK__XA_TASK, PCMK__ATTRD_CMD_CONFIRM);

    // Echo the call ID so the requester can match this to its request
    crm_xml_add_int(confirm, XML_LRM_ATTR_CALLID, callid);

    return confirm;
}
/*!
 * \internal
 * \brief Process a message received from a cluster peer
 *
 * Election messages are always handled. Once we are shutting down, every
 * other message is dropped. Otherwise the message is dispatched as a request.
 * If the sender asked for confirmation and the message is not itself a
 * confirmation, a confirmation carrying the message's call ID is sent back.
 *
 * \param[in] peer  Node that sent the message
 * \param[in] xml   Message XML
 */
static void
attrd_peer_message(crm_node_t *peer, xmlNode *xml)
{
    if (crm_element_value(xml, F_CRM_TASK) != NULL) {
        attrd_handle_election_op(peer, xml);
        return;
    }

    /* While shutting down we keep answering election ops (handled above,
     * since our vote may still be needed as long as we're a member), but
     * ignore everything else.
     */
    if (attrd_shutting_down()) {
        return;
    }

    pcmk__request_t request = {
        .ipc_client     = NULL,
        .ipc_id         = 0,
        .ipc_flags      = 0,
        .peer           = peer->uname,
        .xml            = xml,
        .call_options   = 0,
        .result         = PCMK__UNKNOWN_RESULT,
    };

    request.op = crm_element_value_copy(request.xml, PCMK__XA_TASK);
    CRM_CHECK(request.op != NULL, return);

    attrd_handle_request(&request);

    // Now that the request is handled, confirm it if the sender asked us to
    if (pcmk__xe_attr_is_true(xml, PCMK__XA_CONFIRM)
        && !pcmk__str_eq(request.op, PCMK__ATTRD_CMD_CONFIRM, pcmk__str_none)) {
        int callid = 0;
        xmlNode *reply = NULL;

        // Carry the original call ID so the sender knows what is confirmed
        crm_element_value_int(xml, XML_LRM_ATTR_CALLID, &callid);
        reply = attrd_confirmation(callid);

        /* The reply comes back through this same function on the sender,
         * where it is handled as a PCMK__ATTRD_CMD_CONFIRM request.
         */
        crm_debug("Sending %s a confirmation", peer->uname);
        attrd_send_message(peer, reply, false);
        free_xml(reply);
    }

    pcmk__reset_request(&request);
}
/*!
 * \internal
 * \brief Corosync CPG delivery callback for attrd
 *
 * Decodes the raw CPG message, parses it as XML when it is a cluster-class
 * message, and hands it to attrd_peer_message(). Anything that cannot be
 * parsed is logged and dropped.
 */
static void
attrd_cpg_dispatch(cpg_handle_t handle,
                   const struct cpg_name *groupName,
                   uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
{
    uint32_t kind = 0;
    const char *from = NULL;
    xmlNode *xml = NULL;
    char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from);

    if (data == NULL) {
        return;
    }

    // Only cluster-class messages carry XML payloads we understand
    xml = (kind == crm_class_cluster)? string2xml(data) : NULL;

    if (xml != NULL) {
        attrd_peer_message(crm_get_peer(nodeid, from), xml);
    } else {
        crm_err("Bad message of class %d received from %s[%u]: '%.120s'", kind, from, nodeid, data);
    }

    free_xml(xml);
    free(data);
}
/*!
 * \internal
 * \brief Handle loss of the Corosync CPG connection
 *
 * An expected disconnection during shutdown is only logged. An unexpected
 * one records CRM_EX_DISCONNECT as the exit status and starts shutdown.
 */
static void
attrd_cpg_destroy(gpointer unused)
{
    if (attrd_shutting_down()) {
        crm_info("Corosync disconnection complete");
        return;
    }

    crm_crit("Lost connection to cluster layer, shutting down");
    attrd_exit_status = CRM_EX_DISCONNECT;
    attrd_shutdown(0);
}
/*!
 * \internal
 * \brief Override an attribute sync with a local value
 *
 * Sends every peer a sync response holding the local node's own value for
 * \p a. This is used when a peer's sync response disagrees with it, so that a
 * node's value for itself wins and all peers converge on it.
 *
 * \param[in] a  Attribute entry to override
 *
 * \return Local node's entry in \p a->values (as returned by the lookup)
 */
static attribute_value_t *
broadcast_local_value(const attribute_t *a)
{
    attribute_value_t *local = g_hash_table_lookup(a->values,
                                                   attrd_cluster->uname);
    xmlNode *msg = create_xml_node(NULL, __func__);

    crm_xml_add(msg, PCMK__XA_TASK, PCMK__ATTRD_CMD_SYNC_RESPONSE);
    attrd_add_value_xml(msg, a, local, false);

    // A NULL destination broadcasts to all peers
    attrd_send_message(NULL, msg, false);
    free_xml(msg);

    return local;
}
/*!
* \internal
* \brief Ensure a Pacemaker Remote node is in the correct peer cache
*
* \param[in] node_name Name of Pacemaker Remote node to check
*/
static void
cache_remote_node(const char *node_name)
{
/* If we previously assumed this node was an unseen cluster node,
* remove its entry from the cluster peer cache.
*/
- crm_node_t *dup = pcmk__search_cluster_node_cache(0, node_name);
+ crm_node_t *dup = pcmk__search_cluster_node_cache(0, node_name, NULL);
if (dup && (dup->uuid == NULL)) {
reap_crm_member(0, node_name);
}
// Ensure node is in the remote peer cache
CRM_ASSERT(crm_remote_peer_get(node_name) != NULL);
}
// Node state string for log messages, with a fallback when it is NULL
#define state_text(node_state) pcmk__s((node_state), "in unknown state")
/*!
 * \internal
 * \brief Return host's hash table entry (creating one if needed)
 *
 * If \p xml flags the host as a Pacemaker Remote node, the node is also put
 * in the remote peer cache. This happens whether or not an entry already
 * exists.
 *
 * \param[in,out] values  Hash table of values
 * \param[in]     host    Name of peer to look up
 * \param[in]     xml     XML describing the attribute
 *
 * \return Pointer to new or existing hash table entry
 */
static attribute_value_t *
attrd_lookup_or_create_value(GHashTable *values, const char *host,
                             const xmlNode *xml)
{
    attribute_value_t *entry = g_hash_table_lookup(values, host);
    int is_remote = 0;

    crm_element_value_int(xml, PCMK__XA_ATTR_IS_REMOTE, &is_remote);
    if (is_remote != 0) {
        cache_remote_node(host);
    }

    if (entry != NULL) {
        return entry;
    }

    entry = calloc(1, sizeof(*entry));
    CRM_ASSERT(entry != NULL);

    // The table key is the entry's own copy of the node name
    pcmk__str_update(&entry->nodename, host);
    entry->is_remote = is_remote;
    g_hash_table_replace(values, entry->nodename, entry);

    return entry;
}
/*!
 * \internal
 * \brief React to peer membership/process/name changes
 *
 * On membership changes, the writer (election winner) syncs all attributes
 * to newly joined cluster nodes, and all values for nodes that left are
 * removed. Departing cluster nodes are dropped from election voting,
 * protocol-version tracking and confirmation expectations. Remote nodes that
 * are still around are ensured to be in the remote node cache.
 *
 * \param[in] kind  What changed about the peer
 * \param[in] peer  Peer that changed
 * \param[in] data  Previous state string (used for crm_status_nstate)
 */
static void
attrd_peer_change_cb(enum crm_status_type kind, crm_node_t *peer, const void *data)
{
    const bool is_remote = pcmk_is_set(peer->flags, crm_remote_node);
    const char *node_type = is_remote? "Remote" : "Cluster";
    bool gone = false;

    switch (kind) {
        case crm_status_uname:
            crm_debug("%s node %s is now %s",
                      node_type, peer->uname, state_text(peer->state));
            break;

        case crm_status_processes:
            gone = !pcmk_is_set(peer->processes, crm_get_cluster_proc());
            crm_debug("Node %s is %s a peer",
                      peer->uname, (gone? "no longer" : "now"));
            break;

        case crm_status_nstate:
            crm_debug("%s node %s is now %s (was %s)",
                      node_type, peer->uname, state_text(peer->state),
                      state_text(data));

            if (!pcmk__str_eq(peer->state, CRM_NODE_MEMBER, pcmk__str_casei)) {
                // Remove all attribute values associated with lost nodes
                attrd_peer_remove(peer->uname, false, "loss");
                gone = true;

            } else if (attrd_election_won() && !is_remote) {
                /* As the writer, send the new peer every attribute (remote
                 * nodes don't run their own attrd, so skip them)
                 */
                attrd_peer_sync(peer, NULL);
            }
            break;

        default:
            break;
    }

    if (gone && !is_remote) {
        // Remove votes from departing cluster nodes, in case an election runs
        attrd_remove_voter(peer);
        attrd_remove_peer_protocol_ver(peer->uname);
        attrd_do_not_expect_from_peer(peer->uname);

    } else if (!gone && is_remote) {
        // Ensure remote nodes that come up are in the remote node cache
        cache_remote_node(peer->uname);
    }
}
/*!
 * \internal
 * \brief Note a newly learned node ID for a cluster node
 *
 * Looks the node up in the peer cache by \p v->nodeid and \p host and logs
 * what was learned. If this node is the writer, it then calls
 * attrd_write_attributes(false, false).
 *
 * \param[in] v     Attribute value entry whose nodeid was just set
 * \param[in] host  Name of the node the entry belongs to
 */
static void
record_peer_nodeid(attribute_value_t *v, const char *host)
{
    crm_node_t *known_peer = crm_get_peer(v->nodeid, host);

    crm_trace("Learned %s has node id %s", known_peer->uname, known_peer->uuid);

    if (!attrd_election_won()) {
        return;
    }
    attrd_write_attributes(false, false);
}
/*!
 * \internal
 * \brief Apply a peer's update of one attribute to one host's value
 *
 * Looks up (or creates) \p host's entry in \p a->values. Then:
 * - If \p filter is set, the value differs and \p host is the local node,
 *   the local value is kept and rebroadcast instead.
 * - Otherwise, if the value differs, it is stored and the attribute is marked
 *   changed. The local shutdown request flag is tracked. Then either the
 *   dampening timer is started or the attribute is written (or an election
 *   started) right away.
 * - Otherwise (value unchanged), a forced write is only noted when requested
 *   and a dampening timer exists.
 *
 * In every case the entry is marked seen. A cluster node's ID is recorded if
 * the XML supplies one we did not know yet.
 *
 * \param[in,out] a               Attribute being updated
 * \param[in]     peer            Peer that sent the update (used for logging)
 * \param[in]     xml             Update XML
 * \param[in]     attr            Attribute name
 * \param[in]     value           New value (may be NULL, logged as "(unset)")
 * \param[in]     host            Node whose value is being updated
 * \param[in]     filter          Whether the local node's own value wins
 * \param[in]     is_force_write  1 if the sender requested a forced write
 */
static void
update_attr_on_host(attribute_t *a, const crm_node_t *peer, const xmlNode *xml,
                    const char *attr, const char *value, const char *host,
                    bool filter, int is_force_write)
{
    attribute_value_t *v = NULL;

    v = attrd_lookup_or_create_value(a->values, host, xml);

    if (filter && !pcmk__str_eq(v->current, value, pcmk__str_casei)
        && pcmk__str_eq(host, attrd_cluster->uname, pcmk__str_casei)) {

        // Our own value for ourselves takes precedence over a peer's view
        crm_notice("%s[%s]: local value '%s' takes priority over '%s' from %s",
                   attr, host, v->current, value, peer->uname);
        /* NOTE(review): v is dereferenced below (v->seen), so this lookup by
         * attrd_cluster->uname is assumed to find the same entry; confirm
         * a->values uses case-insensitive keys like the comparison above.
         */
        v = broadcast_local_value(a);

    } else if (!pcmk__str_eq(v->current, value, pcmk__str_casei)) {
        crm_notice("Setting %s[%s]%s%s: %s -> %s "
                   CRM_XS " from %s with %s write delay",
                   attr, host, a->set_type ? " in " : "",
                   pcmk__s(a->set_type, ""), pcmk__s(v->current, "(unset)"),
                   pcmk__s(value, "(unset)"), peer->uname,
                   (a->timeout_ms == 0)? "no" : pcmk__readable_interval(a->timeout_ms));
        pcmk__str_update(&v->current, value);
        a->changed = true;

        // Track whether the local node itself is requesting shutdown
        if (pcmk__str_eq(host, attrd_cluster->uname, pcmk__str_casei)
            && pcmk__str_eq(attr, XML_CIB_ATTR_SHUTDOWN, pcmk__str_none)) {

            if (!pcmk__str_eq(value, "0", pcmk__str_null_matches)) {
                attrd_set_requesting_shutdown();
            } else {
                attrd_clear_requesting_shutdown();
            }
        }

        // Write out new value or start dampening timer
        if (a->timeout_ms && a->timer) {
            crm_trace("Delayed write out (%dms) for %s", a->timeout_ms, attr);
            mainloop_timer_start(a->timer);
        } else {
            attrd_write_or_elect_attribute(a);
        }

    } else {
        if (is_force_write == 1 && a->timeout_ms && a->timer) {
            /* Save forced writing and set change flag. */
            /* The actual attribute is written by Writer after election. */
            crm_trace("Unchanged %s[%s] from %s is %s(Set the forced write flag)",
                      attr, host, peer->uname, value);
            a->force_write = TRUE;
        } else {
            crm_trace("Unchanged %s[%s] from %s is %s", attr, host, peer->uname, value);
        }
    }

    /* Set the seen flag for attribute processing held only in the own node. */
    v->seen = TRUE;

    /* If this is a cluster node whose node ID we are learning, remember it */
    if ((v->nodeid == 0) && (v->is_remote == FALSE)
        && (crm_element_value_int(xml, PCMK__XA_ATTR_NODE_ID,
                                  (int*)&v->nodeid) == 0) && (v->nodeid > 0)) {
        record_peer_nodeid(v, host);
    }
}
/*!
 * \internal
 * \brief Apply a single attribute update received from a peer
 *
 * Updates the named attribute either for the host given in \p xml or, when
 * no host is given, for every host that already has a value. A protocol
 * version attribute also feeds the minimum-protocol-version tracking.
 *
 * \param[in]     peer    Peer that sent the update
 * \param[in,out] xml     Update XML (its node ID is removed for all-host updates)
 * \param[in]     filter  Whether the local node's own value takes priority
 */
static void
attrd_peer_update_one(const crm_node_t *peer, xmlNode *xml, bool filter)
{
    const char *attr = crm_element_value(xml, PCMK__XA_ATTR_NAME);
    const char *value = crm_element_value(xml, PCMK__XA_ATTR_VALUE);
    const char *host = crm_element_value(xml, PCMK__XA_ATTR_NODE_NAME);
    int is_force_write = 0;
    attribute_t *a = NULL;

    if (attr == NULL) {
        crm_warn("Could not update attribute: peer did not specify name");
        return;
    }

    crm_element_value_int(xml, PCMK__XA_ATTR_FORCE, &is_force_write);

    a = attrd_populate_attribute(xml, attr);
    if (a == NULL) {
        return;
    }

    if (host != NULL) {
        // Update attribute value for the given host
        update_attr_on_host(a, peer, xml, attr, value, host, filter,
                            is_force_write);
    } else {
        // No host was specified: update every host that has a value
        GHashTableIter iter;
        const char *each_host = NULL;

        crm_debug("Setting %s for all hosts to %s", attr, value);

        // A single node ID cannot apply to all hosts
        xml_remove_prop(xml, PCMK__XA_ATTR_NODE_ID);

        g_hash_table_iter_init(&iter, a->values);
        while (g_hash_table_iter_next(&iter, (gpointer *) &each_host, NULL)) {
            update_attr_on_host(a, peer, xml, attr, value, each_host, filter,
                                is_force_write);
        }
    }

    /* A peer broadcasting its protocol version may lower the cluster-wide
     * minimum version.
     */
    if (pcmk__str_eq(attr, CRM_ATTR_PROTOCOL, pcmk__str_none)) {
        attrd_update_minimum_protocol_ver(peer->uname, value);
    }
}
static void
broadcast_unseen_local_values(void)
{
GHashTableIter aIter;
GHashTableIter vIter;
attribute_t *a = NULL;
attribute_value_t *v = NULL;
xmlNode *sync = NULL;
g_hash_table_iter_init(&aIter, attributes);
while (g_hash_table_iter_next(&aIter, NULL, (gpointer *) & a)) {
g_hash_table_iter_init(&vIter, a->values);
while (g_hash_table_iter_next(&vIter, NULL, (gpointer *) & v)) {
if (!(v->seen) && pcmk__str_eq(v->nodename, attrd_cluster->uname,
pcmk__str_casei)) {
if (sync == NULL) {
sync = create_xml_node(NULL, __func__);
crm_xml_add(sync, PCMK__XA_TASK, PCMK__ATTRD_CMD_SYNC_RESPONSE);
}
attrd_add_value_xml(sync, a, v, a->timeout_ms && a->timer);
}
}
}
if (sync != NULL) {
crm_debug("Broadcasting local-only values");
attrd_send_message(NULL, sync, false);
free_xml(sync);
}
}
/*!
 * \internal
 * \brief Create attrd's cluster object and connect to the cluster layer
 *
 * Installs the CPG delivery, membership and destroy callbacks and the peer
 * status callback before connecting.
 *
 * \return pcmk_ok on success, -ENOTCONN if the connection failed
 */
int
attrd_cluster_connect(void)
{
    attrd_cluster = pcmk_cluster_new();

    attrd_cluster->destroy = attrd_cpg_destroy;
    attrd_cluster->cpg.cpg_deliver_fn = attrd_cpg_dispatch;
    attrd_cluster->cpg.cpg_confchg_fn = pcmk_cpg_membership;

    crm_set_status_callback(&attrd_peer_change_cb);

    if (!crm_cluster_connect(attrd_cluster)) {
        crm_err("Cluster connection failed");
        return -ENOTCONN;
    }
    return pcmk_ok;
}
/*!
 * \internal
 * \brief Clear failure attributes in response to a peer request
 *
 * Builds a regular expression from the request's resource, operation and
 * interval. The request XML is turned into a deletion: its task is set to
 * update and its value is removed. That deletion is applied to every known
 * attribute whose name matches the expression.
 *
 * \param[in,out] request  Peer request (its XML is modified in place)
 */
void
attrd_peer_clear_failure(pcmk__request_t *request)
{
    xmlNode *xml = request->xml;
    const char *rsc = crm_element_value(xml, PCMK__XA_ATTR_RESOURCE);
    const char *host = crm_element_value(xml, PCMK__XA_ATTR_NODE_NAME);
    const char *op = crm_element_value(xml, PCMK__XA_ATTR_OPERATION);
    const char *interval_spec = crm_element_value(xml, PCMK__XA_ATTR_INTERVAL);
    guint interval_ms = crm_parse_interval_spec(interval_spec);
    char *attr = NULL;
    GHashTableIter iter;
    regex_t regex;

    crm_node_t *peer = crm_get_peer(0, request->peer);

    // regex is only compiled (and thus only needs regfree()) on success
    if (attrd_failure_regex(&regex, rsc, op, interval_ms) != pcmk_ok) {
        crm_info("Ignoring invalid request to clear failures for %s",
                 pcmk__s(rsc, "all resources"));
        return;
    }

    crm_xml_add(xml, PCMK__XA_TASK, PCMK__ATTRD_CMD_UPDATE);

    /* Make sure value is not set, so we delete */
    if (crm_element_value(xml, PCMK__XA_ATTR_VALUE)) {
        crm_xml_replace(xml, PCMK__XA_ATTR_VALUE, NULL);
    }

    g_hash_table_iter_init(&iter, attributes);
    while (g_hash_table_iter_next(&iter, (gpointer *) &attr, NULL)) {
        if (regexec(&regex, attr, 0, NULL, 0) == 0) {
            crm_trace("Matched %s when clearing %s",
                      attr, pcmk__s(rsc, "all resources"));
            // Reuse the same request XML, retargeted at each matching name
            crm_xml_add(xml, PCMK__XA_ATTR_NAME, attr);
            attrd_peer_update(peer, xml, host, false);
        }
    }
    regfree(&regex);
}
/*!
 * \internal
 * \brief Load attributes from a peer sync response
 *
 * Each child of \p xml is applied as a filtered attribute update (so the
 * local node's own values keep priority). When the sender is the writer,
 * local values it did not mention are then broadcast to all peers.
 *
 * \param[in]     peer      Peer that sent the sync response
 * \param[in]     peer_won  Whether peer is the attribute writer
 * \param[in,out] xml       Sync response XML (one child per attribute value)
 */
void
attrd_peer_sync_response(const crm_node_t *peer, bool peer_won, xmlNode *xml)
{
    crm_info("Processing " PCMK__ATTRD_CMD_SYNC_RESPONSE " from %s",
             peer->uname);

    if (peer_won) {
        /* Initialize the "seen" flag for all attributes to cleared, so we can
         * detect attributes that local node has but the writer doesn't.
         */
        attrd_clear_value_seen();
    }

    // Process each attribute update in the sync response
    for (xmlNode *child = pcmk__xml_first_child(xml); child != NULL;
         child = pcmk__xml_next(child)) {
        attrd_peer_update(peer, child,
                          crm_element_value(child, PCMK__XA_ATTR_NODE_NAME),
                          true);
    }

    if (peer_won) {
        /* If any attributes are still not marked as seen, the writer doesn't
         * know about them, so send all peers an update with them.
         */
        broadcast_unseen_local_values();
    }
}
/*!
 * \internal
 * \brief Drop every attribute value for a node, optionally uncaching it
 *
 * \param[in] host     Name of node to purge
 * \param[in] uncache  If true, also remove the node from the remote peer
 *                     cache and reap it from the cluster peer cache
 * \param[in] source   Who requested removal (only used for logging)
 */
void
attrd_peer_remove(const char *host, bool uncache, const char *source)
{
    GHashTableIter iter;
    attribute_t *a = NULL;

    CRM_CHECK(host != NULL, return);
    crm_notice("Removing all %s attributes for peer %s", host, source);

    g_hash_table_iter_init(&iter, attributes);
    while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &a)) {
        if (!g_hash_table_remove(a->values, host)) {
            continue;   // this attribute had no value for the host
        }
        crm_debug("Removed %s[%s] for peer %s", a->id, host, source);
    }

    if (!uncache) {
        return;
    }
    crm_remote_peer_cache_remove(host);
    reap_crm_member(0, host);
}
/*!
 * \internal
 * \brief Send every known attribute value as a sync response
 *
 * \param[in] peer  Peer to send to, or NULL to send to all peers
 * \param[in] xml   Unused by this function
 */
void
attrd_peer_sync(crm_node_t *peer, xmlNode *xml)
{
    const char *target = (peer != NULL)? peer->uname : "everyone";
    xmlNode *sync = create_xml_node(NULL, __func__);
    GHashTableIter attr_iter;
    attribute_t *a = NULL;

    crm_xml_add(sync, PCMK__XA_TASK, PCMK__ATTRD_CMD_SYNC_RESPONSE);

    g_hash_table_iter_init(&attr_iter, attributes);
    while (g_hash_table_iter_next(&attr_iter, NULL, (gpointer *) &a)) {
        GHashTableIter value_iter;
        attribute_value_t *v = NULL;

        g_hash_table_iter_init(&value_iter, a->values);
        while (g_hash_table_iter_next(&value_iter, NULL, (gpointer *) &v)) {
            crm_debug("Syncing %s[%s] = %s to %s",
                      a->id, v->nodename, v->current, target);
            attrd_add_value_xml(sync, a, v, false);
        }
    }

    crm_debug("Syncing values to %s", target);
    attrd_send_message(peer, sync, false);
    free_xml(sync);
}
/*!
 * \internal
 * \brief Apply an attribute update (single or multi-op) received from a peer
 *
 * A message with children is treated as several operations. Each child
 * inherits the parent's XML attributes and is applied separately. If any
 * applied operation asked for a sync point, local sync point waiters are
 * acknowledged afterwards.
 *
 * \param[in]     peer    Peer that sent the update
 * \param[in,out] xml     Update XML
 * \param[in]     host    Unused by this function
 * \param[in]     filter  Whether the local node's own value takes priority
 */
void
attrd_peer_update(const crm_node_t *peer, xmlNode *xml, const char *host,
                  bool filter)
{
    bool handle_sync_point = false;

    if (!xml_has_children(xml)) {
        attrd_peer_update_one(peer, xml, filter);
        handle_sync_point = attrd_request_has_sync_point(xml);

    } else {
        for (xmlNode *op = first_named_child(xml, XML_ATTR_OP); op != NULL;
             op = crm_next_same_xml(op)) {
            attrd_copy_xml_attributes(xml, op);
            attrd_peer_update_one(peer, op, filter);
            if (attrd_request_has_sync_point(op)) {
                handle_sync_point = true;
            }
        }
    }

    // The client asked to wait for a sync point, so process it now
    if (handle_sync_point) {
        crm_trace("Hit local sync point for attribute update");
        attrd_ack_waitlist_clients(attrd_sync_point_local, xml);
    }
}
diff --git a/daemons/attrd/attrd_ipc.c b/daemons/attrd/attrd_ipc.c
index 9d3dfff23a..8b7040b57f 100644
--- a/daemons/attrd/attrd_ipc.c
+++ b/daemons/attrd/attrd_ipc.c
@@ -1,628 +1,629 @@
/*
* Copyright 2004-2023 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU General Public License version 2
* or later (GPLv2+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/types.h>
#include <crm/cluster.h>
#include <crm/cluster/internal.h>
#include <crm/msg_xml.h>
#include <crm/common/acl_internal.h>
#include <crm/common/ipc_internal.h>
#include <crm/common/logging.h>
#include <crm/common/results.h>
#include <crm/common/strings_internal.h>
#include <crm/common/util.h>
#include "pacemaker-attrd.h"
/* libqb IPC service handle for client connections to this daemon.
 * NOTE(review): not referenced in the visible part of this file; presumably
 * created and destroyed by IPC setup/cleanup code further down -- confirm.
 */
static qb_ipcs_service_t *ipcs = NULL;
/*!
 * \internal
 * \brief Build the XML reply to a client query
 *
 * \param[in] attr  Name of requested attribute
 * \param[in] host  Name of requested host, or NULL for all hosts ("localhost"
 *                  is mapped to the local node's name)
 *
 * \return New XML reply, or NULL if XML could not be created
 * \note Caller is responsible for freeing the resulting XML with free_xml().
 *       If \p attr is not a known attribute, the reply contains only the
 *       header fields (no attribute name and no node values).
 */
static xmlNode *build_query_reply(const char *attr, const char *host)
{
    xmlNode *reply = create_xml_node(NULL, __func__);
    attribute_t *a;

    if (reply == NULL) {
        return NULL;
    }
    crm_xml_add(reply, F_TYPE, T_ATTRD);
    crm_xml_add(reply, F_SUBTYPE, PCMK__ATTRD_CMD_QUERY);
    crm_xml_add(reply, PCMK__XA_ATTR_VERSION, ATTRD_PROTOCOL_VERSION);

    /* If desired attribute exists, add its value(s) to the reply */
    a = g_hash_table_lookup(attributes, attr);
    if (a) {
        attribute_value_t *v;
        xmlNode *host_value;

        crm_xml_add(reply, PCMK__XA_ATTR_NAME, attr);

        /* Allow caller to use "localhost" to refer to local node */
        if (pcmk__str_eq(host, "localhost", pcmk__str_casei)) {
            host = attrd_cluster->uname;
            crm_trace("Mapped localhost to %s", host);
        }

        /* If a specific node was requested, add its value (a node with no
         * value still gets an entry, just without a value)
         */
        if (host) {
            v = g_hash_table_lookup(a->values, host);
            host_value = create_xml_node(reply, XML_CIB_TAG_NODE);
            if (host_value == NULL) {
                free_xml(reply);
                return NULL;
            }
            pcmk__xe_add_node(host_value, host, 0);
            crm_xml_add(host_value, PCMK__XA_ATTR_VALUE,
                        (v? v->current : NULL));

        /* Otherwise, add all nodes' values */
        } else {
            GHashTableIter iter;

            g_hash_table_iter_init(&iter, a->values);
            while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &v)) {
                host_value = create_xml_node(reply, XML_CIB_TAG_NODE);
                if (host_value == NULL) {
                    free_xml(reply);
                    return NULL;
                }
                pcmk__xe_add_node(host_value, v->nodename, 0);
                crm_xml_add(host_value, PCMK__XA_ATTR_VALUE, v->current);
            }
        }
    }
    return reply;
}
/*!
 * \internal
 * \brief Handle a client request to clear resource failure attributes
 *
 * When every peer speaks protocol version 2 or later, the request is simply
 * broadcast to all peers (including this one). Otherwise it is rewritten as
 * a regex-based attribute deletion and handled by attrd_client_update().
 *
 * \param[in,out] request  Client request (its XML may be modified in place)
 *
 * \return Whatever attrd_client_update() returns, or NULL when broadcast
 */
xmlNode *
attrd_client_clear_failure(pcmk__request_t *request)
{
    xmlNode *xml = request->xml;
    const char *rsc = NULL;
    const char *op = NULL;
    const char *interval_spec = NULL;

    if (minimum_protocol_version >= 2) {
        // Propagate to all peers (including ourselves) via attrd_peer_message()
        attrd_send_message(NULL, xml, false);
        pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL);
        return NULL;
    }

    rsc = crm_element_value(xml, PCMK__XA_ATTR_RESOURCE);
    op = crm_element_value(xml, PCMK__XA_ATTR_OPERATION);
    interval_spec = crm_element_value(xml, PCMK__XA_ATTR_INTERVAL);

    // Older peers: express the clear as an update of matching attributes
    crm_xml_add(xml, PCMK__XA_TASK, PCMK__ATTRD_CMD_UPDATE);

    if (rsc == NULL) {
        crm_xml_add(xml, PCMK__XA_ATTR_PATTERN, ATTRD_RE_CLEAR_ALL);
    } else {
        char *pattern = NULL;

        if (op != NULL) {
            guint interval_ms = crm_parse_interval_spec(interval_spec);

            pattern = crm_strdup_printf(ATTRD_RE_CLEAR_OP,
                                        rsc, op, interval_ms);
        } else {
            pattern = crm_strdup_printf(ATTRD_RE_CLEAR_ONE, rsc);
        }
        crm_xml_add(xml, PCMK__XA_ATTR_PATTERN, pattern);
        free(pattern);
    }

    // With no name and no value, the update deletes via the regex
    if (crm_element_value(xml, PCMK__XA_ATTR_NAME)) {
        crm_xml_replace(xml, PCMK__XA_ATTR_NAME, NULL);
    }
    if (crm_element_value(xml, PCMK__XA_ATTR_VALUE)) {
        crm_xml_replace(xml, PCMK__XA_ATTR_VALUE, NULL);
    }

    return attrd_client_update(request);
}
/*!
 * \internal
 * \brief Handle a client request to remove all attribute values for a node
 *
 * The node is identified by name or, if no name was given, by node ID. The
 * name then comes from the peer cache or from the cluster layer. The request
 * is broadcast to all peers. The client is acknowledged immediately.
 *
 * \param[in,out] request  Client request (a resolved node name is added to
 *                         its XML)
 *
 * \return NULL (no reply XML)
 */
xmlNode *
attrd_client_peer_remove(pcmk__request_t *request)
{
    xmlNode *xml = request->xml;

    // Host and ID are not used in combination, rather host has precedence
    const char *host = crm_element_value(xml, PCMK__XA_ATTR_NODE_NAME);

    // Owns a name obtained from get_node_name(), if any; freed below
    char *host_alloc = NULL;

    attrd_send_ack(request->ipc_client, request->ipc_id, request->ipc_flags);

    if (host == NULL) {
        int nodeid = 0;

        crm_element_value_int(xml, PCMK__XA_ATTR_NODE_ID, &nodeid);
        if (nodeid > 0) {
            crm_node_t *node = pcmk__search_cluster_node_cache(nodeid, NULL,
                                                               NULL);

            if ((node != NULL) && (node->uname != NULL)) {
                // Use cached name if available
                host = node->uname;
            } else {
                /* Otherwise ask cluster layer. Assign to the function-scope
                 * variable (not a shadowing local) so it gets freed.
                 */
                host_alloc = get_node_name(nodeid);
                host = host_alloc;
            }
            pcmk__xe_add_node(xml, host, 0);
        }
    }

    if (host) {
        crm_info("Client %s is requesting all values for %s be removed",
                 pcmk__client_name(request->ipc_client), host);
        attrd_send_message(NULL, xml, false); /* ends up at attrd_peer_message() */
    } else {
        crm_info("Ignoring request by client %s to remove all peer values without specifying peer",
                 pcmk__client_name(request->ipc_client));
    }

    // Safe after the last use of host; free(NULL) is a no-op
    free(host_alloc);

    pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL);
    return NULL;
}
/*!
 * \internal
 * \brief Handle a client query for an attribute's value(s)
 *
 * \param[in,out] request  Client request; its result is set here
 *
 * \return Newly built reply XML, or NULL on error (see request->result)
 */
xmlNode *
attrd_client_query(pcmk__request_t *request)
{
    xmlNode *query = request->xml;
    const char *attr = NULL;
    xmlNode *reply = NULL;

    crm_debug("Query arrived from %s", pcmk__client_name(request->ipc_client));

    // A query without an attribute name cannot be answered
    attr = crm_element_value(query, PCMK__XA_ATTR_NAME);
    if (attr == NULL) {
        pcmk__format_result(&request->result, CRM_EX_ERROR, PCMK_EXEC_ERROR,
                            "Ignoring malformed query from %s (no attribute name given)",
                            pcmk__client_name(request->ipc_client));
        return NULL;
    }

    reply = build_query_reply(attr,
                              crm_element_value(query, PCMK__XA_ATTR_NODE_NAME));
    if (reply == NULL) {
        pcmk__format_result(&request->result, CRM_EX_ERROR, PCMK_EXEC_ERROR,
                            "Could not respond to query from %s: could not create XML reply",
                            pcmk__client_name(request->ipc_client));
        return NULL;
    }

    pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL);
    request->ipc_client->request_id = 0;
    return reply;
}
/*!
 * \internal
 * \brief Handle a client request to write out all attributes
 *
 * The client is acknowledged before the write is triggered.
 *
 * \param[in,out] request  Client request; its result is set to success
 *
 * \return NULL (no reply XML)
 */
xmlNode *
attrd_client_refresh(pcmk__request_t *request)
{
    crm_info("Updating all attributes");

    attrd_send_ack(request->ipc_client, request->ipc_id, request->ipc_flags);
    attrd_write_attributes(true, true);

    pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL);
    return NULL;
}
/*!
 * \internal
 * \brief Default an update's target node to the local node
 *
 * \param[in,out] xml  Update XML; gets the local node's name and ID if it
 *                     names no node
 */
static void
handle_missing_host(xmlNode *xml)
{
    if (crm_element_value(xml, PCMK__XA_ATTR_NODE_NAME) != NULL) {
        return;
    }

    crm_trace("Inferring host");
    pcmk__xe_add_node(xml, attrd_cluster->uname, attrd_cluster->nodeid);
}
/*!
 * \internal
 * \brief Expand a regex-based update into one child op per matching attribute
 *
 * When \p attr is NULL and \p regex is given, a child op is added to \p xml
 * for every known attribute whose name matches. Each child copies the
 * parent's XML attributes, with the pattern replaced by the attribute name.
 *
 * \param[in,out] xml    Update XML to add children to
 * \param[in]     attr   Attribute name from the update (may be NULL)
 * \param[in]     value  Value from the update (used for logging)
 * \param[in]     regex  Pattern from the update (may be NULL)
 *
 * \return pcmk_rc_ok if \p attr is set or the regex matched something,
 *         EINVAL if \p regex does not compile,
 *         pcmk_rc_op_unsatisfied if a valid regex matched nothing (not an
 *         error, but nothing more to do), or
 *         pcmk_rc_bad_nvpair if neither \p attr nor \p regex was given
 */
static int
expand_regexes(xmlNode *xml, const char *attr, const char *value, const char *regex)
{
    GHashTableIter iter;
    const char *name = NULL;
    regex_t compiled;
    bool matched = false;

    if (attr != NULL) {
        return pcmk_rc_ok;
    }
    if (regex == NULL) {
        return pcmk_rc_bad_nvpair;
    }

    crm_debug("Setting %s to %s", regex, value);
    if (regcomp(&compiled, regex, REG_EXTENDED|REG_NOSUB) != 0) {
        return EINVAL;
    }

    g_hash_table_iter_init(&iter, attributes);
    while (g_hash_table_iter_next(&iter, (gpointer *) &name, NULL)) {
        xmlNode *child = NULL;

        if (regexec(&compiled, name, 0, NULL, 0) != 0) {
            continue;
        }

        child = create_xml_node(xml, XML_ATTR_OP);
        crm_trace("Matched %s with %s", name, regex);
        matched = true;

        // Inherit everything from the parent, but target this one name
        attrd_copy_xml_attributes(xml, child);
        crm_xml_replace(child, PCMK__XA_ATTR_PATTERN, NULL);
        crm_xml_add(child, PCMK__XA_ATTR_NAME, name);
    }
    regfree(&compiled);

    return matched? pcmk_rc_ok : pcmk_rc_op_unsatisfied;
}
/*!
 * \internal
 * \brief Expand any regex in a client update, setting an error result if bad
 *
 * \param[in,out] request  Client update request
 *
 * \return Return code from expand_regexes() (request->result is set for
 *         EINVAL and pcmk_rc_bad_nvpair)
 */
static int
handle_regexes(pcmk__request_t *request)
{
    xmlNode *xml = request->xml;
    const char *attr = crm_element_value(xml, PCMK__XA_ATTR_NAME);
    const char *value = crm_element_value(xml, PCMK__XA_ATTR_VALUE);
    const char *regex = crm_element_value(xml, PCMK__XA_ATTR_PATTERN);
    const int rc = expand_regexes(xml, attr, value, regex);

    switch (rc) {
        case EINVAL:
            pcmk__format_result(&request->result, CRM_EX_ERROR, PCMK_EXEC_ERROR,
                                "Bad regex '%s' for update from client %s", regex,
                                pcmk__client_name(request->ipc_client));
            break;

        case pcmk_rc_bad_nvpair:
            crm_err("Update request did not specify attribute or regular expression");
            pcmk__format_result(&request->result, CRM_EX_ERROR, PCMK_EXEC_ERROR,
                                "Client %s update request did not specify attribute or regular expression",
                                pcmk__client_name(request->ipc_client));
            break;

        default:
            break;
    }
    return rc;
}
/*!
 * \internal
 * \brief Expand a relative value (as judged by attrd_value_needs_expansion())
 *
 * \param[in,out] value  Pointer to the update's value; re-pointed at the
 *                       expanded value stored in \p xml when expansion occurs
 * \param[in,out] xml    Update XML (its value is replaced when expanded)
 * \param[in]     op     Request operation
 * \param[in]     attr   Attribute name
 *
 * \return EINVAL if \p op is PCMK__ATTRD_CMD_UPDATE_DELAY for an unknown
 *         attribute, otherwise pcmk_rc_ok
 */
static int
handle_value_expansion(const char **value, xmlNode *xml, const char *op,
                       const char *attr)
{
    attribute_t *a = g_hash_table_lookup(attributes, attr);
    attribute_value_t *current = NULL;
    int int_value = 0;

    if ((a == NULL)
        && pcmk__str_eq(op, PCMK__ATTRD_CMD_UPDATE_DELAY, pcmk__str_none)) {
        return EINVAL;
    }

    if ((*value == NULL) || !attrd_value_needs_expansion(*value)) {
        return pcmk_rc_ok;
    }

    // Expansion is relative to the host's current value, if any
    if (a != NULL) {
        current = g_hash_table_lookup(a->values,
                                      crm_element_value(xml, PCMK__XA_ATTR_NODE_NAME));
    }

    int_value = attrd_expand_value(*value,
                                   (current != NULL)? current->current : NULL);

    crm_info("Expanded %s=%s to %d", attr, *value, int_value);
    crm_xml_add_int(xml, PCMK__XA_ATTR_VALUE, int_value);

    // The old string was freed by the replacement above, so look it up again
    *value = crm_element_value(xml, PCMK__XA_ATTR_VALUE);
    return pcmk_rc_ok;
}
/*!
 * \internal
 * \brief Broadcast a client update, honoring a cluster-wide sync point
 *
 * If the client waits on the cluster sync point, confirmations are expected
 * from all peers and requested in the broadcast. The client's ACK is then
 * sent once they all arrive. Otherwise the ACK is handled elsewhere: in
 * attrd_peer_update() for the local sync point, or by the caller before this
 * is reached when no sync point was requested.
 *
 * \param[in,out] request  Client request
 * \param[in]     xml      Update XML to broadcast
 */
static void
send_update_msg_to_cluster(pcmk__request_t *request, xmlNode *xml)
{
    const bool wait_for_cluster = pcmk__str_eq(attrd_request_sync_point(xml),
                                               PCMK__VALUE_CLUSTER,
                                               pcmk__str_none);

    if (wait_for_cluster) {
        attrd_expect_confirmations(request, attrd_cluster_sync_point_update);
    }

    // Ends up at attrd_peer_message() on every node
    attrd_send_message(NULL, xml, wait_for_cluster);
}
/*!
 * \internal
 * \brief pcmk__xe_foreach_child() callback: handle one child op as an update
 *
 * \param[in] child  Child op XML
 * \param[in] data   The pcmk__request_t being processed (its xml is replaced)
 *
 * \return pcmk_rc_ok (keep iterating); the result itself is recorded by
 *         attrd_client_update()
 */
static int
send_child_update(xmlNode *child, void *data)
{
    pcmk__request_t *req = data;

    req->xml = child;
    (void) attrd_client_update(req);

    return pcmk_rc_ok;
}
/*!
 * \internal
 * \brief Handle a client attribute update request
 *
 * Multi-op requests (with child ops) are broadcast as-is when every peer
 * supports them. Otherwise they are split and each child is handled through
 * this function again. Single requests first have any regex expanded into
 * child ops; if there was a regex, this function recurses on the expanded
 * message. The target host is defaulted and relative values are expanded
 * before broadcasting.
 *
 * \param[in,out] request  Client request (its XML may be modified in place;
 *                         request->result is set here or by a recursive call)
 *
 * \return NULL (no reply XML)
 */
xmlNode *
attrd_client_update(pcmk__request_t *request)
{
    xmlNode *xml = request->xml;
    const char *attr, *value, *regex;

    /* If the message has children, that means it is a message from a newer
     * client that supports sending multiple operations at a time. There are
     * two ways we can handle that.
     */
    if (xml_has_children(xml)) {
        if (ATTRD_SUPPORTS_MULTI_MESSAGE(minimum_protocol_version)) {
            /* First, if all peers support a certain protocol version, we can
             * just broadcast the big message and they'll handle it. However,
             * we also need to apply all the transformations in this function
             * to the children since they don't happen anywhere else.
             */
            for (xmlNode *child = first_named_child(xml, XML_ATTR_OP); child != NULL;
                 child = crm_next_same_xml(child)) {
                attr = crm_element_value(child, PCMK__XA_ATTR_NAME);
                value = crm_element_value(child, PCMK__XA_ATTR_VALUE);

                handle_missing_host(child);

                // A failing child aborts the whole request before anything is sent
                if (handle_value_expansion(&value, child, request->op, attr) == EINVAL) {
                    pcmk__format_result(&request->result, CRM_EX_NOSUCH, PCMK_EXEC_ERROR,
                                        "Attribute %s does not exist", attr);
                    return NULL;
                }
            }

            send_update_msg_to_cluster(request, xml);
            pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL);
        } else {
            /* Save the original xml node pointer so it can be restored after iterating
             * over all the children.
             */
            xmlNode *orig_xml = request->xml;

            /* Second, if they do not support that protocol version, split it
             * up into individual messages and call attrd_client_update on
             * each one.
             */
            pcmk__xe_foreach_child(xml, XML_ATTR_OP, send_child_update, request);
            request->xml = orig_xml;
        }

        return NULL;
    }

    attr = crm_element_value(xml, PCMK__XA_ATTR_NAME);
    value = crm_element_value(xml, PCMK__XA_ATTR_VALUE);
    regex = crm_element_value(xml, PCMK__XA_ATTR_PATTERN);

    /* NOTE(review): when a valid regex matches nothing, handle_regexes()
     * returns pcmk_rc_op_unsatisfied without setting request->result, so the
     * result is left as initialized by the caller -- confirm that is intended.
     */
    if (handle_regexes(request) != pcmk_rc_ok) {
        /* Error handling was already dealt with in handle_regexes, so just return. */
        return NULL;
    } else if (regex) {
        /* Recursively call attrd_client_update on the new message with regexes
         * expanded. If supported by the attribute daemon, this means that all
         * matches can also be handled atomically.
         */
        return attrd_client_update(request);
    }

    handle_missing_host(xml);

    if (handle_value_expansion(&value, xml, request->op, attr) == EINVAL) {
        pcmk__format_result(&request->result, CRM_EX_NOSUCH, PCMK_EXEC_ERROR,
                            "Attribute %s does not exist", attr);
        return NULL;
    }

    crm_debug("Broadcasting %s[%s]=%s%s", attr, crm_element_value(xml, PCMK__XA_ATTR_NODE_NAME),
              value, (attrd_election_won()? " (writer)" : ""));

    send_update_msg_to_cluster(request, xml);
    pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL);
    return NULL;
}
/*!
 * \internal
 * \brief Accept a new client IPC connection
 *
 * Connections are refused while the daemon is shutting down.
 *
 * \param[in,out] c    New connection
 * \param[in]     uid  Client user id
 * \param[in]     gid  Client group id
 *
 * \return pcmk_ok on success, -EPERM during shutdown, -EIO if the client
 *         could not be registered
 */
static int32_t
attrd_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid)
{
    crm_trace("New client connection %p", c);

    if (attrd_shutting_down()) {
        crm_info("Ignoring new connection from pid %d during shutdown",
                 pcmk__client_pid(c));
        return -EPERM;
    }

    return (pcmk__new_client(c, uid, gid) != NULL)? pcmk_ok : -EIO;
}
/*!
 * \internal
 * \brief Clean up after a client IPC connection closes
 *
 * Removes the client from the sync point waitlist and from confirmation
 * tracking, then frees it. Unknown connections are ignored.
 *
 * \param[in] c  Connection that closed
 *
 * \return FALSE (i.e. do not re-run this callback)
 */
static int32_t
attrd_ipc_closed(qb_ipcs_connection_t *c)
{
    pcmk__client_t *client = pcmk__find_client(c);

    if (client == NULL) {
        crm_trace("Ignoring request to clean up unknown connection %p", c);
        return FALSE;
    }

    crm_trace("Cleaning up closed client connection %p", c);
    attrd_remove_client_from_waitlist(client);
    attrd_do_not_wait_for_client(client);
    pcmk__free_client(client);

    return FALSE;
}
/*!
 * \internal
 * \brief Handle destruction of a client IPC connection
 *
 * \param[in,out] c  Connection being destroyed
 *
 * \note Cleanup is identical to a closed connection; this wrapper exists only
 *       because the destroy callback has a void return type.
 */
static void
attrd_ipc_destroy(qb_ipcs_connection_t *c)
{
    crm_trace("Destroying client connection %p", c);
    (void) attrd_ipc_closed(c);
}
/*!
 * \internal
 * \brief Handle a message received from an attrd IPC client
 *
 * Parses the IPC payload as XML, stamps it with the client's ACL user, and
 * passes it to attrd_handle_request().
 *
 * \param[in,out] c     IPC connection the message arrived on
 * \param[in]     data  Raw IPC message data
 * \param[in]     size  Size of \p data (unused here)
 *
 * \return 0 in all cases
 */
static int32_t
attrd_ipc_dispatch(qb_ipcs_connection_t * c, void *data, size_t size)
{
    uint32_t id = 0;
    uint32_t flags = 0;
    pcmk__client_t *client = pcmk__find_client(c);
    xmlNode *xml = NULL;

    // Sanity-check, and parse XML from IPC data
    CRM_CHECK((c != NULL) && (client != NULL), return 0);
    if (data == NULL) {
        crm_debug("No IPC data from PID %d", pcmk__client_pid(c));
        return 0;
    }

    xml = pcmk__client_data2xml(client, data, &id, &flags);

    if (xml == NULL) {
        crm_debug("Unrecognizable IPC data from PID %d", pcmk__client_pid(c));
        pcmk__ipc_send_ack(client, id, flags, "ack", NULL, CRM_EX_PROTOCOL);
        return 0;

    } else {
        pcmk__request_t request = {
            .ipc_client     = client,
            .ipc_id         = id,
            .ipc_flags      = flags,
            .peer           = NULL,
            .xml            = xml,
            .call_options   = 0,
            .result         = PCMK__UNKNOWN_RESULT,
        };

        CRM_ASSERT(client->user != NULL);
        pcmk__update_acl_user(xml, PCMK__XA_ATTR_USER, client->user);

        request.op = crm_element_value_copy(request.xml, PCMK__XA_TASK);

        /* A request without a task can't be handled. Reply with a protocol
         * error (as for unparseable data above) and release the parsed XML,
         * which would otherwise leak on this early return.
         */
        CRM_CHECK(request.op != NULL,
                  pcmk__ipc_send_ack(client, id, flags, "ack", NULL,
                                     CRM_EX_PROTOCOL);
                  free_xml(xml);
                  return 0);

        attrd_handle_request(&request);
        pcmk__reset_request(&request);
    }

    free_xml(xml);
    return 0;
}
/* Handlers for attrd's IPC server (registered by attrd_init_ipc()) */
static struct qb_ipcs_service_handlers ipc_callbacks = {
    .connection_accept      = attrd_ipc_accept,
    .connection_created     = NULL,
    .msg_process            = attrd_ipc_dispatch,
    .connection_closed      = attrd_ipc_closed,
    .connection_destroyed   = attrd_ipc_destroy,
};
/*!
 * \internal
 * \brief Disconnect all IPC clients and tear down attrd's IPC server
 *
 * \note Safe to call when the server was never created or is already gone.
 */
void
attrd_ipc_fini(void)
{
    if (ipcs == NULL) {
        return;
    }

    pcmk__drop_all_clients(ipcs);
    qb_ipcs_destroy(ipcs);
    ipcs = NULL;
}
/*!
 * \internal
 * \brief Start attrd's IPC server, storing its handle in \c ipcs
 */
void
attrd_init_ipc(void)
{
    // ipc_callbacks supplies the accept/dispatch/close/destroy handlers
    pcmk__serve_attrd_ipc(&ipcs, &ipc_callbacks);
}
diff --git a/daemons/based/based_messages.c b/daemons/based/based_messages.c
index a1258eda05..5e221039b9 100644
--- a/daemons/based/based_messages.c
+++ b/daemons/based/based_messages.c
@@ -1,418 +1,418 @@
/*
* Copyright 2004-2023 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU General Public License version 2
* or later (GPLv2+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <fcntl.h>
#include <time.h>
#include <sys/param.h>
#include <sys/types.h>
#include <crm/crm.h>
#include <crm/cib/internal.h>
#include <crm/msg_xml.h>
#include <crm/common/xml.h>
#include <crm/common/ipc_internal.h>
#include <crm/common/xml_internal.h>
#include <crm/cluster/internal.h>
#include <pacemaker-based.h>
/* Maximum number of diffs to ignore while waiting for a resync */
#define MAX_DIFF_RETRY 5
// True when this instance is in R/W (primary) mode; set by cib_process_readwrite()
bool based_is_primary = false;
// This instance's CIB, as digested/sent by cib_process_ping() and sync_our_cib()
xmlNode *the_cib = NULL;
/*!
 * \internal
 * \brief Process a peer's shutdown request, or its reply to ours
 *
 * A request (no F_CIB_ISREPLY) is simply acknowledged. A reply is only valid
 * if we asked to shut down, in which case the CIB manager is terminated.
 *
 * \return pcmk_ok, or -EINVAL for a reply we did not ask for
 */
int
cib_process_shutdown_req(const char *op, int options, const char *section, xmlNode * req,
                         xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                         xmlNode ** answer)
{
    const char *peer_name = crm_element_value(req, F_ORIG);
    bool is_reply = false;

    *answer = NULL;

    is_reply = (crm_element_value(req, F_CIB_ISREPLY) != NULL);
    if (!is_reply) {
        crm_info("Peer %s is requesting to shut down", peer_name);
        return pcmk_ok;
    }

    if (!cib_shutdown_flag) {
        crm_err("Peer %s mistakenly thinks we wanted to shut down", peer_name);
        return -EINVAL;
    }

    crm_info("Peer %s has acknowledged our shutdown request", peer_name);
    terminate_cib(__func__, 0);
    return pcmk_ok;
}
// @COMPAT: Remove when PCMK__CIB_REQUEST_NOOP is removed
/*!
 * \internal
 * \brief Handle a no-op CIB request (logs it and produces no answer)
 *
 * \return pcmk_ok
 */
int
cib_process_noop(const char *op, int options, const char *section, xmlNode *req,
                 xmlNode *input, xmlNode *existing_cib, xmlNode **result_cib,
                 xmlNode **answer)
{
    crm_trace("Processing \"%s\" event", op);

    // Nothing to report back
    *answer = NULL;
    return pcmk_ok;
}
/*!
 * \internal
 * \brief Query or change whether this instance is in R/W (primary) mode
 *
 * PCMK__CIB_REQUEST_IS_PRIMARY only queries the mode.
 * PCMK__CIB_REQUEST_PRIMARY switches to R/W mode. Any other operation
 * switches to R/O mode.
 *
 * \return For the query, pcmk_ok if primary or -EPERM if not; otherwise
 *         pcmk_ok
 */
int
cib_process_readwrite(const char *op, int options, const char *section, xmlNode * req,
                      xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                      xmlNode ** answer)
{
    crm_trace("Processing \"%s\" event", op);

    if (pcmk__str_eq(op, PCMK__CIB_REQUEST_IS_PRIMARY, pcmk__str_none)) {
        return based_is_primary? pcmk_ok : -EPERM;
    }

    if (pcmk__str_eq(op, PCMK__CIB_REQUEST_PRIMARY, pcmk__str_none)) {
        if (based_is_primary) {
            crm_debug("We are still in R/W mode");
        } else {
            crm_info("We are now in R/W mode");
            based_is_primary = true;
        }

    } else if (based_is_primary) {
        crm_info("We are now in R/O mode");
        based_is_primary = false;
    }

    return pcmk_ok;
}
/* Set to 1 when a sync is requested, incremented when a diff is ignored,
 * reset to 0 when a sync is received
 *
 * send_sync_request() sets it to 1. cib_server_process_diff() increments it
 * for each ignored diff, and also resets it to 0 once it exceeds
 * MAX_DIFF_RETRY. cib_process_replace_svr() resets it after a successful
 * full-CIB replace.
 */
static int sync_in_progress = 0;
void
send_sync_request(const char *host)
{
xmlNode *sync_me = create_xml_node(NULL, "sync-me");
crm_info("Requesting re-sync from %s", (host? host : "all peers"));
sync_in_progress = 1;
crm_xml_add(sync_me, F_TYPE, "cib");
crm_xml_add(sync_me, F_CIB_OPERATION, PCMK__CIB_REQUEST_SYNC_TO_ONE);
crm_xml_add(sync_me, F_CIB_DELEGATED,
stand_alone? "localhost" : crm_cluster->uname);
send_cluster_message(host ? crm_get_peer(0, host) : NULL, crm_msg_cib, sync_me, FALSE);
free_xml(sync_me);
}
/*!
 * \internal
 * \brief Answer a CIB ping with our CIB digest and version details
 *
 * The answer is an XML_CRM_TAG_PING element carrying our feature set, the
 * versioned digest of the_cib, and the request's ping ID. Under
 * pcmk__if_tracing's first branch the whole CIB is attached; otherwise only a
 * shallow copy of the root element's attributes is attached.
 *
 * \param[out] answer  Where to store the newly created reply XML
 *
 * \return pcmk_ok
 */
int
cib_process_ping(const char *op, int options, const char *section, xmlNode * req, xmlNode * input,
xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer)
{
const char *host = crm_element_value(req, F_ORIG);
const char *seq = crm_element_value(req, F_CIB_PING_ID);
// Digest is computed over the_cib; caller must free it
char *digest = calculate_xml_versioned_digest(the_cib, FALSE, TRUE, CRM_FEATURE_SET);
crm_trace("Processing \"%s\" event %s from %s", op, seq, host);
*answer = create_xml_node(NULL, XML_CRM_TAG_PING);
crm_xml_add(*answer, XML_ATTR_CRM_VERSION, CRM_FEATURE_SET);
crm_xml_add(*answer, XML_ATTR_DIGEST, digest);
crm_xml_add(*answer, F_CIB_PING_ID, seq);
pcmk__if_tracing(
{
// Append additional detail so the receiver can log the differences
add_message_xml(*answer, F_CIB_CALLDATA, the_cib);
},
{
// Always include at least the version details
const char *tag = TYPE(the_cib);
xmlNode *shallow = create_xml_node(NULL, tag);
copy_in_properties(shallow, the_cib);
add_message_xml(*answer, F_CIB_CALLDATA, shallow);
free_xml(shallow);
}
);
/* NOTE(review): the digest above comes from the_cib, but the versions logged
 * here come from existing_cib -- presumably the same document; confirm.
 */
crm_info("Reporting our current digest to %s: %s for %s.%s.%s",
host, digest,
crm_element_value(existing_cib, XML_ATTR_GENERATION_ADMIN),
crm_element_value(existing_cib, XML_ATTR_GENERATION),
crm_element_value(existing_cib, XML_ATTR_NUMUPDATES));
free(digest);
return pcmk_ok;
}
/*!
 * \internal
 * \brief Handle a request to push our CIB to every peer
 *
 * \return Result of sync_our_cib()
 */
int
cib_process_sync(const char *op, int options, const char *section, xmlNode * req, xmlNode * input,
                 xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer)
{
    const gboolean to_all_peers = TRUE;

    return sync_our_cib(req, to_all_peers);
}
/*!
 * \internal
 * \brief Handle a CIB schema upgrade request on the server side
 *
 * If \p req already carries F_CIB_SCHEMA_MAX, the upgrade is applied locally
 * via cib_process_upgrade(). Otherwise this checks whether a newer schema is
 * reachable. If one is, the upgrade is either applied locally (legacy mode on
 * the primary) or re-broadcast to all nodes with F_CIB_SCHEMA_MAX set. On
 * failure, the result is sent back to the originating peer.
 *
 * \return pcmk_ok on success, otherwise a negative error code (for example
 *         -pcmk_err_schema_unchanged when no newer schema version is reached)
 */
int
cib_process_upgrade_server(const char *op, int options, const char *section, xmlNode * req, xmlNode * input,
xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer)
{
int rc = pcmk_ok;
*answer = NULL;
if(crm_element_value(req, F_CIB_SCHEMA_MAX)) {
/* The originator of an upgrade request sends it to the DC, without
* F_CIB_SCHEMA_MAX. If an upgrade is needed, the DC re-broadcasts the
* request with F_CIB_SCHEMA_MAX, and each node performs the upgrade
* (and notifies its local clients) here.
*/
return cib_process_upgrade(
op, options, section, req, input, existing_cib, result_cib, answer);
} else {
int new_version = 0;
int current_version = 0;
// Work on a copy so existing_cib itself is not modified by the check below
xmlNode *scratch = copy_xml(existing_cib);
const char *host = crm_element_value(req, F_ORIG);
const char *value = crm_element_value(existing_cib, XML_ATTR_VALIDATION);
const char *client_id = crm_element_value(req, F_CIB_CLIENTID);
const char *call_opts = crm_element_value(req, F_CIB_CALLOPTS);
const char *call_id = crm_element_value(req, F_CIB_CALLID);
crm_trace("Processing \"%s\" event", op);
if (value != NULL) {
current_version = get_schema_version(value);
}
/* new_version receives the schema version reached for the scratch copy;
 * only a version newer than the current one counts as an upgrade
 */
rc = update_validation(&scratch, &new_version, 0, TRUE, TRUE);
if (new_version > current_version) {
xmlNode *up = create_xml_node(NULL, __func__);
rc = pcmk_ok;
crm_notice("Upgrade request from %s verified", host);
crm_xml_add(up, F_TYPE, "cib");
crm_xml_add(up, F_CIB_OPERATION, PCMK__CIB_REQUEST_UPGRADE);
crm_xml_add(up, F_CIB_SCHEMA_MAX, get_schema_name(new_version));
crm_xml_add(up, F_CIB_DELEGATED, host);
crm_xml_add(up, F_CIB_CLIENTID, client_id);
crm_xml_add(up, F_CIB_CALLOPTS, call_opts);
crm_xml_add(up, F_CIB_CALLID, call_id);
if (cib_legacy_mode() && based_is_primary) {
rc = cib_process_upgrade(
op, options, section, up, input, existing_cib, result_cib, answer);
} else {
// Re-broadcast with F_CIB_SCHEMA_MAX so every node upgrades
send_cluster_message(NULL, crm_msg_cib, up, FALSE);
}
free_xml(up);
} else if(rc == pcmk_ok) {
rc = -pcmk_err_schema_unchanged;
}
if (rc != pcmk_ok) {
// Notify originating peer so it can notify its local clients
- crm_node_t *origin = pcmk__search_cluster_node_cache(0, host);
+ crm_node_t *origin = pcmk__search_cluster_node_cache(0, host, NULL);
crm_info("Rejecting upgrade request from %s: %s "
CRM_XS " rc=%d peer=%s", host, pcmk_strerror(rc), rc,
(origin? origin->uname : "lost"));
if (origin) {
xmlNode *up = create_xml_node(NULL, __func__);
crm_xml_add(up, F_TYPE, "cib");
crm_xml_add(up, F_CIB_OPERATION, PCMK__CIB_REQUEST_UPGRADE);
crm_xml_add(up, F_CIB_DELEGATED, host);
crm_xml_add(up, F_CIB_ISREPLY, host);
crm_xml_add(up, F_CIB_CLIENTID, client_id);
crm_xml_add(up, F_CIB_CALLOPTS, call_opts);
crm_xml_add(up, F_CIB_CALLID, call_id);
crm_xml_add_int(up, F_CIB_UPGRADE_RC, rc);
if (send_cluster_message(origin, crm_msg_cib, up, TRUE)
== FALSE) {
crm_warn("Could not send CIB upgrade result to %s", host);
}
free_xml(up);
}
}
free_xml(scratch);
}
return rc;
}
/*!
 * \internal
 * \brief Handle a request to push our CIB only to the requesting peer
 *
 * \return Result of sync_our_cib()
 */
int
cib_process_sync_one(const char *op, int options, const char *section, xmlNode * req,
                     xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                     xmlNode ** answer)
{
    const gboolean to_all_peers = FALSE;

    return sync_our_cib(req, to_all_peers);
}
/*!
 * \internal
 * \brief Apply a CIB diff from a peer, cooperating with any pending resync
 *
 * While a full resync is pending (sync_in_progress nonzero), a non-primary
 * instance ignores incoming diffs. After MAX_DIFF_RETRY ignored diffs, the
 * pending sync is assumed lost and diffs are applied again. If applying a diff
 * calls for a resync, a non-primary instance discards the result and requests
 * a full resync from all peers. The primary instead reports
 * -pcmk_err_diff_failed. In legacy mode, any other failure on a non-primary
 * instance also triggers a full resync.
 *
 * \return Result of cib_process_diff(), -pcmk_err_diff_resync if the diff was
 *         ignored, or -pcmk_err_diff_failed on the primary in place of
 *         -pcmk_err_diff_resync
 */
int
cib_server_process_diff(const char *op, int options, const char *section, xmlNode * req,
xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
xmlNode ** answer)
{
int rc = pcmk_ok;
if (sync_in_progress > MAX_DIFF_RETRY) {
/* Don't ignore diffs forever; the last request may have been lost.
* If the diff fails, we'll ask for another full resync.
*/
sync_in_progress = 0;
}
// The primary instance should never ignore a diff
if (sync_in_progress && !based_is_primary) {
int diff_add_updates = 0;
int diff_add_epoch = 0;
int diff_add_admin_epoch = 0;
int diff_del_updates = 0;
int diff_del_epoch = 0;
int diff_del_admin_epoch = 0;
// Extract the diff's source and target versions only for logging
cib_diff_version_details(input,
&diff_add_admin_epoch, &diff_add_epoch, &diff_add_updates,
&diff_del_admin_epoch, &diff_del_epoch, &diff_del_updates);
// Count ignored diffs so the check above can eventually give up
sync_in_progress++;
crm_notice("Not applying diff %d.%d.%d -> %d.%d.%d (sync in progress)",
diff_del_admin_epoch, diff_del_epoch, diff_del_updates,
diff_add_admin_epoch, diff_add_epoch, diff_add_updates);
return -pcmk_err_diff_resync;
}
rc = cib_process_diff(op, options, section, req, input, existing_cib, result_cib, answer);
crm_trace("result: %s (%d), %s", pcmk_strerror(rc), rc,
(based_is_primary? "primary": "secondary"));
if ((rc == -pcmk_err_diff_resync) && !based_is_primary) {
// Discard the partial result; a full CIB will replace it
free_xml(*result_cib);
*result_cib = NULL;
send_sync_request(NULL);
} else if (rc == -pcmk_err_diff_resync) {
// Primary: report failure rather than asking anyone else for a CIB
rc = -pcmk_err_diff_failed;
if (options & cib_force_diff) {
crm_warn("Not requesting full refresh in R/W mode");
}
} else if ((rc != pcmk_ok) && !based_is_primary && cib_legacy_mode()) {
crm_warn("Requesting full CIB refresh because update failed: %s"
CRM_XS " rc=%d", pcmk_strerror(rc), rc);
pcmk__output_set_log_level(logger_out, LOG_INFO);
logger_out->message(logger_out, "xml-patchset", input);
free_xml(*result_cib);
*result_cib = NULL;
send_sync_request(NULL);
}
return rc;
}
/*!
 * \internal
 * \brief Replace (part of) the CIB, ending any pending resync on a full replace
 *
 * \return Result of cib_process_replace()
 */
int
cib_process_replace_svr(const char *op, int options, const char *section, xmlNode * req,
                        xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                        xmlNode ** answer)
{
    const char *tag = crm_element_name(input);
    int rc = cib_process_replace(op, options, section, req, input,
                                 existing_cib, result_cib, answer);
    bool replaced_whole_cib = (rc == pcmk_ok)
                              && pcmk__str_eq(tag, XML_TAG_CIB, pcmk__str_casei);

    // Receiving the whole CIB is what a pending resync was waiting for
    if (replaced_whole_cib) {
        sync_in_progress = 0;
    }
    return rc;
}
// @COMPAT: Remove when PCMK__CIB_REQUEST_ABS_DELETE is removed
/*!
 * \internal
 * \brief Reject an absolute-delete request (no longer supported)
 *
 * \return -EINVAL, always
 */
int
cib_process_delete_absolute(const char *op, int options, const char *section, xmlNode * req,
                            xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                            xmlNode ** answer)
{
    const int unsupported = -EINVAL;

    return unsupported;
}
/*!
 * \internal
 * \brief Send a full copy of our CIB to one peer or to all peers
 *
 * The copy is sent as a replace request carrying our CIB, its versioned
 * digest, and our feature set.
 *
 * \param[in] request  Sync request being answered (F_ORIG names the peer)
 * \param[in] all      If TRUE, broadcast to all peers; otherwise send only to
 *                     the request's originator
 *
 * \return pcmk_ok on success; -EINVAL if there is no CIB, the request cannot
 *         be copied, or a targeted sync has no originator; -ENOTCONN if the
 *         message could not be sent
 */
int
sync_our_cib(xmlNode * request, gboolean all)
{
    int result = pcmk_ok;
    char *digest = NULL;
    const char *host = crm_element_value(request, F_ORIG);
    const char *op = crm_element_value(request, F_CIB_OPERATION);
    crm_node_t *peer = NULL;
    xmlNode *replace_request = NULL;

    CRM_CHECK(the_cib != NULL, return -EINVAL);

    /* A targeted sync needs a destination. Without an originator there is no
     * peer to send to (crm_get_peer() would get neither an ID nor a name), so
     * reject the request before allocating anything.
     */
    if (!all && (host == NULL)) {
        crm_log_xml_err(request, "bad sync");
        return -EINVAL;
    }

    replace_request = cib_msg_copy(request);
    CRM_CHECK(replace_request != NULL, return -EINVAL);

    crm_debug("Syncing CIB to %s", all ? "all peers" : host);

    /* remove the "all == FALSE" condition
     *
     * sync_from was failing, the local client wasn't being notified
     * because it didn't know it was a reply
     * setting this does not prevent the other nodes from applying it
     * if all == TRUE
     */
    if (host != NULL) {
        crm_xml_add(replace_request, F_CIB_ISREPLY, host);
    }
    if (all) {
        xml_remove_prop(replace_request, F_CIB_HOST);
    }

    crm_xml_add(replace_request, F_CIB_OPERATION, PCMK__CIB_REQUEST_REPLACE);
    crm_xml_add(replace_request, "original_" F_CIB_OPERATION, op);
    pcmk__xe_set_bool_attr(replace_request, F_CIB_GLOBAL_UPDATE, true);

    crm_xml_add(replace_request, XML_ATTR_CRM_VERSION, CRM_FEATURE_SET);
    digest = calculate_xml_versioned_digest(the_cib, FALSE, TRUE, CRM_FEATURE_SET);
    crm_xml_add(replace_request, XML_ATTR_DIGEST, digest);

    add_message_xml(replace_request, F_CIB_CALLDATA, the_cib);

    // A NULL destination broadcasts to all peers
    if (!all) {
        peer = crm_get_peer(0, host);
    }
    if (send_cluster_message(peer, crm_msg_cib, replace_request, FALSE) == FALSE) {
        result = -ENOTCONN;
    }

    free_xml(replace_request);
    free(digest);
    return result;
}
diff --git a/daemons/controld/controld_corosync.c b/daemons/controld/controld_corosync.c
index 4378b30b58..e7818a5e51 100644
--- a/daemons/controld/controld_corosync.c
+++ b/daemons/controld/controld_corosync.c
@@ -1,164 +1,165 @@
/*
* Copyright 2004-2022 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU General Public License version 2
* or later (GPLv2+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#include <sys/param.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <crm/crm.h>
#include <crm/cluster/internal.h>
#include <crm/common/xml.h>
#include <pacemaker-controld.h>
#if SUPPORT_COROSYNC
extern void post_cache_update(int seq);
/* A_HA_CONNECT */
/*!
 * \internal
 * \brief Handle a message delivered to the controller over Corosync CPG
 *
 * Cluster-class messages are parsed as XML, tagged with their sender, and
 * passed to crmd_ha_msg_filter(). Messages of any other class are logged as
 * errors and dropped.
 */
static void
crmd_cs_dispatch(cpg_handle_t handle, const struct cpg_name *groupName,
                 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
{
    uint32_t msg_class = 0;
    const char *sender = NULL;
    crm_node_t *peer = NULL;
    xmlNode *xml = NULL;
    char *payload = pcmk_message_common_cs(handle, nodeid, pid, msg,
                                           &msg_class, &sender);

    if (payload == NULL) {
        return;
    }

    if (msg_class != crm_class_cluster) {
        crm_err("Invalid message class (%d): %.100s", msg_class, payload);
        free(payload);
        return;
    }

    xml = string2xml(payload);
    if (xml == NULL) {
        crm_err("Could not parse message content (%d): %.100s", msg_class, payload);
        free(payload);
        return;
    }

    crm_xml_add(xml, F_ORIG, sender);

    peer = crm_get_peer(0, sender);
    if (!pcmk_is_set(peer->processes, crm_proc_cpg)) {
        /* Hearing from the peer's process means it must be part of the
         * corosync membership, whatever our cache currently says
         */
        crm_warn("Receiving messages from a node we think is dead: %s[%d]",
                 peer->uname, peer->id);
        crm_update_peer_proc(__func__, peer, crm_proc_cpg, ONLINESTATUS);
    }

    crmd_ha_msg_filter(xml);
    free_xml(xml);
    free(payload);
}
/*!
 * \internal
 * \brief React to a quorum change reported by the cluster layer
 *
 * \param[in] seq      Membership sequence number (passed to post_cache_update())
 * \param[in] quorate  Whether the cluster currently has quorum
 *
 * \return TRUE, always
 */
static gboolean
crmd_quorum_callback(unsigned long long seq, gboolean quorate)
{
    const gboolean keep_callback = TRUE;

    crm_update_quorum(quorate, FALSE);
    post_cache_update(seq);
    return keep_callback;
}
/*!
 * \internal
 * \brief Handle loss of the Corosync connection
 *
 * An expected disconnect (R_HA_DISCONNECTED set) is only logged. Otherwise
 * the controller exits with CRM_EX_DISCONNECT.
 */
static void
crmd_cs_destroy(gpointer user_data)
{
    if (pcmk_is_set(controld_globals.fsa_input_register, R_HA_DISCONNECTED)) {
        crm_info("Corosync connection closed");
        return;
    }

    crm_crit("Lost connection to cluster layer, shutting down");
    crmd_exit(CRM_EX_DISCONNECT);
}
/*!
* \brief Handle a Corosync notification of a CPG configuration change
*
* Sets controld_dc_left for the duration of pcmk_cpg_membership() if the
* node we know as DC is among \p left_list, then clears it again.
*
* \param[in] handle CPG connection
* \param[in] cpg_name CPG group name
* \param[in] member_list List of current CPG members
* \param[in] member_list_entries Number of entries in \p member_list
* \param[in] left_list List of CPG members that left
* \param[in] left_list_entries Number of entries in \p left_list
* \param[in] joined_list List of CPG members that joined
* \param[in] joined_list_entries Number of entries in \p joined_list
*/
static void
cpg_membership_callback(cpg_handle_t handle, const struct cpg_name *cpg_name,
const struct cpg_address *member_list,
size_t member_list_entries,
const struct cpg_address *left_list,
size_t left_list_entries,
const struct cpg_address *joined_list,
size_t joined_list_entries)
{
/* When nodes leave CPG, the DC clears their transient node attributes.
*
* However if there is no DC, or the DC is among the nodes that left, each
* remaining node needs to do the clearing, to ensure it gets done.
* Otherwise, the attributes would persist when the nodes rejoin, which
* could have serious consequences for unfencing, agents that use attributes
* for internal logic, etc.
*
* Here, we set a global boolean if the DC is among the nodes that left, for
* use by the peer callback.
*/
if (controld_globals.dc_name != NULL) {
crm_node_t *peer = NULL;
- peer = pcmk__search_cluster_node_cache(0, controld_globals.dc_name);
+ peer = pcmk__search_cluster_node_cache(0, controld_globals.dc_name,
+ NULL);
// If the DC isn't in the node cache, its node ID can't be matched below
if (peer != NULL) {
// Match by node ID, since left_list entries carry only a nodeid
for (int i = 0; i < left_list_entries; ++i) {
if (left_list[i].nodeid == peer->id) {
controld_set_global_flags(controld_dc_left);
break;
}
}
}
}
// Process the change normally, which will call the peer callback as needed
pcmk_cpg_membership(handle, cpg_name, member_list, member_list_entries,
left_list, left_list_entries,
joined_list, joined_list_entries);
// The flag is only meaningful while the change above is being processed
controld_clear_global_flags(controld_dc_left);
}
extern gboolean crm_connect_corosync(crm_cluster_t * cluster);
/*!
 * \internal
 * \brief Connect the controller to a Corosync cluster
 *
 * Installs the controller's CPG, status, and destroy callbacks. On a
 * successful connection, it also connects to Corosync quorum tracking.
 *
 * \param[in,out] cluster  Cluster connection object to populate and connect
 *
 * \return TRUE if connected, FALSE if this is not a Corosync cluster or the
 *         connection failed
 */
gboolean
crm_connect_corosync(crm_cluster_t * cluster)
{
    if (!is_corosync_cluster()) {
        return FALSE;
    }

    crm_set_status_callback(&peer_update_callback);
    cluster->cpg.cpg_deliver_fn = crmd_cs_dispatch;
    cluster->cpg.cpg_confchg_fn = cpg_membership_callback;
    cluster->destroy = crmd_cs_destroy;

    if (!crm_cluster_connect(cluster)) {
        return FALSE;
    }

    pcmk__corosync_quorum_connect(crmd_quorum_callback, crmd_cs_destroy);
    return TRUE;
}
#endif
diff --git a/daemons/controld/controld_messages.c b/daemons/controld/controld_messages.c
index 54b27ec33d..553dee67cd 100644
--- a/daemons/controld/controld_messages.c
+++ b/daemons/controld/controld_messages.c
@@ -1,1307 +1,1307 @@
/*
* Copyright 2004-2023 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU General Public License version 2
* or later (GPLv2+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#include <sys/param.h>
#include <string.h>
#include <time.h>
#include <crm/crm.h>
#include <crm/msg_xml.h>
#include <crm/common/xml.h>
#include <crm/cluster/internal.h>
#include <crm/cib.h>
#include <crm/common/ipc_internal.h>
#include <pacemaker-controld.h>
extern void crm_shutdown(int nsig);
static enum crmd_fsa_input handle_message(xmlNode *msg,
enum crmd_fsa_cause cause);
static void handle_response(xmlNode *stored_msg);
static enum crmd_fsa_input handle_request(xmlNode *stored_msg,
enum crmd_fsa_cause cause);
static enum crmd_fsa_input handle_shutdown_request(xmlNode *stored_msg);
static void send_msg_via_ipc(xmlNode * msg, const char *sys);
/* debug only, can wrap all it likes */
/* Incremented for each queued FSA input and stored as its fsa_data_t id.
 * NOTE(review): this is a signed int, so "wrapping" on overflow is undefined
 * behavior in C rather than a harmless wrap; consider an unsigned type.
 */
static int last_data_id = 0;
/*!
 * \internal
 * \brief Register an FSA error input, dropping pending actions
 *
 * Any actions already scheduled are first preserved by prepending a null
 * input that carries them. The current action list is then cleared, and the
 * error input itself is prepended.
 *
 * \param[in] cause        Cause of the error input
 * \param[in] input        Error input to register
 * \param[in] cur_data     FSA data being processed when the error arose (may
 *                         be NULL)
 * \param[in] new_data     Data to attach to the error input
 * \param[in] raised_from  Function that raised the error
 */
void
register_fsa_error_adv(enum crmd_fsa_cause cause, enum crmd_fsa_input input,
                       fsa_data_t * cur_data, void *new_data, const char *raised_from)
{
    if (controld_globals.fsa_actions != A_NOTHING) {
        enum crmd_fsa_cause held_cause = C_FSA_INTERNAL;
        void *held_data = NULL;

        if (cur_data != NULL) {
            held_cause = cur_data->fsa_cause;
            held_data = cur_data->data;
        }
        register_fsa_input_adv(held_cause, I_NULL, held_data,
                               controld_globals.fsa_actions, TRUE, __func__);
    }

    // Start over with an empty action list
    crm_info("Resetting the current action list");
    fsa_dump_actions(controld_globals.fsa_actions, "Drop");
    controld_globals.fsa_actions = A_NOTHING;

    register_fsa_input_adv(cause, input, new_data, A_NOTHING, TRUE, raised_from);
}
/*!
 * \internal
 * \brief Queue an input (and optional actions) for the controller FSA
 *
 * \param[in] cause        Why the input is being raised
 * \param[in] input        FSA input to queue
 * \param[in] data         Input data; copied according to \p cause
 * \param[in] with_actions Actions to attach to the input
 * \param[in] prepend      If TRUE, add to the front of the queue, else the back
 * \param[in] raised_from  Function raising the input (for logging)
 *
 * \note An I_WAIT_FOR_EVENT input stalls the FSA. With no data, it only
 *       restores \p with_actions to the pending action list. Otherwise it
 *       absorbs the pending actions into the queued input.
 */
void
register_fsa_input_adv(enum crmd_fsa_cause cause, enum crmd_fsa_input input,
                       void *data, uint64_t with_actions,
                       gboolean prepend, const char *raised_from)
{
    unsigned old_len = g_list_length(controld_globals.fsa_message_queue);
    fsa_data_t *fsa_data = NULL;

    if (raised_from == NULL) {
        raised_from = "<unknown>";
    }

    if (input == I_NULL && with_actions == A_NOTHING /* && data == NULL */ ) {
        /* no point doing anything */
        crm_err("Cannot add entry to queue: no input and no action");
        return;
    }

    if (input == I_WAIT_FOR_EVENT) {
        controld_set_global_flags(controld_fsa_is_stalled);
        crm_debug("Stalling the FSA pending further input: source=%s cause=%s data=%p queue=%u",
                  raised_from, fsa_cause2string(cause), data, old_len);

        if (old_len > 0) {
            fsa_dump_queue(LOG_TRACE);
            prepend = FALSE;
        }

        if (data == NULL) {
            controld_set_fsa_action_flags(with_actions);
            fsa_dump_actions(with_actions, "Restored");
            return;
        }

        /* Store everything in the new event and reset
         * controld_globals.fsa_actions
         */
        with_actions |= controld_globals.fsa_actions;
        controld_globals.fsa_actions = A_NOTHING;
    }

    last_data_id++;
    crm_trace("%s %s FSA input %d (%s) due to %s, %s data",
              raised_from, (prepend? "prepended" : "appended"), last_data_id,
              fsa_input2string(input), fsa_cause2string(cause),
              (data? "with" : "without"));

    fsa_data = calloc(1, sizeof(fsa_data_t));
    CRM_ASSERT(fsa_data != NULL);   // Same out-of-memory policy as copy_ha_msg_input()

    fsa_data->id = last_data_id;
    fsa_data->fsa_input = input;
    fsa_data->fsa_cause = cause;
    fsa_data->origin = raised_from;
    fsa_data->data = NULL;
    fsa_data->data_type = fsa_dt_none;
    fsa_data->actions = with_actions;

    if (with_actions != A_NOTHING) {
        crm_trace("Adding actions %.16llx to input",
                  (unsigned long long) with_actions);
    }

    if (data != NULL) {
        // Take a private copy of the data, typed according to its cause
        switch (cause) {
            case C_FSA_INTERNAL:
            case C_CRMD_STATUS_CALLBACK:
            case C_IPC_MESSAGE:
            case C_HA_MESSAGE:
                CRM_CHECK(((ha_msg_input_t *) data)->msg != NULL,
                          crm_err("Bogus data from %s", raised_from));
                crm_trace("Copying %s data from %s as cluster message data",
                          fsa_cause2string(cause), raised_from);
                fsa_data->data = copy_ha_msg_input(data);
                fsa_data->data_type = fsa_dt_ha_msg;
                break;

            case C_LRM_OP_CALLBACK:
                crm_trace("Copying %s data from %s as lrmd_event_data_t",
                          fsa_cause2string(cause), raised_from);
                fsa_data->data = lrmd_copy_event((lrmd_event_data_t *) data);
                fsa_data->data_type = fsa_dt_lrm;
                break;

            case C_TIMER_POPPED:
            case C_SHUTDOWN:
            case C_UNKNOWN:
            case C_STARTUP:
                crm_crit("Copying %s data (from %s) is not yet implemented",
                         fsa_cause2string(cause), raised_from);
                crmd_exit(CRM_EX_SOFTWARE);
                break;
        }
    }

    /* make sure to free it properly later */
    if (prepend) {
        controld_globals.fsa_message_queue
            = g_list_prepend(controld_globals.fsa_message_queue, fsa_data);
    } else {
        controld_globals.fsa_message_queue
            = g_list_append(controld_globals.fsa_message_queue, fsa_data);
    }

    crm_trace("FSA message queue length is %u",
              g_list_length(controld_globals.fsa_message_queue));

    /* fsa_dump_queue(LOG_TRACE); */

    if (old_len == g_list_length(controld_globals.fsa_message_queue)) {
        crm_err("Couldn't add message to the queue");
    }

    // A stalled FSA must wait for the event rather than run now
    if (input != I_WAIT_FOR_EVENT) {
        controld_trigger_fsa();
    }
}
/*!
 * \internal
 * \brief Log every entry currently in the FSA message queue
 *
 * \param[in] log_level  Log level to use for each entry
 */
void
fsa_dump_queue(int log_level)
{
    GList *cursor = controld_globals.fsa_message_queue;
    int offset = 0;

    while (cursor != NULL) {
        const fsa_data_t *entry = cursor->data;

        /* offset++ stays inside the macro's arguments, so the index advances
         * only for entries that are actually logged, as before
         */
        do_crm_log_unlikely(log_level,
                            "queue[%d.%d]: input %s raised by %s(%p.%d)\t(cause=%s)",
                            offset++, entry->id, fsa_input2string(entry->fsa_input),
                            entry->origin, entry->data, entry->data_type,
                            fsa_cause2string(entry->fsa_cause));
        cursor = cursor->next;
    }
}
/*!
 * \internal
 * \brief Deep-copy a cluster message input
 *
 * \param[in] orig  Input to copy (may be NULL, or have a NULL message)
 *
 * \return Newly allocated copy. Its xml member points at the copied message's
 *         F_CRM_DATA section.
 */
ha_msg_input_t *
copy_ha_msg_input(ha_msg_input_t * orig)
{
    ha_msg_input_t *copy = calloc(1, sizeof(*copy));

    CRM_ASSERT(copy != NULL);

    if ((orig != NULL) && (orig->msg != NULL)) {
        copy->msg = copy_xml(orig->msg);
    } else {
        copy->msg = NULL;
    }
    copy->xml = get_message_xml(copy->msg, F_CRM_DATA);
    return copy;
}
/*!
 * \internal
 * \brief Free an FSA queue entry along with its typed data
 *
 * \param[in,out] fsa_data  Entry to free (may be NULL)
 *
 * \note Exits the controller if the entry carries data of an unknown type.
 */
void
delete_fsa_input(fsa_data_t * fsa_data)
{
    if (fsa_data == NULL) {
        return;
    }

    crm_trace("About to free %s data", fsa_cause2string(fsa_data->fsa_cause));

    if (fsa_data->data != NULL) {
        switch (fsa_data->data_type) {
            case fsa_dt_ha_msg:
                delete_ha_msg_input(fsa_data->data);
                break;

            case fsa_dt_xml:
                free_xml((xmlNode *) fsa_data->data);
                break;

            case fsa_dt_lrm:
                lrmd_free_event((lrmd_event_data_t *) fsa_data->data);
                break;

            case fsa_dt_none:
                // Untyped but non-NULL data: there is no safe way to free it
                crm_err("Don't know how to free %s data from %s",
                        fsa_cause2string(fsa_data->fsa_cause), fsa_data->origin);
                crmd_exit(CRM_EX_SOFTWARE);
                break;
        }
        crm_trace("%s data freed", fsa_cause2string(fsa_data->fsa_cause));
    }

    free(fsa_data);
}
/* returns the next message */
fsa_data_t *
get_message(void)
{
fsa_data_t *message
= (fsa_data_t *) controld_globals.fsa_message_queue->data;
controld_globals.fsa_message_queue
= g_list_remove(controld_globals.fsa_message_queue, message);
crm_trace("Processing input %d", message->id);
return message;
}
/*!
 * \internal
 * \brief Get an FSA entry's data, checking that it has the expected type
 *
 * \param[in] fsa_data  FSA entry to inspect (may be NULL)
 * \param[in] a_type    Data type the caller expects
 * \param[in] caller    Name of the calling function (for logging)
 *
 * \return Entry's data, or NULL if there is no entry or no data
 *
 * \note A type mismatch is fatal (asserts).
 */
void *
fsa_typed_data_adv(fsa_data_t * fsa_data, enum fsa_data_type a_type, const char *caller)
{
    if (fsa_data == NULL) {
        crm_err("%s: No FSA data available", caller);
        return NULL;
    }

    if (fsa_data->data == NULL) {
        crm_err("%s: No message data available. Origin: %s", caller, fsa_data->origin);
        return NULL;
    }

    if (fsa_data->data_type != a_type) {
        crm_crit("%s: Message data was the wrong type! %d vs. requested=%d. Origin: %s",
                 caller, fsa_data->data_type, a_type, fsa_data->origin);
        CRM_ASSERT(fsa_data->data_type == a_type);
        return NULL;
    }

    return fsa_data->data;
}
/* A_MSG_ROUTE */
/*!
 * \internal
 * \brief FSA action handler: route the cluster message attached to an input
 *
 * \param[in] msg_data  FSA entry whose ha_msg_input_t data is to be routed
 */
void
do_msg_route(long long action,
             enum crmd_fsa_cause cause,
             enum crmd_fsa_state cur_state,
             enum crmd_fsa_input current_input, fsa_data_t * msg_data)
{
    ha_msg_input_t *input = fsa_typed_data(fsa_dt_ha_msg);

    /* fsa_typed_data() returns NULL (after logging) when there is no entry or
     * no data, so guard before dereferencing either
     */
    CRM_CHECK(input != NULL, return);

    route_message(msg_data->fsa_cause, input->msg);
}
/*!
 * \internal
 * \brief Relay a controller message elsewhere or queue it for local handling
 *
 * \param[in] cause  C_IPC_MESSAGE or C_HA_MESSAGE (anything else is rejected)
 * \param[in] input  Message to route
 */
void
route_message(enum crmd_fsa_cause cause, xmlNode * input)
{
    // Zero-initialize so no member is left indeterminate when passed on
    ha_msg_input_t fsa_input = { .msg = input };
    enum crmd_fsa_input result = I_NULL;

    CRM_CHECK(cause == C_IPC_MESSAGE || cause == C_HA_MESSAGE, return);

    /* try passing the buck first */
    if (relay_message(input, cause == C_IPC_MESSAGE)) {
        return;
    }

    /* handle locally */
    result = handle_message(input, cause);

    switch (result) {
        case I_NULL:
            // Fully handled; nothing to queue
            break;

        case I_CIB_OP:
        case I_ROUTER:
        case I_NODE_JOIN:
        case I_JOIN_REQUEST:
        case I_JOIN_RESULT:
            register_fsa_input(cause, result, &fsa_input);
            break;

        default:
            /* Deferring local processing of message */
            register_fsa_input_later(cause, result, &fsa_input);
            break;
    }
}
/*!
 * \internal
 * \brief Route a controller message to its destination if it isn't ours
 *
 * Depending on the message's F_CRM_SYS_TO and F_CRM_HOST_TO, the message is
 * handled in one of these ways:
 * - discarded (invalid, hello, or DC-bound on a non-DC from the scheduler/TE)
 * - sent to a local subsystem via IPC
 * - relayed to a peer or broadcast over the cluster
 * - left for the caller to process locally
 *
 * \param[in] msg                 Message to route
 * \param[in] originated_locally  Whether the message came from a local client
 *
 * \return FALSE if the caller still needs to process the message locally,
 *         otherwise TRUE
 */
gboolean
relay_message(xmlNode * msg, gboolean originated_locally)
{
// NOTE(review): presumably a cluster message type value; confirm the meaning of 1
int dest = 1;
bool is_for_dc = false;
bool is_for_dcib = false;
bool is_for_te = false;
bool is_for_crm = false;
bool is_for_cib = false;
bool is_local = false;
/* NOTE(review): these lookups run before the msg == NULL check below, so a
 * NULL msg is passed to crm_element_value() several times first; consider
 * moving that check to the top.
 */
const char *host_to = crm_element_value(msg, F_CRM_HOST_TO);
const char *sys_to = crm_element_value(msg, F_CRM_SYS_TO);
const char *sys_from = crm_element_value(msg, F_CRM_SYS_FROM);
const char *type = crm_element_value(msg, F_TYPE);
const char *task = crm_element_value(msg, F_CRM_TASK);
const char *ref = crm_element_value(msg, XML_ATTR_REFERENCE);
if (ref == NULL) {
ref = "without reference ID";
}
if (msg == NULL) {
crm_warn("Cannot route empty message");
return TRUE;
} else if (pcmk__str_eq(task, CRM_OP_HELLO, pcmk__str_casei)) {
crm_trace("No routing needed for hello message %s", ref);
return TRUE;
} else if (!pcmk__str_eq(type, T_CRM, pcmk__str_casei)) {
crm_warn("Received invalid message %s: type '%s' not '" T_CRM "'",
ref, pcmk__s(type, ""));
crm_log_xml_warn(msg, "[bad message type]");
return TRUE;
} else if (sys_to == NULL) {
crm_warn("Received invalid message %s: no subsystem", ref);
crm_log_xml_warn(msg, "[no subsystem]");
return TRUE;
}
is_for_dc = (strcasecmp(CRM_SYSTEM_DC, sys_to) == 0);
is_for_dcib = (strcasecmp(CRM_SYSTEM_DCIB, sys_to) == 0);
is_for_te = (strcasecmp(CRM_SYSTEM_TENGINE, sys_to) == 0);
is_for_cib = (strcasecmp(CRM_SYSTEM_CIB, sys_to) == 0);
is_for_crm = (strcasecmp(CRM_SYSTEM_CRMD, sys_to) == 0);
// Decide whether the message should be handled on this node
is_local = false;
if (pcmk__str_empty(host_to)) {
if (is_for_dc || is_for_te) {
is_local = false;
} else if (is_for_crm) {
if (pcmk__strcase_any_of(task, CRM_OP_NODE_INFO,
PCMK__CONTROLD_CMD_NODES, NULL)) {
/* Node info requests do not specify a host, which is normally
* treated as "all hosts", because the whole point is that the
* client may not know the local node name. Always handle these
* requests locally.
*/
is_local = true;
} else {
is_local = !originated_locally;
}
} else {
is_local = true;
}
} else if (pcmk__str_eq(controld_globals.our_nodename, host_to,
pcmk__str_casei)) {
is_local = true;
} else if (is_for_crm && pcmk__str_eq(task, CRM_OP_LRM_DELETE, pcmk__str_casei)) {
xmlNode *msg_data = get_message_xml(msg, F_CRM_DATA);
const char *mode = crm_element_value(msg_data, PCMK__XA_MODE);
if (pcmk__str_eq(mode, XML_TAG_CIB, pcmk__str_casei)) {
// Local delete of an offline node's resource history
is_local = true;
}
}
// Now act on the destination chosen above
if (is_for_dc || is_for_dcib || is_for_te) {
if (AM_I_DC && is_for_te) {
crm_trace("Route message %s locally as transition request", ref);
send_msg_via_ipc(msg, sys_to);
} else if (AM_I_DC) {
crm_trace("Route message %s locally as DC request", ref);
return FALSE; // More to be done by caller
} else if (originated_locally && !pcmk__strcase_any_of(sys_from, CRM_SYSTEM_PENGINE,
CRM_SYSTEM_TENGINE, NULL)) {
if (is_corosync_cluster()) {
dest = text2msg_type(sys_to);
}
crm_trace("Relay message %s to DC", ref);
send_cluster_message(host_to ? crm_get_peer(0, host_to) : NULL, dest, msg, TRUE);
} else {
/* Neither the TE nor the scheduler should be sending messages
* to DCs on other nodes. By definition, if we are no longer the DC,
* then the scheduler's or TE's data should be discarded.
*/
crm_trace("Discard message %s because we are not DC", ref);
}
} else if (is_local && (is_for_crm || is_for_cib)) {
crm_trace("Route message %s locally as controller request", ref);
return FALSE; // More to be done by caller
} else if (is_local) {
crm_trace("Relay message %s locally to %s",
ref, (sys_to? sys_to : "unknown client"));
crm_log_xml_trace(msg, "[IPC relay]");
send_msg_via_ipc(msg, sys_to);
} else {
crm_node_t *node_to = NULL;
if (is_corosync_cluster()) {
dest = text2msg_type(sys_to);
// Fall back to the controller for unknown or out-of-range types
if (dest == crm_msg_none || dest > crm_msg_stonith_ng) {
dest = crm_msg_crmd;
}
}
if (host_to) {
- node_to = pcmk__search_cluster_node_cache(0, host_to);
+ node_to = pcmk__search_cluster_node_cache(0, host_to, NULL);
if (node_to == NULL) {
crm_warn("Cannot route message %s: Unknown node %s",
ref, host_to);
return TRUE;
}
crm_trace("Relay message %s to %s",
ref, (node_to->uname? node_to->uname : "peer"));
} else {
crm_trace("Broadcast message %s to all peers", ref);
}
send_cluster_message(host_to ? node_to : NULL, dest, msg, TRUE);
}
return TRUE; // No further processing of message is needed
}
/*!
 * \internal
 * \brief Check that an IPC hello field holds a non-negative integer
 *
 * \param[in] message_data  Hello message data
 * \param[in] field         Name of the version attribute to check
 * \param[in] client_name   Client name (for logging)
 * \param[in] ref           Message reference (for logging, may be NULL)
 * \param[in] uuid          Client identifier (for logging)
 *
 * \return true if the field parses as an integer >= 0, otherwise false
 *         (a missing field counts as invalid)
 */
static bool
authorize_version(xmlNode *message_data, const char *field,
                  const char *client_name, const char *ref, const char *uuid)
{
    const char *version = crm_element_value(message_data, field);
    long long parsed = 0LL;
    bool valid = (pcmk__scan_ll(version, &parsed, -1LL) == pcmk_rc_ok)
                 && (parsed >= 0LL);

    if (!valid) {
        crm_warn("Rejected IPC hello from %s: '%s' is not a valid protocol %s "
                 CRM_XS " ref=%s uuid=%s",
                 client_name, ((version == NULL)? "" : version),
                 field, (ref? ref : "none"), uuid);
    }
    return valid;
}
/*!
 * \internal
 * \brief Check whether a client IPC message is acceptable
 *
 * If a given client IPC message is a hello, "authorize" it by ensuring it has
 * valid information such as a protocol version, and return false indicating
 * that nothing further needs to be done with the message. If the message is not
 * a hello, just return true to indicate it needs further processing.
 *
 * If the message is rejected (no client identifier, or an invalid hello) and
 * it came from a non-proxied client, that client is disconnected.
 *
 * \param[in]     client_msg     XML of IPC message
 * \param[in,out] curr_client    If IPC is not proxied, client that sent message
 * \param[in]     proxy_session  If IPC is proxied, the session ID
 *
 * \return true if message needs further processing, false if it doesn't
 *         (including when it was rejected)
 */
bool
controld_authorize_ipc_message(const xmlNode *client_msg, pcmk__client_t *curr_client,
                               const char *proxy_session)
{
    xmlNode *message_data = NULL;
    const char *client_name = NULL;
    const char *op = crm_element_value(client_msg, F_CRM_TASK);
    const char *ref = crm_element_value(client_msg, XML_ATTR_REFERENCE);
    const char *uuid = (curr_client? curr_client->id : proxy_session);

    if (uuid == NULL) {
        crm_warn("IPC message from client rejected: No client identifier "
                 CRM_XS " ref=%s", (ref? ref : "none"));
        goto rejected;
    }

    if (!pcmk__str_eq(CRM_OP_HELLO, op, pcmk__str_casei)) {
        // Only hello messages need to be authorized
        return true;
    }

    message_data = get_message_xml(client_msg, F_CRM_DATA);

    client_name = crm_element_value(message_data, "client_name");
    if (pcmk__str_empty(client_name)) {
        /* CRM_XS and the detail text must be concatenated onto the format
         * string (no comma between the literals); otherwise they become a
         * stray argument and ref/uuid are never logged.
         */
        crm_warn("IPC hello from client rejected: No client name "
                 CRM_XS " ref=%s uuid=%s", (ref? ref : "none"), uuid);
        goto rejected;
    }

    if (!authorize_version(message_data, "major_version", client_name, ref,
                           uuid)) {
        goto rejected;
    }
    if (!authorize_version(message_data, "minor_version", client_name, ref,
                           uuid)) {
        goto rejected;
    }

    crm_trace("Validated IPC hello from client %s", client_name);
    if (curr_client) {
        curr_client->userdata = strdup(client_name);
    }

    controld_trigger_fsa();
    return false;

rejected:
    if (curr_client) {
        qb_ipcs_disconnect(curr_client->ipcs);
    }
    return false;
}
/*!
 * \internal
 * \brief Dispatch a controller message according to its message type
 *
 * \param[in,out] msg    Message XML (request handlers may modify it)
 * \param[in]     cause  What caused the message to be processed
 *
 * \return Next FSA input (requests decide their own; responses and unknown
 *         types yield I_NULL)
 */
static enum crmd_fsa_input
handle_message(xmlNode *msg, enum crmd_fsa_cause cause)
{
    const char *msg_type = NULL;

    CRM_CHECK(msg != NULL, return I_NULL);

    msg_type = crm_element_value(msg, F_CRM_MSG_TYPE);

    if (pcmk__str_eq(msg_type, XML_ATTR_RESPONSE, pcmk__str_none)) {
        handle_response(msg);
        return I_NULL;
    }

    if (pcmk__str_eq(msg_type, XML_ATTR_REQUEST, pcmk__str_none)) {
        return handle_request(msg, cause);
    }

    crm_err("Unknown message type: %s", msg_type);
    return I_NULL;
}
/*!
 * \internal
 * \brief Handle a CRM_OP_CLEAR_FAILCOUNT request
 *
 * Clear a resource's failures on a node from the attribute manager, the CIB
 * (last failure), and executor state.
 *
 * \param[in] stored_msg  Request XML
 *
 * \return Next FSA input (always I_NULL)
 */
static enum crmd_fsa_input
handle_failcount_op(xmlNode *stored_msg)
{
    const char *rsc_id = NULL;
    const char *target = NULL;
    const char *clear_op = NULL;
    char *interval_spec = NULL;
    guint interval_ms = 0;
    gboolean remote = FALSE;
    xmlNode *op_xml = get_message_xml(stored_msg, F_CRM_DATA);

    if (op_xml != NULL) {
        xmlNode *rsc_xml = first_named_child(op_xml, XML_CIB_TAG_RESOURCE);
        xmlNode *attrs_xml = first_named_child(op_xml, XML_TAG_ATTRS);

        if (rsc_xml != NULL) {
            rsc_id = ID(rsc_xml);
        }

        // Read the optional operation name and interval to clear
        if (attrs_xml != NULL) {
            clear_op = crm_element_value(attrs_xml,
                                         CRM_META "_" XML_RSC_ATTR_CLEAR_OP);
            crm_element_value_ms(attrs_xml,
                                 CRM_META "_" XML_RSC_ATTR_CLEAR_INTERVAL,
                                 &interval_ms);
        }
    }

    target = crm_element_value(op_xml, XML_LRM_ATTR_TARGET);
    if ((rsc_id == NULL) || (target == NULL)) {
        crm_log_xml_warn(stored_msg, "invalid failcount op");
        return I_NULL;
    }

    // A router node attribute marks the target as a remote node
    if (crm_element_value(op_xml, XML_LRM_ATTR_ROUTER_NODE) != NULL) {
        remote = TRUE;
    }

    crm_debug("Clearing failures for %s-interval %s on %s "
              "from attribute manager, CIB, and executor state",
              pcmk__readable_interval(interval_ms), rsc_id, target);

    if (interval_ms != 0) {
        interval_spec = crm_strdup_printf("%ums", interval_ms);
    }
    update_attrd_clear_failures(target, rsc_id, clear_op, interval_spec,
                                remote);
    free(interval_spec);

    controld_cib_delete_last_failure(rsc_id, target, clear_op, interval_ms);
    lrm_clear_last_failure(rsc_id, target, clear_op, interval_ms);
    return I_NULL;
}
/*!
 * \internal
 * \brief Handle a CRM_OP_LRM_DELETE request
 *
 * \param[in,out] stored_msg  Request XML (F_CRM_SYS_TO is set on it when the
 *                            request is relayed)
 *
 * \return I_ROUTER if the request should be relayed to the executor on the
 *         affected node, otherwise I_NULL
 */
static enum crmd_fsa_input
handle_lrm_delete(xmlNode *stored_msg)
{
const char *mode = NULL;
xmlNode *msg_data = get_message_xml(stored_msg, F_CRM_DATA);
CRM_CHECK(msg_data != NULL, return I_NULL);
/* CRM_OP_LRM_DELETE has two distinct modes. The default behavior is to
* relay the operation to the affected node, which will unregister the
* resource from the local executor, clear the resource's history from the
* CIB, and do some bookkeeping in the controller.
*
* However, if the affected node is offline, the client will specify
* mode="cib" which means the controller receiving the operation should
* clear the resource's history from the CIB and nothing else. This is used
* to clear shutdown locks.
*/
mode = crm_element_value(msg_data, PCMK__XA_MODE);
// Any mode other than exactly XML_TAG_CIB (including none) means "relay"
if ((mode == NULL) || strcmp(mode, XML_TAG_CIB)) {
// Relay to affected node
crm_xml_add(stored_msg, F_CRM_SYS_TO, CRM_SYSTEM_LRMD);
return I_ROUTER;
} else {
// Delete CIB history locally (compare with do_lrm_delete())
const char *from_sys = NULL;
const char *user_name = NULL;
const char *rsc_id = NULL;
const char *node = NULL;
xmlNode *rsc_xml = NULL;
int rc = pcmk_rc_ok;
rsc_xml = first_named_child(msg_data, XML_CIB_TAG_RESOURCE);
CRM_CHECK(rsc_xml != NULL, return I_NULL);
rsc_id = ID(rsc_xml);
from_sys = crm_element_value(stored_msg, F_CRM_SYS_FROM);
node = crm_element_value(msg_data, XML_LRM_ATTR_TARGET);
user_name = pcmk__update_acl_user(stored_msg, F_CRM_USER, NULL);
crm_debug("Handling " CRM_OP_LRM_DELETE " for %s on %s locally%s%s "
"(clearing CIB resource history only)", rsc_id, node,
(user_name? " for user " : ""), (user_name? user_name : ""));
/* First attempt uses cib_dryrun (presumably to validate the deletion,
 * e.g. against ACLs, without changing the CIB); the real deletion is
 * only attempted if that succeeds.
 */
rc = controld_delete_resource_history(rsc_id, node, user_name,
cib_dryrun|cib_sync_call);
if (rc == pcmk_rc_ok) {
rc = controld_delete_resource_history(rsc_id, node, user_name,
crmd_cib_smart_opt());
}
// Notify the requesting subsystem (client or TE) of the result, if known
if (from_sys) {
lrmd_event_data_t *op = NULL;
const char *from_host = crm_element_value(stored_msg,
F_CRM_HOST_FROM);
const char *transition;
/* Requests from anything other than the TE carry the transition key in
 * the message data; TE requests carry it on the message itself.
 */
if (strcmp(from_sys, CRM_SYSTEM_TENGINE)) {
transition = crm_element_value(msg_data,
XML_ATTR_TRANSITION_KEY);
} else {
transition = crm_element_value(stored_msg,
XML_ATTR_TRANSITION_KEY);
}
crm_info("Notifying %s on %s that %s was%s deleted",
from_sys, (from_host? from_host : "local node"), rsc_id,
((rc == pcmk_rc_ok)? "" : " not"));
// Build a synthetic delete-completed event to acknowledge the request
op = lrmd_new_event(rsc_id, CRMD_ACTION_DELETE, 0);
op->type = lrmd_event_exec_complete;
op->user_data = strdup(transition? transition : FAKE_TE_ID);
op->params = pcmk__strkey_table(free, free);
g_hash_table_insert(op->params, strdup(XML_ATTR_CRM_VERSION),
strdup(CRM_FEATURE_SET));
controld_rc2event(op, rc);
controld_ack_event_directly(from_host, from_sys, NULL, op, rsc_id);
lrmd_free_event(op);
controld_trigger_delete_refresh(from_sys, rsc_id);
}
return I_NULL;
}
}
/*!
 * \brief Handle a CRM_OP_REMOTE_STATE message by updating remote peer cache
 *
 * The message names a Pacemaker Remote node, says whether it is in the
 * cluster, and optionally gives the node hosting its connection.
 *
 * \param[in] msg  Message XML
 *
 * \return Next FSA input (always I_NULL)
 */
static enum crmd_fsa_input
handle_remote_state(const xmlNode *msg)
{
    const char *remote_uname = ID(msg);
    const char *host = NULL;
    crm_node_t *remote_peer = NULL;
    bool is_up = false;
    int rc = pcmk__xe_get_bool_attr(msg, XML_NODE_IN_CLUSTER, &is_up);

    CRM_CHECK(remote_uname && rc == pcmk_rc_ok, return I_NULL);

    remote_peer = crm_remote_peer_get(remote_uname);
    CRM_CHECK(remote_peer, return I_NULL);

    pcmk__update_peer_state(__func__, remote_peer,
                            (is_up? CRM_NODE_MEMBER : CRM_NODE_LOST), 0);

    // Track which node (if any) currently hosts the remote connection
    host = crm_element_value(msg, PCMK__XA_CONN_HOST);
    if (host == NULL) {
        free(remote_peer->conn_host); // no-op if already NULL
        remote_peer->conn_host = NULL;
    } else {
        pcmk__str_update(&remote_peer->conn_host, host);
    }
    return I_NULL;
}
/*!
 * \brief Handle a CRM_OP_PING message
 *
 * Reply with the addressed subsystem, the controller's FSA state, and a
 * health status.
 *
 * \param[in] msg  Message XML
 *
 * \return Next FSA input (always I_NULL)
 */
static enum crmd_fsa_input
handle_ping(const xmlNode *msg)
{
    xmlNode *ping = create_xml_node(NULL, XML_CRM_TAG_PING);
    xmlNode *reply = NULL;
    const char *fsa_state = NULL;

    // Echo back the subsystem the ping was addressed to
    crm_xml_add(ping, XML_PING_ATTR_SYSFROM,
                crm_element_value(msg, F_CRM_SYS_TO));

    fsa_state = fsa_state2string(controld_globals.fsa_state);
    crm_xml_add(ping, XML_PING_ATTR_CRMDSTATE, fsa_state);
    crm_notice("Current ping state: %s", fsa_state); // CTS needs this

    // @TODO maybe do some checks to determine meaningful status
    crm_xml_add(ping, XML_PING_ATTR_STATUS, "ok");

    reply = create_reply(msg, ping);
    free_xml(ping);

    if (reply == NULL) {
        return I_NULL;
    }
    (void) relay_message(reply, TRUE);
    free_xml(reply);
    return I_NULL;
}
/*!
* \brief Handle a PCMK__CONTROLD_CMD_NODES message
*
* \param[in] request Message XML
*
* \return Next FSA input
*/
static enum crmd_fsa_input
handle_node_list(const xmlNode *request)
{
GHashTableIter iter;
crm_node_t *node = NULL;
xmlNode *reply = NULL;
xmlNode *reply_data = NULL;
// Create message data for reply
reply_data = create_xml_node(NULL, XML_CIB_TAG_NODES);
g_hash_table_iter_init(&iter, crm_peer_cache);
while (g_hash_table_iter_next(&iter, NULL, (gpointer *) & node)) {
xmlNode *xml = create_xml_node(reply_data, XML_CIB_TAG_NODE);
crm_xml_add_ll(xml, XML_ATTR_ID, (long long) node->id); // uint32_t
crm_xml_add(xml, XML_ATTR_UNAME, node->uname);
crm_xml_add(xml, XML_NODE_IN_CLUSTER, node->state);
}
// Create and send reply
reply = create_reply(request, reply_data);
free_xml(reply_data);
if (reply) {
(void) relay_message(reply, TRUE);
free_xml(reply);
}
// Nothing further to do
return I_NULL;
}
/*!
 * \brief Handle a CRM_OP_NODE_INFO request
 *
 * Reply with whether the partition has quorum and, if the requested node is
 * found in the node caches, its ID, name, peer state, and remote flag. The
 * node may be requested by ID and/or name; if neither is given, the local
 * node is used.
 *
 * \param[in] msg  Message XML
 *
 * \return Next FSA input (always I_NULL)
 */
static enum crmd_fsa_input
handle_node_info_request(const xmlNode *msg)
{
    xmlNode *info = create_xml_node(NULL, XML_CIB_TAG_NODE);
    xmlNode *reply = NULL;
    crm_node_t *found = NULL;
    const char *uname = NULL;
    int id = 0;

    crm_xml_add(info, XML_PING_ATTR_SYSFROM, CRM_SYSTEM_CRMD);
    pcmk__xe_set_bool_attr(info, XML_ATTR_HAVE_QUORUM,
                           pcmk_is_set(controld_globals.flags,
                                       controld_has_quorum));

    // A negative ID is treated the same as no ID
    crm_element_value_int(msg, XML_ATTR_ID, &id);
    id = (id < 0)? 0 : id;
    uname = crm_element_value(msg, XML_ATTR_UNAME);

    if ((id == 0) && (uname == NULL)) {
        uname = controld_globals.our_nodename;
    }

    found = pcmk__search_node_caches(id, uname, CRM_GET_PEER_ANY);
    if (found != NULL) {
        crm_xml_add(info, XML_ATTR_ID, found->uuid);
        crm_xml_add(info, XML_ATTR_UNAME, found->uname);
        crm_xml_add(info, XML_NODE_IS_PEER, found->state);
        pcmk__xe_set_bool_attr(info, XML_NODE_IS_REMOTE,
                               pcmk_is_set(found->flags, crm_remote_node));
    }

    reply = create_reply(msg, info);
    free_xml(info);

    if (reply != NULL) {
        (void) relay_message(reply, TRUE);
        free_xml(reply);
    }
    return I_NULL;
}
/*!
 * \internal
 * \brief Exit fatally if the DC's feature set is incompatible with ours
 *
 * \param[in] msg  Join offer from the DC
 */
static void
verify_feature_set(xmlNode *msg)
{
    const char *dc_version = crm_element_value(msg, XML_ATTR_CRM_VERSION);

    /* A DC that omits its feature set is older than 3.1.0. Treating it as
     * 3.0.14 is enough for the compatibility check.
     */
    if (dc_version == NULL) {
        dc_version = "3.0.14";
    }

    if (feature_set_compatible(dc_version, CRM_FEATURE_SET)) {
        crm_trace("Local feature set (%s) is compatible with DC's (%s)",
                  CRM_FEATURE_SET, dc_version);
        return;
    }

    crm_err("Local feature set (%s) is incompatible with DC's (%s)",
            CRM_FEATURE_SET, dc_version);

    // Nothing is likely to improve without administrator involvement
    controld_set_fsa_input_flags(R_STAYDOWN);
    crmd_exit(CRM_EX_FATAL);
}
/*!
 * \internal
 * \brief Handle a CRM_OP_SHUTDOWN request received by the DC
 *
 * \param[in] stored_msg  Request XML
 *
 * \return I_STOP if we requested our own shutdown, I_TERMINATE if the DC
 *         itself confirmed an unrequested shutdown, I_ELECTION if another node
 *         did so while we are not stopping, otherwise I_NULL
 */
static enum crmd_fsa_input
handle_shutdown_self_ack(xmlNode *stored_msg)
{
    const char *host_from = crm_element_value(stored_msg, F_CRM_HOST_FROM);
    const char *sender = (host_from != NULL)? host_from : "another node";
    enum crmd_fsa_input next = I_NULL;

    if (pcmk_is_set(controld_globals.fsa_input_register, R_SHUTDOWN)) {
        // The expected case -- we initiated own shutdown sequence
        crm_info("Shutting down controller");
        next = I_STOP;

    } else if (pcmk__str_eq(host_from, controld_globals.dc_name,
                            pcmk__str_casei)) {
        // Must be logic error -- DC confirming its own unrequested shutdown
        crm_err("Shutting down controller immediately due to "
                "unexpected shutdown confirmation");
        next = I_TERMINATE;

    } else if (controld_globals.fsa_state != S_STOPPING) {
        // Shouldn't happen -- non-DC confirming unrequested shutdown
        crm_err("Starting new DC election because %s is "
                "confirming shutdown we did not request", sender);
        next = I_ELECTION;

    } else {
        // Shouldn't happen, but we are already stopping anyway
        crm_debug("Ignoring unexpected shutdown confirmation from %s", sender);
    }
    return next;
}
/*!
 * \internal
 * \brief Handle a CRM_OP_SHUTDOWN request received by a non-DC node
 *
 * \param[in] stored_msg  Request XML
 *
 * \return I_STOP if the request came from the DC (or no DC is known),
 *         otherwise I_NULL
 */
static enum crmd_fsa_input
handle_shutdown_ack(xmlNode *stored_msg)
{
    const char *host_from = crm_element_value(stored_msg, F_CRM_HOST_FROM);

    if (host_from == NULL) {
        crm_warn("Ignoring shutdown request without origin specified");
        return I_NULL;
    }

    // A NULL DC name matches any sender
    if (!pcmk__str_eq(host_from, controld_globals.dc_name,
                      pcmk__str_null_matches|pcmk__str_casei)) {
        crm_warn("Ignoring shutdown request from %s because DC is %s",
                 host_from, controld_globals.dc_name);
        return I_NULL;
    }

    if (!pcmk_is_set(controld_globals.fsa_input_register, R_SHUTDOWN)) {
        crm_err("Shutting down controller after unexpected "
                "shutdown request from %s", host_from);
        controld_set_fsa_input_flags(R_STAYDOWN);
    } else {
        crm_info("Shutting down controller after confirmation from %s",
                 host_from);
    }
    return I_STOP;
}
/*!
 * \internal
 * \brief Handle a controller request message
 *
 * \param[in,out] stored_msg  Request XML (some handlers modify it, e.g. to set
 *                            F_CRM_SYS_TO before routing)
 * \param[in]     cause       What caused the request to be processed
 *
 * \return Next FSA input
 */
static enum crmd_fsa_input
handle_request(xmlNode *stored_msg, enum crmd_fsa_cause cause)
{
    xmlNode *msg = NULL;
    const char *op = crm_element_value(stored_msg, F_CRM_TASK);

    /* Optimize this for the DC - it has the most to do */

    if (op == NULL) {
        crm_log_xml_warn(stored_msg, "[request without " F_CRM_TASK "]");
        return I_NULL;
    }

    if (strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0) {
        const char *from = crm_element_value(stored_msg, F_CRM_HOST_FROM);
- crm_node_t *node = pcmk__search_cluster_node_cache(0, from);
+ crm_node_t *node = pcmk__search_cluster_node_cache(0, from, NULL);

        pcmk__update_peer_expected(__func__, node, CRMD_JOINSTATE_DOWN);
        if(AM_I_DC == FALSE) {
            return I_NULL; /* Done */
        }
    }

    /*========== DC-Only Actions ==========*/
    if (AM_I_DC) {
        if (strcmp(op, CRM_OP_JOIN_ANNOUNCE) == 0) {
            return I_NODE_JOIN;

        } else if (strcmp(op, CRM_OP_JOIN_REQUEST) == 0) {
            return I_JOIN_REQUEST;

        } else if (strcmp(op, CRM_OP_JOIN_CONFIRM) == 0) {
            return I_JOIN_RESULT;

        } else if (strcmp(op, CRM_OP_SHUTDOWN) == 0) {
            return handle_shutdown_self_ack(stored_msg);

        } else if (strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0) {
            // Another controller wants to shut down its node
            return handle_shutdown_request(stored_msg);
        }
    }

    /*========== common actions ==========*/
    if (strcmp(op, CRM_OP_NOVOTE) == 0) {
        ha_msg_input_t fsa_input;

        fsa_input.msg = stored_msg;
        register_fsa_input_adv(C_HA_MESSAGE, I_NULL, &fsa_input,
                               A_ELECTION_COUNT | A_ELECTION_CHECK, FALSE,
                               __func__);

    } else if (strcmp(op, CRM_OP_REMOTE_STATE) == 0) {
        /* a remote connection host is letting us know the node state */
        return handle_remote_state(stored_msg);

    } else if (strcmp(op, CRM_OP_THROTTLE) == 0) {
        throttle_update(stored_msg);
        if (AM_I_DC && (controld_globals.transition_graph != NULL)
            && !controld_globals.transition_graph->complete) {

            crm_debug("The throttle changed. Trigger a graph.");
            trigger_graph();
        }
        return I_NULL;

    } else if (strcmp(op, CRM_OP_CLEAR_FAILCOUNT) == 0) {
        return handle_failcount_op(stored_msg);

    } else if (strcmp(op, CRM_OP_VOTE) == 0) {
        /* count the vote and decide what to do after that */
        ha_msg_input_t fsa_input;

        fsa_input.msg = stored_msg;
        register_fsa_input_adv(C_HA_MESSAGE, I_NULL, &fsa_input,
                               A_ELECTION_COUNT | A_ELECTION_CHECK, FALSE,
                               __func__);

        /* Sometimes we _must_ go into S_ELECTION */
        if (controld_globals.fsa_state == S_HALT) {
            crm_debug("Forcing an election from S_HALT");
            return I_ELECTION;
#if 0
        } else if (AM_I_DC) {
            /* This is the old way of doing things but what is gained? */
            return I_ELECTION;
#endif
        }

    } else if (strcmp(op, CRM_OP_JOIN_OFFER) == 0) {
        verify_feature_set(stored_msg);
        crm_debug("Raising I_JOIN_OFFER: join-%s", crm_element_value(stored_msg, F_CRM_JOIN_ID));
        return I_JOIN_OFFER;

    } else if (strcmp(op, CRM_OP_JOIN_ACKNAK) == 0) {
        crm_debug("Raising I_JOIN_RESULT: join-%s", crm_element_value(stored_msg, F_CRM_JOIN_ID));
        return I_JOIN_RESULT;

    } else if (strcmp(op, CRM_OP_LRM_DELETE) == 0) {
        return handle_lrm_delete(stored_msg);

    } else if ((strcmp(op, CRM_OP_LRM_FAIL) == 0)
               || (strcmp(op, CRM_OP_LRM_REFRESH) == 0) // @COMPAT
               || (strcmp(op, CRM_OP_REPROBE) == 0)) {

        crm_xml_add(stored_msg, F_CRM_SYS_TO, CRM_SYSTEM_LRMD);
        return I_ROUTER;

    } else if (strcmp(op, CRM_OP_NOOP) == 0) {
        return I_NULL;

    } else if (strcmp(op, CRM_OP_LOCAL_SHUTDOWN) == 0) {

        crm_shutdown(SIGTERM);
        /*return I_SHUTDOWN; */
        return I_NULL;

    } else if (strcmp(op, CRM_OP_PING) == 0) {
        return handle_ping(stored_msg);

    } else if (strcmp(op, CRM_OP_NODE_INFO) == 0) {
        return handle_node_info_request(stored_msg);

    } else if (strcmp(op, CRM_OP_RM_NODE_CACHE) == 0) {
        int id = 0;
        const char *name = NULL;

        crm_element_value_int(stored_msg, XML_ATTR_ID, &id);
        name = crm_element_value(stored_msg, XML_ATTR_UNAME);

        if(cause == C_IPC_MESSAGE) {
            /* Broadcast the request to the cluster's controllers. They handle
             * it via the branch below, which reads the node ID and name from
             * the message itself, so both must be copied into the broadcast.
             */
            msg = create_request(CRM_OP_RM_NODE_CACHE, NULL, NULL, CRM_SYSTEM_CRMD, CRM_SYSTEM_CRMD, NULL);
            if (id > 0) {
                crm_xml_add_int(msg, XML_ATTR_ID, id);
            }
            if (name != NULL) {
                crm_xml_add(msg, XML_ATTR_UNAME, name);
            }

            if (send_cluster_message(NULL, crm_msg_crmd, msg, TRUE) == FALSE) {
                crm_err("Could not instruct peers to remove references to node %s/%u",
                        (name? name : ""), (unsigned int) id);
            } else {
                crm_notice("Instructing peers to remove references to node %s/%u",
                           (name? name : ""), (unsigned int) id);
            }
            free_xml(msg);

        } else {
            reap_crm_member(id, name);

            /* If we're forgetting this node, also forget any failures to fence
             * it, so we don't carry that over to any node added later with the
             * same name.
             */
            st_fail_count_reset(name);
        }

    } else if (strcmp(op, CRM_OP_MAINTENANCE_NODES) == 0) {
        xmlNode *xml = get_message_xml(stored_msg, F_CRM_DATA);

        remote_ra_process_maintenance_nodes(xml);

    } else if (strcmp(op, PCMK__CONTROLD_CMD_NODES) == 0) {
        return handle_node_list(stored_msg);

        /*========== (NOT_DC)-Only Actions ==========*/
    } else if (!AM_I_DC) {

        if (strcmp(op, CRM_OP_SHUTDOWN) == 0) {
            return handle_shutdown_ack(stored_msg);
        }

    } else {
        crm_err("Unexpected request (%s) sent to %s", op, AM_I_DC ? "the DC" : "non-DC node");
        crm_log_xml_err(stored_msg, "Unexpected");
    }

    return I_NULL;
}
/*!
 * \internal
 * \brief Handle a response message
 *
 * On the DC, a scheduler (CRM_OP_PECALC) response is fed to the FSA only if it
 * answers the most recent scheduler request. Vote and shutdown responses are
 * ignored; anything else is logged as unexpected.
 *
 * \param[in,out] stored_msg  Response XML
 */
static void
handle_response(xmlNode *stored_msg)
{
    const char *op = crm_element_value(stored_msg, F_CRM_TASK);
    const char *host_from = NULL;

    if (op == NULL) {
        crm_log_xml_err(stored_msg, "Bad message");
        return;
    }

    if (AM_I_DC && (strcmp(op, CRM_OP_PECALC) == 0)) {
        // Check whether scheduler answer been superseded by subsequent request
        const char *msg_ref = crm_element_value(stored_msg, XML_ATTR_REFERENCE);

        if (msg_ref == NULL) {
            crm_err("%s - Ignoring calculation with no reference", op);

        } else if (!pcmk__str_eq(msg_ref, controld_globals.fsa_pe_ref,
                                 pcmk__str_none)) {
            crm_info("%s calculation %s is obsolete", op, msg_ref);

        } else {
            ha_msg_input_t fsa_input;

            controld_stop_sched_timer();
            fsa_input.msg = stored_msg;
            register_fsa_input_later(C_IPC_MESSAGE, I_PE_SUCCESS, &fsa_input);
        }
        return;
    }

    // These responses need no handling here
    if ((strcmp(op, CRM_OP_VOTE) == 0)
        || (strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0)
        || (strcmp(op, CRM_OP_SHUTDOWN) == 0)) {
        return;
    }

    host_from = crm_element_value(stored_msg, F_CRM_HOST_FROM);
    crm_err("Unexpected response (op=%s, src=%s) sent to the %s",
            op, host_from, AM_I_DC ? "DC" : "controller");
}
/*!
 * \internal
 * \brief Handle (on the DC) a node's request to shut down
 *
 * The DC handles this itself (rather than leaving it to the requesting node),
 * so the shutdown procedure stays under the DC's control even if it changes
 * between versions. The request is recorded by setting the node's shutdown
 * attribute to the current time.
 *
 * \param[in] stored_msg  Request XML
 *
 * \return Next FSA input (always I_NULL)
 */
static enum crmd_fsa_input
handle_shutdown_request(xmlNode *stored_msg)
{
    const char *host_from = crm_element_value(stored_msg, F_CRM_HOST_FROM);
    char *now_s = NULL;

    // No origin means we (the DC) are the node shutting down
    if (host_from == NULL) {
        host_from = controld_globals.our_nodename;
    }

    crm_info("Creating shutdown request for %s (state=%s)", host_from,
             fsa_state2string(controld_globals.fsa_state));
    crm_log_xml_trace(stored_msg, "message");

    // The TE will pick this attribute up as long as it is running
    now_s = pcmk__ttoa(time(NULL));
    update_attrd(host_from, XML_CIB_ATTR_SHUTDOWN, now_s, NULL, FALSE);
    free(now_s);

    return I_NULL;
}
/*!
 * \internal
 * \brief Deliver a message to a local subsystem or IPC client
 *
 * Destinations are tried in order: a connected IPC client with ID \p sys, the
 * transition engine, the executor, then a proxied session.
 *
 * \param[in,out] msg  Message XML (F_CRM_HOST_FROM is set if missing)
 * \param[in]     sys  Destination subsystem or client ID
 */
static void
send_msg_via_ipc(xmlNode *msg, const char *sys)
{
    pcmk__client_t *client = NULL;

    CRM_CHECK(sys != NULL, return);

    client = pcmk__find_client_by_id(sys);

    // Stamp the local node as the origin if none is set yet
    if (crm_element_value(msg, F_CRM_HOST_FROM) == NULL) {
        crm_xml_add(msg, F_CRM_HOST_FROM, controld_globals.our_nodename);
    }

    if (client != NULL) {
        /* Transient clients such as crmadmin */
        pcmk__ipc_send_xml(client, 0, msg, crm_ipc_server_event);

    } else if (pcmk__str_eq(sys, CRM_SYSTEM_TENGINE, pcmk__str_none)) {
        process_te_message(msg, get_message_xml(msg, F_CRM_DATA));

    } else if (pcmk__str_eq(sys, CRM_SYSTEM_LRMD, pcmk__str_none)) {
        ha_msg_input_t lrm_input;
        fsa_data_t lrm_data;

        lrm_input.msg = msg;
        lrm_input.xml = get_message_xml(msg, F_CRM_DATA);

        // Wrap the message as if it had been queued as an FSA input
        lrm_data.id = 0;
        lrm_data.actions = 0;
        lrm_data.data = &lrm_input;
        lrm_data.fsa_input = I_MESSAGE;
        lrm_data.fsa_cause = C_IPC_MESSAGE;
        lrm_data.origin = __func__;
        lrm_data.data_type = fsa_dt_ha_msg;

        do_lrm_invoke(A_LRM_INVOKE, C_IPC_MESSAGE, controld_globals.fsa_state,
                      I_MESSAGE, &lrm_data);

    } else if (crmd_is_proxy_session(sys)) {
        crmd_proxy_send(sys, msg);

    } else {
        crm_info("Received invalid request: unknown subsystem '%s'", sys);
    }
}
/*!
 * \internal
 * \brief Free a message input object along with its message XML
 *
 * \param[in,out] orig  Object to free (may be NULL)
 */
void
delete_ha_msg_input(ha_msg_input_t *orig)
{
    if (orig != NULL) {
        free_xml(orig->msg);
        free(orig);
    }
}
/*!
 * \internal
 * \brief Notify the cluster of a remote node state change
 *
 * A CRM_OP_REMOTE_STATE request is broadcast to all controllers. When the node
 * is coming up, the local node is named as the connection host.
 *
 * \param[in] node_name  Node's name
 * \param[in] node_up    true if node is up, false if down
 */
void
broadcast_remote_state_message(const char *node_name, bool node_up)
{
    const char *change = node_up? "coming up" : "going down";
    xmlNode *request = create_request(CRM_OP_REMOTE_STATE, NULL, NULL,
                                      CRM_SYSTEM_CRMD, CRM_SYSTEM_CRMD, NULL);

    crm_info("Notifying cluster of Pacemaker Remote node %s %s",
             node_name, change);

    crm_xml_add(request, XML_ATTR_ID, node_name);
    pcmk__xe_set_bool_attr(request, XML_NODE_IN_CLUSTER, node_up);
    if (node_up) {
        crm_xml_add(request, PCMK__XA_CONN_HOST,
                    controld_globals.our_nodename);
    }

    send_cluster_message(NULL, crm_msg_crmd, request, TRUE);
    free_xml(request);
}
diff --git a/include/crm/cluster/internal.h b/include/crm/cluster/internal.h
index 9bc57c617c..da0cc9da4a 100644
--- a/include/crm/cluster/internal.h
+++ b/include/crm/cluster/internal.h
@@ -1,133 +1,134 @@
/*
* Copyright 2004-2021 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU Lesser General Public License
* version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
*/
#ifndef CRM_CLUSTER_INTERNAL__H
# define CRM_CLUSTER_INTERNAL__H
# include <stdint.h> // uint32_t, uint64_t
# include <crm/cluster.h>
/* *INDENT-OFF* */
enum crm_proc_flag {
    /* Process bits, listed in increasing numeric order. crm_proc_none is
     * what crm_get_cluster_proc() returns when no cluster layer applies.
     */
    crm_proc_none       = 0x00000001,

    // Daemons
    crm_proc_execd      = 0x00000010,
    crm_proc_based      = 0x00000100,
    crm_proc_controld   = 0x00000200,
    crm_proc_attrd      = 0x00001000,
    crm_proc_schedulerd = 0x00010000,
    crm_proc_fenced     = 0x00100000,

    // Cluster layers
    crm_proc_cpg        = 0x04000000,
};
/* *INDENT-ON* */
/*!
 * \internal
 * \brief Return the process bit corresponding to the current cluster stack
 *
 * \return crm_proc_cpg for a Corosync cluster, otherwise crm_proc_none
 * \note crm_proc_none is nonzero (0x00000001), so callers cannot test the
 *       result against 0 to detect an unrecognized cluster stack.
 */
static inline uint32_t
crm_get_cluster_proc(void)
{
    if (get_cluster_type() == pcmk_cluster_corosync) {
        return crm_proc_cpg;
    }
    return crm_proc_none;
}
/*!
 * \internal
 * \brief Get log-friendly string description of a Corosync return code
 *
 * \param[in] error  Corosync return code
 *
 * \return Description of \p error, or "Corosync error" if the code is not
 *         recognized or Corosync support is not built in
 */
static inline const char *
pcmk__cs_err_str(int error)
{
    const char *desc = "Corosync error";

# if SUPPORT_COROSYNC
    switch (error) {
        case CS_OK:                     desc = "OK"; break;
        case CS_ERR_LIBRARY:            desc = "Library error"; break;
        case CS_ERR_VERSION:            desc = "Version error"; break;
        case CS_ERR_INIT:               desc = "Initialization error"; break;
        case CS_ERR_TIMEOUT:            desc = "Timeout"; break;
        case CS_ERR_TRY_AGAIN:          desc = "Try again"; break;
        case CS_ERR_INVALID_PARAM:      desc = "Invalid parameter"; break;
        case CS_ERR_NO_MEMORY:          desc = "No memory"; break;
        case CS_ERR_BAD_HANDLE:         desc = "Bad handle"; break;
        case CS_ERR_BUSY:               desc = "Busy"; break;
        case CS_ERR_ACCESS:             desc = "Access error"; break;
        case CS_ERR_NOT_EXIST:          desc = "Doesn't exist"; break;
        case CS_ERR_NAME_TOO_LONG:      desc = "Name too long"; break;
        case CS_ERR_EXIST:              desc = "Exists"; break;
        case CS_ERR_NO_SPACE:           desc = "No space"; break;
        case CS_ERR_INTERRUPT:          desc = "Interrupt"; break;
        case CS_ERR_NAME_NOT_FOUND:     desc = "Name not found"; break;
        case CS_ERR_NO_RESOURCES:       desc = "No resources"; break;
        case CS_ERR_NOT_SUPPORTED:      desc = "Not supported"; break;
        case CS_ERR_BAD_OPERATION:      desc = "Bad operation"; break;
        case CS_ERR_FAILED_OPERATION:   desc = "Failed operation"; break;
        case CS_ERR_MESSAGE_ERROR:      desc = "Message error"; break;
        case CS_ERR_QUEUE_FULL:         desc = "Queue full"; break;
        case CS_ERR_QUEUE_NOT_AVAILABLE: desc = "Queue not available"; break;
        case CS_ERR_BAD_FLAGS:          desc = "Bad flags"; break;
        case CS_ERR_TOO_BIG:            desc = "Too big"; break;
        case CS_ERR_NO_SECTIONS:        desc = "No sections"; break;
        default:                        break;
    }
# endif
    return desc;
}
# if SUPPORT_COROSYNC
/* Corosync-only declarations */
#if 0
/* This is the new way to do it, but we still support all Corosync 2 versions,
* and this isn't always available. A better alternative here would be to check
* for support in the configure script and enable this conditionally.
*/
#define pcmk__init_cmap(handle) cmap_initialize_map((handle), CMAP_MAP_ICMAP)
#else
/* Initialize a CMAP handle via cmap_initialize(), which (per the note above)
 * is available in all supported Corosync 2 versions
 */
#define pcmk__init_cmap(handle) cmap_initialize(handle)
#endif
char *pcmk__corosync_cluster_name(void);
bool pcmk__corosync_add_nodes(xmlNode *xml_parent);
# endif
crm_node_t *crm_update_peer_proc(const char *source, crm_node_t * peer,
uint32_t flag, const char *status);
crm_node_t *pcmk__update_peer_state(const char *source, crm_node_t *node,
const char *state, uint64_t membership);
void pcmk__update_peer_expected(const char *source, crm_node_t *node,
const char *expected);
void pcmk__reap_unseen_nodes(uint64_t ring_id);
void pcmk__corosync_quorum_connect(gboolean (*dispatch)(unsigned long long,
gboolean),
void (*destroy) (gpointer));
crm_node_t *pcmk__search_node_caches(unsigned int id, const char *uname,
uint32_t flags);
-crm_node_t *pcmk__search_cluster_node_cache(unsigned int id, const char *uname);
+crm_node_t *pcmk__search_cluster_node_cache(unsigned int id, const char *uname,
+ const char *uuid);
void pcmk__refresh_node_caches_from_cib(xmlNode *cib);
crm_node_t *pcmk__search_known_node_cache(unsigned int id, const char *uname,
uint32_t flags);
#endif
diff --git a/lib/cluster/cluster.c b/lib/cluster/cluster.c
index 011e0532d3..fce7591eb8 100644
--- a/lib/cluster/cluster.c
+++ b/lib/cluster/cluster.c
@@ -1,405 +1,405 @@
/*
* Copyright 2004-2022 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU Lesser General Public License
* version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#include <dlfcn.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include <sys/param.h>
#include <sys/types.h>
#include <crm/crm.h>
#include <crm/msg_xml.h>
#include <crm/common/ipc.h>
#include <crm/cluster/internal.h>
#include "crmcluster_private.h"
CRM_TRACE_INIT_DATA(cluster);
/*!
 * \brief Get (and set if needed) a node's UUID
 *
 * A cached UUID is returned directly; otherwise it is obtained from the
 * cluster layer and cached in \p peer.
 *
 * \param[in,out] peer  Node to check
 *
 * \return Node UUID of \p peer, or NULL if unknown
 */
const char *
crm_peer_uuid(crm_node_t *peer)
{
    if (peer == NULL) {
        return NULL;
    }

    // Only ask the cluster layer (which might block) if nothing is cached
    if (peer->uuid == NULL) {
        char *uuid = NULL;

        switch (get_cluster_type()) {
            case pcmk_cluster_corosync:
#if SUPPORT_COROSYNC
                uuid = pcmk__corosync_uuid(peer);
#endif
                break;

            case pcmk_cluster_unknown:
            case pcmk_cluster_invalid:
                crm_err("Unsupported cluster type");
                break;
        }
        peer->uuid = uuid;
    }
    return peer->uuid;
}
/*!
 * \brief Connect to the cluster layer
 *
 * For a Corosync cluster, crm_peer_init() is called before connecting.
 *
 * \param[in,out] cluster  Initialized cluster object to connect
 *
 * \return TRUE on success, otherwise FALSE (including when the detected cluster
 *         type is not supported by this build)
 */
gboolean
crm_cluster_connect(crm_cluster_t *cluster)
{
    enum cluster_type_e type = get_cluster_type();

    crm_notice("Connecting to %s cluster infrastructure",
               name_for_cluster_type(type));
    switch (type) {
        case pcmk_cluster_corosync:
#if SUPPORT_COROSYNC
            crm_peer_init();
            return pcmk__corosync_connect(cluster);
#else
            break;
#endif // SUPPORT_COROSYNC
        default:
            break;
    }
    return FALSE;
}
/*!
 * \brief Disconnect from the cluster layer
 *
 * For a Corosync cluster (when supported by this build), the peer cache is
 * destroyed before disconnecting; for other cluster types only the log message
 * is emitted.
 *
 * \param[in,out] cluster  Cluster object to disconnect
 */
void
crm_cluster_disconnect(crm_cluster_t *cluster)
{
    enum cluster_type_e type = get_cluster_type();

    crm_info("Disconnecting from %s cluster infrastructure",
             name_for_cluster_type(type));

#if SUPPORT_COROSYNC
    if (type == pcmk_cluster_corosync) {
        crm_peer_destroy();
        pcmk__corosync_disconnect(cluster);
    }
#endif // SUPPORT_COROSYNC
}
/*!
 * \brief Allocate a new, zero-initialized \p crm_cluster_t object
 *
 * \return A newly allocated \p crm_cluster_t object (guaranteed not \p NULL;
 *         allocation failure triggers CRM_ASSERT)
 * \note The caller is responsible for freeing the return value using
 *       \p pcmk_cluster_free().
 */
crm_cluster_t *
pcmk_cluster_new(void)
{
    crm_cluster_t *cluster = calloc(1, sizeof(*cluster));

    CRM_ASSERT(cluster != NULL);
    return cluster;
}
/*!
 * \brief Free a \p crm_cluster_t object and its dynamically allocated members
 *
 * Frees the \c uuid and \c uname strings, then the object itself.
 *
 * \param[in,out] cluster  Cluster object to free (may be NULL)
 */
void
pcmk_cluster_free(crm_cluster_t *cluster)
{
    if (cluster != NULL) {
        free(cluster->uuid);
        free(cluster->uname);
        free(cluster);
    }
}
/*!
 * \brief Send an XML message via the cluster messaging layer
 *
 * \param[in] node     Cluster node to send message to
 * \param[in] service  Message type to use in message host info
 * \param[in] data     XML message to send
 * \param[in] ordered  Ignored for currently supported messaging layers
 *
 * \return TRUE on success, otherwise FALSE (always FALSE for non-Corosync
 *         clusters or builds without Corosync support)
 */
gboolean
send_cluster_message(const crm_node_t *node, enum crm_ais_msg_types service,
                     xmlNode *data, gboolean ordered)
{
    // Detect the cluster type even when Corosync support isn't built in
    if (get_cluster_type() == pcmk_cluster_corosync) {
#if SUPPORT_COROSYNC
        return pcmk__cpg_send_xml(data, node, service);
#endif
    }
    return FALSE;
}
/*!
 * \brief Get the local node's name
 *
 * The name is looked up on the first call and cached for later calls.
 *
 * \return Local node's name
 * \note This will fatally exit if local node name cannot be known.
 */
const char *
get_local_node_name(void)
{
    static char *local_name = NULL;

    if (local_name == NULL) {
        local_name = get_node_name(0);
    }
    return local_name;
}
/*!
 * \brief Get the node name corresponding to a cluster node ID
 *
 * For the local node (\p nodeid 0), if the cluster layer cannot supply a name,
 * the host name (uname -n) is used instead.
 *
 * \param[in] nodeid  Node ID to check (or 0 for local node)
 *
 * \return Newly allocated node name corresponding to \p nodeid, or NULL if
 *         unknown (only possible for a nonzero \p nodeid)
 * \note This will fatally exit if \p nodeid is 0 and local node name cannot be
 *       known.
 */
char *
get_node_name(uint32_t nodeid)
{
    char *name = NULL;
    enum cluster_type_e stack = get_cluster_type();

#if SUPPORT_COROSYNC
    if (stack == pcmk_cluster_corosync) {
        name = pcmk__corosync_name(0, nodeid);
    } else {
        crm_err("Unknown cluster type: %s (%d)", name_for_cluster_type(stack), stack);
    }
#else
    // Without Corosync support, every cluster type is treated as unknown
    crm_err("Unknown cluster type: %s (%d)", name_for_cluster_type(stack), stack);
#endif

    if (name != NULL) {
        return name;
    }

    if (nodeid == 0) {
        name = pcmk_hostname();
        if (name == NULL) {
            // @TODO Maybe let the caller decide what to do
            crm_err("Could not obtain the local %s node name",
                    name_for_cluster_type(stack));
            crm_exit(CRM_EX_FATAL);
        }
        crm_notice("Defaulting to uname -n for the local %s node name",
                   name_for_cluster_type(stack));
    }

    if (name == NULL) {
        crm_notice("Could not obtain a node name for %s node with id %u",
                   name_for_cluster_type(stack), nodeid);
    }
    return name;
}
/*!
 * \brief Get the node name corresponding to a node UUID
 *
 * \param[in] uuid  UUID of desired node
 *
 * \return Name of desired node, or NULL if it cannot be determined
 *
 * \note This relies on the remote peer cache being populated with all
 *       remote nodes in the cluster, so callers should maintain that cache.
 * \note On Corosync clusters, a node found by ID gets \p uuid cached as its
 *       UUID if its cached UUID differs.
 */
const char *
crm_peer_uname(const char *uuid)
{
    GHashTableIter iter;
    crm_node_t *node = NULL;

    CRM_CHECK(uuid != NULL, return NULL);

    /* remote nodes have the same uname and uuid */
    if (g_hash_table_lookup(crm_remote_peer_cache, uuid)) {
        return uuid;
    }

    /* avoid blocking calls where possible */
    g_hash_table_iter_init(&iter, crm_peer_cache);
    while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
        if (pcmk__str_eq(node->uuid, uuid, pcmk__str_casei)) {
            if (node->uname != NULL) {
                return node->uname;
            }
            break;
        }
    }
    node = NULL;

    if (is_corosync_cluster()) {
        long long id;

        if ((pcmk__scan_ll(uuid, &id, 0LL) != pcmk_rc_ok)
            || (id < 1LL) || (id > UINT32_MAX)) {
            crm_err("Invalid Corosync node ID '%s'", uuid);
            return NULL;
        }

- node = pcmk__search_cluster_node_cache((uint32_t) id, NULL);
+ node = pcmk__search_cluster_node_cache((uint32_t) id, NULL, NULL);
        if (node != NULL) {
            /* The node may already have this UUID cached (e.g. it matched in
             * the loop above but has no name yet). Replace the cached value
             * only when it differs, freeing the old string instead of leaking
             * it; equal strings are left alone, so the caller's own string is
             * never freed even if it is node->uuid itself.
             */
            if (!pcmk__str_eq(node->uuid, uuid, pcmk__str_casei)) {
                crm_info("Setting uuid for node %s[%u] to %s",
                         node->uname, node->id, uuid);
                free(node->uuid);
                node->uuid = strdup(uuid);
            }
            return node->uname;
        }
    }
    return NULL;
}
/*!
 * \brief Add a node's UUID as an XML attribute
 *
 * \param[in,out] xml   XML element to add UUID to
 * \param[in]     attr  XML attribute name to set
 * \param[in,out] node  Node whose UUID should be used as attribute value
 *                      (crm_peer_uuid() may fill in its UUID)
 */
void
set_uuid(xmlNode *xml, const char *attr, crm_node_t *node)
{
    const char *node_uuid = crm_peer_uuid(node);

    crm_xml_add(xml, attr, node_uuid);
}
/*!
* \brief Get a log-friendly string equivalent of a cluster type
*
* \param[in] type Cluster type
*
* \return Log-friendly string corresponding to \p type
*/
const char *
name_for_cluster_type(enum cluster_type_e type)
{
switch (type) {
case pcmk_cluster_corosync:
return "corosync";
case pcmk_cluster_unknown:
return "unknown";
case pcmk_cluster_invalid:
return "invalid";
}
crm_err("Invalid cluster type: %d", type);
return "invalid";
}
/*!
 * \brief Get (and validate) the local cluster type
 *
 * An explicit type in the environment (PCMK__ENV_CLUSTER_TYPE) is validated
 * against the stacks this build supports. Otherwise, Corosync detection is
 * attempted if supported. A known result is cached in a static variable.
 * While the result is still pcmk_cluster_unknown, every call repeats the
 * determination.
 *
 * \return Local cluster type
 * \note This will fatally exit if the local cluster type is invalid.
 */
enum cluster_type_e
get_cluster_type(void)
{
bool detected = false;
const char *cluster = NULL;
// Cached result; pcmk_cluster_unknown means "not determined yet"
static enum cluster_type_e cluster_type = pcmk_cluster_unknown;
/* Return the previous calculation, if any */
if (cluster_type != pcmk_cluster_unknown) {
return cluster_type;
}
cluster = pcmk__env_option(PCMK__ENV_CLUSTER_TYPE);
#if SUPPORT_COROSYNC
/* If nothing is defined in the environment, try corosync (if supported) */
if (cluster == NULL) {
crm_debug("Testing with Corosync");
cluster_type = pcmk__corosync_detect();
if (cluster_type != pcmk_cluster_unknown) {
detected = true;
goto done;
}
}
#endif
/* Something was defined in the environment, test it against what we support */
crm_info("Verifying cluster type: '%s'",
((cluster == NULL)? "-unspecified-" : cluster));
/* An unspecified (and undetected) type leaves cluster_type unknown. An
 * explicit value is accepted only if it names a supported stack; any other
 * explicit value is invalid.
 */
if (cluster == NULL) {
#if SUPPORT_COROSYNC
} else if (pcmk__str_eq(cluster, "corosync", pcmk__str_casei)) {
cluster_type = pcmk_cluster_corosync;
#endif
} else {
cluster_type = pcmk_cluster_invalid;
goto done; /* Keep the compiler happy when no stacks are supported */
}
done:
// An unknown type is only logged; an invalid type is fatal
if (cluster_type == pcmk_cluster_unknown) {
crm_notice("Could not determine the current cluster type");
} else if (cluster_type == pcmk_cluster_invalid) {
crm_notice("This installation does not support the '%s' cluster infrastructure: terminating.",
cluster);
crm_exit(CRM_EX_FATAL);
} else {
crm_info("%s an active '%s' cluster",
(detected? "Detected" : "Assuming"),
name_for_cluster_type(cluster_type));
}
return cluster_type;
}
/*!
* \brief Check whether the local cluster is a Corosync cluster
*
* \return TRUE if the local cluster is a Corosync cluster, otherwise FALSE
*/
gboolean
is_corosync_cluster(void)
{
return get_cluster_type() == pcmk_cluster_corosync;
}
diff --git a/lib/cluster/cpg.c b/lib/cluster/cpg.c
index 2af4a5059d..d79ea47cbb 100644
--- a/lib/cluster/cpg.c
+++ b/lib/cluster/cpg.c
@@ -1,1092 +1,1092 @@
/*
* Copyright 2004-2022 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU Lesser General Public License
* version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#include <bzlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <crm/common/ipc.h>
#include <crm/cluster/internal.h>
#include <crm/common/mainloop.h>
#include <sys/utsname.h>
#include <qb/qbipc_common.h>
#include <qb/qbipcc.h>
#include <qb/qbutil.h>
#include <corosync/corodefs.h>
#include <corosync/corotypes.h>
#include <corosync/hdb.h>
#include <corosync/cpg.h>
#include <crm/msg_xml.h>
#include <crm/common/ipc_internal.h> /* PCMK__SPECIAL_PID* */
#include "crmcluster_private.h"
/* @TODO Once we can update the public API to require crm_cluster_t* in more
* functions, we can ditch this in favor of cluster->cpg_handle.
*/
// CPG handle used when flushing queued outgoing messages; set by
// cluster_connect_cpg() and cleared by cluster_disconnect_cpg()
static cpg_handle_t pcmk_cpg_handle = 0;
// @TODO These could be moved to crm_cluster_t* at that time as well
// Set when a membership change no longer lists the local node in the group
static bool cpg_evicted = false;
// Outgoing messages (struct iovec *) not yet sent via cpg_mcast_joined()
static GList *cs_message_queue = NULL;
// GLib source ID of the pending one-shot flush timer, or 0 if none
static int cs_message_timer = 0;
/* One endpoint (sender or recipient) of a CPG message. The struct is part of
 * the on-wire message format, so it is packed and must not be rearranged.
 */
struct pcmk__cpg_host_s {
uint32_t id;                    // Corosync node ID (0 for the recipient means
                                // "not restricted to a particular node ID")
uint32_t pid;                   // Process ID (filled in for the sender)
gboolean local;                 // Caller-supplied flag, shown as "local" in logs
enum crm_ais_msg_types type;    // Message type (daemon) of this endpoint
uint32_t size;                  // Number of name bytes in uname (0 = no name)
char uname[MAX_NAME];           // Node name bytes, zero-filled before copying
} __attribute__ ((packed));
typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
/* A complete CPG message: fixed-size header fields followed by the payload.
 * Also part of the on-wire format.
 */
struct pcmk__cpg_msg_s {
struct qb_ipc_response_header header __attribute__ ((aligned(8)));
                                // header.id: message class;
                                // header.size: total bytes (struct + payload)
uint32_t id;                    // Per-sender message sequence number
gboolean is_compressed;         // Whether data holds bzip2-compressed payload
pcmk__cpg_host_t host;          // Recipient
pcmk__cpg_host_t sender;        // Sender
uint32_t size;                  // Uncompressed payload size, including the
                                // terminating NUL
uint32_t compressed_size;       // Compressed payload size (if is_compressed)
/* 584 bytes */
char data[0];                   // Payload (msg_data_len() bytes)
} __attribute__ ((packed));
typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
static void crm_cs_flush(gpointer data);
/* Number of payload bytes carried by a CPG message: the compressed size if the
 * payload is compressed, otherwise the uncompressed size. The argument is
 * parenthesized so that any pointer expression (such as &var) can be passed.
 */
#define msg_data_len(msg) \
    ((msg)->is_compressed? (msg)->compressed_size : (msg)->size)

/* Evaluate a Corosync API call, storing its result in rc. While the result is
 * CS_ERR_TRY_AGAIN or CS_ERR_QUEUE_FULL, increment counter, sleep that many
 * seconds, and try again, until counter reaches max. Any other result stops
 * immediately. counter must be an int lvalue initialized by the caller.
 */
#define cs_repeat(rc, counter, max, code) do {                          \
        (rc) = (code);                                                  \
        if (((rc) == CS_ERR_TRY_AGAIN) || ((rc) == CS_ERR_QUEUE_FULL)) { \
            (counter)++;                                                \
            crm_debug("Retrying operation after %ds", (counter));       \
            sleep(counter);                                             \
        } else {                                                        \
            break;                                                      \
        }                                                               \
    } while ((counter) < (max))
/*!
 * \brief Disconnect from Corosync CPG
 *
 * Leaves the cluster's CPG group and finalizes the connection if one exists.
 * The file-level handle used for sending is always reset.
 *
 * \param[in,out] cluster  Cluster to disconnect
 */
void
cluster_disconnect_cpg(crm_cluster_t *cluster)
{
    pcmk_cpg_handle = 0;

    if (cluster->cpg_handle == 0) {
        crm_info("No CPG connection");
        return;
    }

    crm_trace("Disconnecting CPG");
    cpg_leave(cluster->cpg_handle, &cluster->group);
    cpg_finalize(cluster->cpg_handle);
    cluster->cpg_handle = 0;
}
/*!
 * \brief Get the local Corosync node ID (via CPG)
 *
 * Once a nonzero ID has been obtained, it is cached and returned directly on
 * later calls.
 *
 * \param[in] handle  CPG connection to use (or 0 to open, authenticate, and
 *                    afterward close a temporary connection)
 *
 * \return Corosync ID of local node (or 0 if not known)
 */
uint32_t
get_local_nodeid(cpg_handle_t handle)
{
    static uint32_t local_nodeid = 0;

    cs_error_t rc = CS_OK;
    int retries = 0;
    int fd = -1;
    int rv;
    uid_t found_uid = 0;
    gid_t found_gid = 0;
    pid_t found_pid = 0;
    cpg_handle_t conn = handle;
    cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
    const bool temporary = (handle == 0);

    if (local_nodeid != 0) {
        return local_nodeid;
    }

    if (temporary) {
        crm_trace("Creating connection");
        cs_repeat(rc, retries, 5,
                  cpg_model_initialize(&conn, CPG_MODEL_V1,
                                       (cpg_model_data_t *) &cpg_model_info,
                                       NULL));
        if (rc != CS_OK) {
            crm_err("Could not connect to the CPG API: %s (%d)",
                    cs_strerror(rc), rc);
            return 0;
        }

        rc = cpg_fd_get(conn, &fd);
        if (rc != CS_OK) {
            crm_err("Could not obtain the CPG API connection: %s (%d)",
                    cs_strerror(rc), rc);
            goto done;
        }

        // The CPG provider must run as root (in our user namespace, anyway)
        rv = crm_ipc_is_authentic_process(fd, (uid_t) 0, (gid_t) 0,
                                          &found_pid, &found_uid, &found_gid);
        if (rv == 0) {
            crm_err("CPG provider is not authentic:"
                    " process %lld (uid: %lld, gid: %lld)",
                    (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
                    (long long) found_uid, (long long) found_gid);
            goto done;
        }
        if (rv < 0) {
            crm_err("Could not verify authenticity of CPG provider: %s (%d)",
                    strerror(-rv), -rv);
            goto done;
        }
    }

    // Every path that reaches this point still has rc == CS_OK
    retries = 0;
    crm_trace("Performing lookup");
    cs_repeat(rc, retries, 5, cpg_local_get(conn, &local_nodeid));
    if (rc != CS_OK) {
        crm_err("Could not get local node id from the CPG API: %s (%d)",
                pcmk__cs_err_str(rc), rc);
    }

done:
    if (temporary) {
        crm_trace("Closing connection");
        cpg_finalize(conn);
    }
    crm_debug("Local nodeid is %u", local_nodeid);
    return local_nodeid;
}
/*!
 * \internal
 * \brief Callback function for Corosync message queue timer
 *
 * \param[in] data  CPG handle
 *
 * \return FALSE (to indicate to glib that the timer should be removed, so it
 *         fires only once; crm_cs_flush() schedules a new timer if messages
 *         remain queued)
 */
static gboolean
crm_cs_flush_cb(gpointer data)
{
    /* The timer is being consumed, so forget its ID first. Otherwise
     * crm_cs_flush() would think a timer is still pending and return early.
     */
    cs_message_timer = 0;
    crm_cs_flush(data);
    return FALSE;
}
// Send no more than this many CPG messages in one flush
#define CS_SEND_MAX 200

/*!
 * \internal
 * \brief Send messages in Corosync CPG message queue
 *
 * Sends up to CS_SEND_MAX queued messages in order, stopping at the first
 * send failure. If messages remain queued afterward, a one-shot timer is
 * scheduled to flush again.
 *
 * \param[in] data  CPG handle (pointer to a cpg_handle_t)
 */
static void
crm_cs_flush(gpointer data)
{
    unsigned int sent = 0;
    guint queue_len = 0;
    cs_error_t rc = 0;
    cpg_handle_t *handle = (cpg_handle_t *) data;

    if (*handle == 0) {
        crm_trace("Connection is dead");
        return;
    }

    queue_len = g_list_length(cs_message_queue);
    if (((queue_len % 1000) == 0) && (queue_len > 1)) {
        crm_err("CPG queue has grown to %u", queue_len);
    } else if (queue_len == CS_SEND_MAX) {
        crm_warn("CPG queue has grown to %u", queue_len);
    }

    if (cs_message_timer != 0) {
        /* There is already a timer, wait until it goes off */
        crm_trace("Timer active %d", cs_message_timer);
        return;
    }

    while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
        struct iovec *iov = cs_message_queue->data;

        rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
        if (rc != CS_OK) {
            break; // Leave this and later messages queued for a retry
        }

        sent++;
        crm_trace("CPG message sent, size=%llu",
                  (unsigned long long) iov->iov_len);

        cs_message_queue = g_list_remove(cs_message_queue, iov);
        free(iov->iov_base);
        free(iov);
    }

    queue_len -= sent;
    do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE,
               "Sent %u CPG message%s (%u still queued): %s (rc=%d)",
               sent, pcmk__plural_s(sent), queue_len, pcmk__cs_err_str(rc),
               (int) rc);

    if (cs_message_queue) {
        uint32_t delay_ms = 100;

        if (rc != CS_OK) {
            /* Proportionally more if sending failed but cap at 1s */
            delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
        }
        cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
    }
}
/*!
 * \internal
 * \brief Dispatch function for CPG handle
 *
 * Dispatches one pending CPG event. If dispatching fails, the connection is
 * finalized and the handle cleared.
 *
 * \param[in,out] user_data  Cluster object
 *
 * \return 0 on success, -1 on error or if the local node was evicted from the
 *         group (per mainloop_io_t interface)
 */
static int
pcmk_cpg_dispatch(gpointer user_data)
{
    crm_cluster_t *cluster = (crm_cluster_t *) user_data;
    cs_error_t rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);

    if (rc != CS_OK) {
        crm_err("Connection to the CPG API failed: %s (%d)",
                pcmk__cs_err_str(rc), rc);
        cpg_finalize(cluster->cpg_handle);
        cluster->cpg_handle = 0;
        return -1;
    }

    if (cpg_evicted) {
        crm_err("Evicted from CPG membership");
        return -1;
    }

    return 0;
}
/* Describe a message endpoint for logging: "local" if flagged local, else its
 * name if it has one, else "<all>"
 */
static inline const char *
ais_dest(const pcmk__cpg_host_t *host)
{
    const char *description = "<all>";

    if (host->local) {
        description = "local";
    } else if (host->size > 0) {
        description = host->uname;
    }
    return description;
}
/* Map a message type to a log-friendly daemon name ("unknown" for crm_msg_none
 * and for any value that is not a named type, such as a client PID)
 */
static inline const char *
msg_type2text(enum crm_ais_msg_types type)
{
    switch (type) {
        case crm_msg_ais:           return "ais";
        case crm_msg_cib:           return "cib";
        case crm_msg_crmd:          return "crmd";
        case crm_msg_pe:            return "pengine";
        case crm_msg_te:            return "tengine";
        case crm_msg_lrmd:          return "lrmd";
        case crm_msg_attrd:         return "attrd";
        case crm_msg_stonithd:      return "stonithd";
        case crm_msg_stonith_ng:    return "stonith-ng";
        case crm_msg_none:
        default:
            return "unknown";
    }
}
/*!
 * \internal
 * \brief Check whether a Corosync CPG message is valid
 *
 * The checks run in this order, and the first failure is logged:
 * - the total size in the header leaves at least one payload byte;
 * - the sender did not flag an error in the header;
 * - the payload length matches the (compressed or uncompressed) size
 *   recorded in the message;
 * - an uncompressed payload is NUL-terminated exactly at its last byte.
 *
 * \param[in] msg  Corosync CPG message to check
 *
 * \return true if \p msg is valid, otherwise false
 */
static bool
check_message_sanity(const pcmk__cpg_msg_t *msg)
{
/* Bytes following the fixed-size part of the message, per the header.
 * NOTE(review): this is computed in size_t arithmetic and then converted to
 * int32_t, so a header.size smaller than the struct is expected to become
 * negative here. That relies on implementation-defined conversion; confirm.
 */
int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
if (payload_size < 1) {
crm_err("%sCPG message %d from %s invalid: "
"Claimed size of %d bytes is too small "
CRM_XS " from %s[%u] to %s@%s",
(msg->is_compressed? "Compressed " : ""),
msg->id, ais_dest(&(msg->sender)),
(int) msg->header.size,
msg_type2text(msg->sender.type), msg->sender.pid,
msg_type2text(msg->host.type), ais_dest(&(msg->host)));
return false;
}
if (msg->header.error != CS_OK) {
crm_err("%sCPG message %d from %s invalid: "
"Sender indicated error %d "
CRM_XS " from %s[%u] to %s@%s",
(msg->is_compressed? "Compressed " : ""),
msg->id, ais_dest(&(msg->sender)),
msg->header.error,
msg_type2text(msg->sender.type), msg->sender.pid,
msg_type2text(msg->host.type), ais_dest(&(msg->host)));
return false;
}
// The size fields inside the message must agree with the header's total
if (msg_data_len(msg) != payload_size) {
crm_err("%sCPG message %d from %s invalid: "
"Total size %d inconsistent with payload size %d "
CRM_XS " from %s[%u] to %s@%s",
(msg->is_compressed? "Compressed " : ""),
msg->id, ais_dest(&(msg->sender)),
(int) msg->header.size, (int) msg_data_len(msg),
msg_type2text(msg->sender.type), msg->sender.pid,
msg_type2text(msg->host.type), ais_dest(&(msg->host)));
return false;
}
// Compressed payloads are binary, so only uncompressed text is checked here
if (!msg->is_compressed &&
/* msg->size != (strlen(msg->data) + 1) would be a stronger check,
* but checking the last byte or two should be quick
*/
(((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
|| (msg->data[msg->size - 1] != '\0'))) {
crm_err("CPG message %d from %s invalid: "
"Payload does not end at byte %llu "
CRM_XS " from %s[%u] to %s@%s",
msg->id, ais_dest(&(msg->sender)),
(unsigned long long) msg->size,
msg_type2text(msg->sender.type), msg->sender.pid,
msg_type2text(msg->host.type), ais_dest(&(msg->host)));
return false;
}
crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
(int) msg->header.size, (msg->is_compressed? "compressed " : ""),
msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
ais_dest(&(msg->sender)),
msg_type2text(msg->host.type), ais_dest(&(msg->host)));
return true;
}
/*!
 * \brief Extract text data from a Corosync CPG message
 *
 * If \p handle is nonzero, messages addressed to another node are ignored.
 * In that case, the sender's node ID is also normalized to \p nodeid, and a
 * missing sender name is filled in from the peer cache.
 *
 * \param[in]     handle   CPG connection (to get local node ID if not known),
 *                         or 0 to skip filtering
 * \param[in]     nodeid   Corosync ID of node that sent message
 * \param[in]     pid      Process ID of message sender (for logging only)
 * \param[in,out] content  CPG message
 * \param[out]    kind     If not NULL, will be set to CPG header ID
 *                         (which should be an enum crm_ais_msg_class value,
 *                         currently always crm_class_cluster)
 * \param[out]    from     If not NULL, will be set to sender uname
 *                         (valid for the lifetime of \p content)
 *
 * \return Newly allocated string with message data, or NULL if the message is
 *         not for the local node or is invalid
 * \note It is the caller's responsibility to free the return value with free().
 */
char *
pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
                       uint32_t *kind, const char **from)
{
    char *data = NULL;
    pcmk__cpg_msg_t *msg = (pcmk__cpg_msg_t *) content;

    if (handle) {
        // Do filtering and field massaging
        uint32_t local_nodeid = get_local_nodeid(handle);
        const char *local_name = get_local_node_name();

        if (msg->sender.id > 0 && msg->sender.id != nodeid) {
            crm_err("Nodeid mismatch from %u.%u: claimed nodeid=%u", nodeid, pid, msg->sender.id);
            return NULL;

        } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
            /* Not for us */
            crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
            return NULL;

        } else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
            /* Not for us */
            crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
            return NULL;
        }

        msg->sender.id = nodeid;
        if (msg->sender.size == 0) {
            crm_node_t *peer = crm_get_peer(nodeid, NULL);

            if (peer == NULL) {
                crm_err("Peer with nodeid=%u is unknown", nodeid);

            } else if (peer->uname == NULL) {
                crm_err("No uname for peer with nodeid=%u", nodeid);

            } else {
                crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
                // Never overflow the fixed-size field; keep a terminating NUL
                msg->sender.size = QB_MIN(strlen(peer->uname),
                                          (size_t) (MAX_NAME - 1));
                memset(msg->sender.uname, 0, MAX_NAME);
                memcpy(msg->sender.uname, peer->uname, msg->sender.size);
            }
        }
    }

    crm_trace("Got new%s message (size=%u, %u, %u)",
              msg->is_compressed ? " compressed" : "",
              msg_data_len(msg), msg->size, msg->compressed_size);

    if (kind != NULL) {
        *kind = msg->header.id;
    }
    if (from != NULL) {
        *from = msg->sender.uname;
    }

    if (msg->is_compressed) {
        int rc = BZ_OK;
        char *uncompressed = NULL;
        unsigned int new_size = msg->size + 1;

        if (!check_message_sanity(msg)) {
            goto badmsg;
        }

        /* A compressed payload must claim a nonzero uncompressed size.
         * Otherwise, treating the (binary, not necessarily NUL-terminated)
         * compressed bytes as a string would read past the payload.
         */
        if (msg->size == 0) {
            crm_err("Compressed CPG message %u claims an uncompressed size of 0",
                    msg->id);
            goto badmsg;
        }

        crm_trace("Decompressing message data");
        uncompressed = calloc(1, new_size);
        CRM_ASSERT(uncompressed != NULL);

        rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
        if (rc != BZ_OK) {
            crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
                    bz2_strerror(rc), rc);
            free(uncompressed);
            goto badmsg;
        }

        // Reject (rather than abort on) a sender-supplied size that is wrong
        if (new_size != msg->size) {
            crm_err("Decompressed CPG message %u has %u bytes instead of "
                    "the claimed %u", msg->id, new_size, msg->size);
            free(uncompressed);
            goto badmsg;
        }

        data = uncompressed;

    } else if (!check_message_sanity(msg)) {
        goto badmsg;

    } else {
        data = strdup(msg->data);
        CRM_ASSERT(data != NULL);
    }

    // Is this necessary?
    crm_get_peer(msg->sender.id, msg->sender.uname);

    crm_trace("Payload: %.200s", data);
    return data;

  badmsg:
    crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
            " min=%d, total=%d, size=%d, bz2_size=%d",
            msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
            ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
            msg->sender.pid, (int)sizeof(pcmk__cpg_msg_t),
            msg->header.size, msg->size, msg->compressed_size);

    free(data);
    return NULL;
}
/*!
 * \internal
 * \brief Compare cpg_address objects by node ID
 *
 * Intended for qsort()/bsearch() over arrays of cpg_address pointers. Only
 * node IDs are compared; "reason" and "pid" are deliberately ignored.
 *
 * \param[in] first   Pointer to first cpg_address pointer to compare
 * \param[in] second  Pointer to second cpg_address pointer to compare
 *
 * \return -1 if first's node ID is lower, 1 if it is greater,
 *         or 0 if both node IDs are equal
 */
static int
cmp_member_list_nodeid(const void *first, const void *second)
{
    const struct cpg_address *lhs = *(const struct cpg_address *const *) first;
    const struct cpg_address *rhs = *(const struct cpg_address *const *) second;

    return (lhs->nodeid > rhs->nodeid) - (lhs->nodeid < rhs->nodeid);
}
/*!
 * \internal
 * \brief Get a readable string equivalent of a cpg_reason_t value
 *
 * \param[in] reason  CPG reason value
 *
 * \return Readable string suitable for appending to a log message (with a
 *         leading space), or an empty string for unrecognized values
 */
static const char *
cpgreason2str(cpg_reason_t reason)
{
    const char *suffix = "";

    switch (reason) {
        case CPG_REASON_JOIN:
            suffix = " via cpg_join";
            break;
        case CPG_REASON_LEAVE:
            suffix = " via cpg_leave";
            break;
        case CPG_REASON_NODEDOWN:
            suffix = " via cluster exit";
            break;
        case CPG_REASON_NODEUP:
            suffix = " via cluster join";
            break;
        case CPG_REASON_PROCDOWN:
            suffix = " for unknown reason";
            break;
        default:
            break;
    }
    return suffix;
}
/*!
 * \internal
 * \brief Get a log-friendly node name
 *
 * \param[in] peer  Node to check (may be NULL)
 *
 * \return Node's uname, "peer node" if it has no name, or "unknown node" if
 *         \p peer is NULL
 */
static inline const char *
peer_name(const crm_node_t *peer)
{
    if (peer != NULL) {
        return (peer->uname != NULL)? peer->uname : "peer node";
    }
    return "unknown node";
}
/*!
 * \internal
 * \brief Process a CPG peer's leaving the cluster
 *
 * The departure is handled in one of three ways:
 * - If the peer is not in the cluster node cache, the departure is only logged.
 * - If the peer is cached and no remaining member shares its node ID, the
 *   peer's CPG process status is set offline.
 * - If a remaining member does share its node ID, only a duplicate process
 *   left, and a warning is logged instead.
 *
 * \param[in] cpg_group_name       CPG group name (for logging)
 * \param[in] event_counter        Event number (for logging)
 * \param[in] local_nodeid         Node ID of local node
 * \param[in] cpg_peer             CPG peer that left
 * \param[in] sorted_member_list   List of remaining members, qsort()-ed by ID
 * \param[in] member_list_entries  Number of entries in \p sorted_member_list
 */
static void
node_left(const char *cpg_group_name, int event_counter,
uint32_t local_nodeid, const struct cpg_address *cpg_peer,
const struct cpg_address **sorted_member_list,
size_t member_list_entries)
{
crm_node_t *peer = pcmk__search_cluster_node_cache(cpg_peer->nodeid,
- NULL);
+ NULL, NULL);
// Remaining member with the same node ID as cpg_peer, if any
const struct cpg_address **rival = NULL;
/* Most CPG-related Pacemaker code assumes that only one process on a node
* can be in the process group, but Corosync does not impose this
* limitation, and more than one can be a member in practice due to a
* daemon attempting to start while another instance is already running.
*
* Check for any such duplicate instances, because we don't want to process
* their leaving as if our actual peer left. If the peer that left still has
* an entry in sorted_member_list (with a different PID), we will ignore the
* leaving.
*
* @TODO Track CPG members' PIDs so we can tell exactly who left.
*/
if (peer != NULL) {
// The key is passed as a pointer-to-pointer to match the array elements
rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
sizeof(const struct cpg_address *),
cmp_member_list_nodeid);
}
if (rival == NULL) {
crm_info("Group %s event %d: %s (node %u pid %u) left%s",
cpg_group_name, event_counter, peer_name(peer),
cpg_peer->nodeid, cpg_peer->pid,
cpgreason2str(cpg_peer->reason));
if (peer != NULL) {
crm_update_peer_proc(__func__, peer, crm_proc_cpg,
OFFLINESTATUS);
}
} else if (cpg_peer->nodeid == local_nodeid) {
// A duplicate local process left; the local node itself is still a member
crm_warn("Group %s event %d: duplicate local pid %u left%s",
cpg_group_name, event_counter,
cpg_peer->pid, cpgreason2str(cpg_peer->reason));
} else {
crm_warn("Group %s event %d: "
"%s (node %u) duplicate pid %u left%s (%u remains)",
cpg_group_name, event_counter, peer_name(peer),
cpg_peer->nodeid, cpg_peer->pid,
cpgreason2str(cpg_peer->reason), (*rival)->pid);
}
}
/*!
 * \brief Handle a CPG configuration change event
 *
 * Processes departed members, logs joined members, and marks each current
 * member's CPG process as online. If the local node is not among the current
 * members, the local node is flagged as evicted. The next CPG dispatch then
 * fails.
 *
 * \param[in] handle               CPG connection
 * \param[in] groupName            CPG group name
 * \param[in] member_list          List of current CPG members
 * \param[in] member_list_entries  Number of entries in \p member_list
 * \param[in] left_list            List of CPG members that left
 * \param[in] left_list_entries    Number of entries in \p left_list
 * \param[in] joined_list          List of CPG members that joined
 * \param[in] joined_list_entries  Number of entries in \p joined_list
 */
void
pcmk_cpg_membership(cpg_handle_t handle,
                    const struct cpg_name *groupName,
                    const struct cpg_address *member_list, size_t member_list_entries,
                    const struct cpg_address *left_list, size_t left_list_entries,
                    const struct cpg_address *joined_list, size_t joined_list_entries)
{
    gboolean found = FALSE;
    static int counter = 0;
    uint32_t local_nodeid = get_local_nodeid(handle);
    const struct cpg_address **sorted;

    /* Allocate at least one slot. With an empty member list, malloc(0) may
     * legitimately return NULL (tripping the assertion), and bsearch() in
     * node_left() needs a valid base pointer even for zero elements.
     */
    sorted = malloc(((member_list_entries > 0)? member_list_entries : 1)
                    * sizeof(const struct cpg_address *));
    CRM_ASSERT(sorted != NULL);

    for (size_t iter = 0; iter < member_list_entries; iter++) {
        sorted[iter] = member_list + iter;
    }
    /* so that the cross-matching multiply-subscribed nodes is then cheap */
    qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
          cmp_member_list_nodeid);

    for (size_t i = 0; i < left_list_entries; i++) {
        node_left(groupName->value, counter, local_nodeid, &left_list[i],
                  sorted, member_list_entries);
    }
    free(sorted);
    sorted = NULL;

    for (size_t i = 0; i < joined_list_entries; i++) {
        crm_info("Group %s event %d: node %u pid %u joined%s",
                 groupName->value, counter, joined_list[i].nodeid,
                 joined_list[i].pid, cpgreason2str(joined_list[i].reason));
    }

    for (size_t i = 0; i < member_list_entries; i++) {
        crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);

        if (member_list[i].nodeid == local_nodeid
                && member_list[i].pid != getpid()) {
            // See the note in node_left()
            crm_warn("Group %s event %d: detected duplicate local pid %u",
                     groupName->value, counter, member_list[i].pid);
            continue;
        }
        crm_info("Group %s event %d: %s (node %u pid %u) is member",
                 groupName->value, counter, peer_name(peer),
                 member_list[i].nodeid, member_list[i].pid);

        /* If the caller left auto-reaping enabled, this will also update the
         * state to member.
         */
        peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
                                    ONLINESTATUS);

        if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
            /* The node is a CPG member, but we currently think it's not a
             * cluster member. This is possible only if auto-reaping was
             * disabled. The node may be joining, and we happened to get the CPG
             * notification before the quorum notification; or the node may have
             * just died, and we are processing its final messages; or a bug
             * has affected the peer cache.
             */
            time_t now = time(NULL);

            if (peer->when_lost == 0) {
                // Track when we first got into this contradictory state
                peer->when_lost = now;

            } else if (now > (peer->when_lost + 60)) {
                // If it persists for more than a minute, update the state
                crm_warn("Node %u is member of group %s but was believed offline",
                         member_list[i].nodeid, groupName->value);
                pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
            }
        }

        if (local_nodeid == member_list[i].nodeid) {
            found = TRUE;
        }
    }

    if (!found) {
        crm_err("Local node was evicted from group %s", groupName->value);
        cpg_evicted = true;
    }

    counter++;
}
/*!
 * \brief Connect to Corosync CPG
 *
 * The connection is set up in these steps:
 * 1. Initialize a CPG connection.
 * 2. Verify that the CPG provider runs as root.
 * 3. Look up the local node ID.
 * 4. Join the group named after this daemon.
 * 5. Register the connection's file descriptor with the main loop.
 *
 * \param[in,out] cluster  Cluster object
 *
 * \return TRUE on success, otherwise FALSE
 */
gboolean
cluster_connect_cpg(crm_cluster_t *cluster)
{
    cs_error_t rc;
    int fd = -1;
    int retries = 0;
    uint32_t id = 0;
    crm_node_t *peer = NULL;
    cpg_handle_t handle = 0;
    const char *message_name = pcmk__message_name(crm_system_name);
    uid_t found_uid = 0;
    gid_t found_gid = 0;
    pid_t found_pid = 0;
    int rv;

    struct mainloop_fd_callbacks cpg_fd_callbacks = {
        .dispatch = pcmk_cpg_dispatch,
        .destroy = cluster->destroy,
    };

    cpg_model_v1_data_t cpg_model_info = {
        .model = CPG_MODEL_V1,
        .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
        .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
        .cpg_totem_confchg_fn = NULL,
        .flags = 0,
    };

    cpg_evicted = false;
    cluster->group.length = 0;
    cluster->group.value[0] = 0;

    /* group.value is char[128] */
    strncpy(cluster->group.value, message_name, 127);
    cluster->group.value[127] = 0;
    cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));

    cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
    if (rc != CS_OK) {
        crm_err("Could not connect to the CPG API: %s (%d)",
                cs_strerror(rc), rc);
        goto bail;
    }

    rc = cpg_fd_get(handle, &fd);
    if (rc != CS_OK) {
        crm_err("Could not obtain the CPG API connection: %s (%d)",
                cs_strerror(rc), rc);
        goto bail;
    }

    /* CPG provider run as root (in given user namespace, anyway)? */
    if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
                                            &found_uid, &found_gid))) {
        crm_err("CPG provider is not authentic:"
                " process %lld (uid: %lld, gid: %lld)",
                (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
                (long long) found_uid, (long long) found_gid);
        rc = CS_ERR_ACCESS;
        goto bail;

    } else if (rv < 0) {
        crm_err("Could not verify authenticity of CPG provider: %s (%d)",
                strerror(-rv), -rv);
        rc = CS_ERR_ACCESS;
        goto bail;
    }

    id = get_local_nodeid(handle);
    if (id == 0) {
        crm_err("Could not get local node id from the CPG API");
        /* rc is still CS_OK at this point, so record a failure explicitly.
         * Otherwise the bail handling below would leak the handle and report
         * success without having joined the group.
         */
        rc = CS_ERR_NOT_EXIST;
        goto bail;
    }
    cluster->nodeid = id;

    retries = 0;
    cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->group));
    if (rc != CS_OK) {
        crm_err("Could not join the CPG group '%s': %d", message_name, rc);
        goto bail;
    }

    pcmk_cpg_handle = handle;
    cluster->cpg_handle = handle;
    mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);

  bail:
    if (rc != CS_OK) {
        cpg_finalize(handle);
        return FALSE;
    }

    peer = crm_get_peer(id, NULL);
    crm_update_peer_proc(__func__, peer, crm_proc_cpg, ONLINESTATUS);
    return TRUE;
}
/*!
 * \internal
 * \brief Send an XML message via Corosync CPG
 *
 * The XML is serialized without formatting and queued as a cluster-class
 * text message.
 *
 * \param[in] msg   XML message to send
 * \param[in] node  Cluster node to send message to (NULL for all nodes)
 * \param[in] dest  Type of message to send
 *
 * \return TRUE on success, otherwise FALSE
 */
gboolean
pcmk__cpg_send_xml(xmlNode *msg, const crm_node_t *node,
                   enum crm_ais_msg_types dest)
{
    char *text = dump_xml_unformatted(msg);
    gboolean queued = send_cluster_text(crm_class_cluster, text, FALSE, node,
                                        dest);

    free(text);
    return queued;
}
/*!
 * \internal
 * \brief Send string data via Corosync CPG
 *
 * The message is built, compressed if it is at least CRM_BZ2_THRESHOLD bytes
 * (when compression succeeds), appended to the outgoing queue, and the queue
 * is flushed.
 *
 * \param[in] msg_class  Message class (to set as CPG header ID)
 * \param[in] data       Data to send (NULL is treated as an empty string)
 * \param[in] local      What to set as host "local" value (used only when
 *                       describing the destination in logs)
 * \param[in] node       Cluster node to send message to (NULL for all nodes)
 * \param[in] dest       Type of message to send
 *
 * \return TRUE on success (message queued), otherwise FALSE
 */
gboolean
send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
                  gboolean local, const crm_node_t *node,
                  enum crm_ais_msg_types dest)
{
    static int msg_id = 0;
    static int local_pid = 0;
    static int local_name_len = 0;
    static const char *local_name = NULL;

    char *target = NULL;
    struct iovec *iov;
    pcmk__cpg_msg_t *msg = NULL;
    enum crm_ais_msg_types sender = text2msg_type(crm_system_name);

    switch (msg_class) {
        case crm_class_cluster:
            break;
        default:
            crm_err("Invalid message class: %d", msg_class);
            return FALSE;
    }

    CRM_CHECK(dest != crm_msg_ais, return FALSE);

    if (local_name == NULL) {
        local_name = get_local_node_name();
    }
    if ((local_name_len == 0) && (local_name != NULL)) {
        // Never overflow the fixed-size uname field; keep a terminating NUL
        local_name_len = (int) QB_MIN(strlen(local_name),
                                      (size_t) (MAX_NAME - 1));
    }

    if (data == NULL) {
        data = "";
    }

    if (local_pid == 0) {
        local_pid = getpid();
    }

    // Transient clients are identified by PID instead of a daemon type
    if (sender == crm_msg_none) {
        sender = local_pid;
    }

    msg = calloc(1, sizeof(pcmk__cpg_msg_t));
    CRM_ASSERT(msg != NULL);

    msg_id++;
    msg->id = msg_id;
    msg->header.id = msg_class;
    msg->header.error = CS_OK;

    msg->host.type = dest;
    msg->host.local = local;

    if (node) {
        if (node->uname) {
            target = strdup(node->uname);
            // Never overflow the fixed-size uname field; keep a terminating NUL
            msg->host.size = QB_MIN(strlen(node->uname),
                                    (size_t) (MAX_NAME - 1));
            memset(msg->host.uname, 0, MAX_NAME);
            memcpy(msg->host.uname, node->uname, msg->host.size);
        } else {
            target = crm_strdup_printf("%u", node->id);
        }
        msg->host.id = node->id;
    } else {
        target = strdup("all");
    }
    CRM_ASSERT(target != NULL);

    msg->sender.id = 0;
    msg->sender.type = sender;
    msg->sender.pid = local_pid;
    msg->sender.size = local_name_len;
    memset(msg->sender.uname, 0, MAX_NAME);

    if ((local_name != NULL) && (msg->sender.size != 0)) {
        memcpy(msg->sender.uname, local_name, msg->sender.size);
    }

    msg->size = 1 + strlen(data);
    msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;

    if (msg->size < CRM_BZ2_THRESHOLD) {
        msg = pcmk__realloc(msg, msg->header.size);
        memcpy(msg->data, data, msg->size);

    } else {
        char *compressed = NULL;
        unsigned int new_size = 0;
        char *uncompressed = strdup(data);

        CRM_ASSERT(uncompressed != NULL);

        if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0,
                           &compressed, &new_size) == pcmk_rc_ok) {

            msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
            msg = pcmk__realloc(msg, msg->header.size);
            memcpy(msg->data, compressed, new_size);

            msg->is_compressed = TRUE;
            msg->compressed_size = new_size;

        } else {
            // cppcheck seems not to understand the abort logic in pcmk__realloc
            // cppcheck-suppress memleak
            msg = pcmk__realloc(msg, msg->header.size);
            memcpy(msg->data, data, msg->size);
        }

        free(uncompressed);
        free(compressed);
    }

    iov = calloc(1, sizeof(struct iovec));
    CRM_ASSERT(iov != NULL);
    iov->iov_base = msg;
    iov->iov_len = msg->header.size;

    if (msg->compressed_size) {
        crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
                  msg->id, target, (unsigned long long) iov->iov_len,
                  msg->compressed_size, data);
    } else {
        crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
                  msg->id, target, (unsigned long long) iov->iov_len,
                  msg->size, data);
    }
    free(target);

    cs_message_queue = g_list_append(cs_message_queue, iov);
    crm_cs_flush(&pcmk_cpg_handle);

    return TRUE;
}
/*!
* \brief Get the message type equivalent of a string
*
* \param[in] text String of message type
*
* \return Message type equivalent of \p text
*/
enum crm_ais_msg_types
text2msg_type(const char *text)
{
int type = crm_msg_none;
CRM_CHECK(text != NULL, return type);
text = pcmk__message_name(text);
if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
type = crm_msg_ais;
} else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
type = crm_msg_cib;
} else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
type = crm_msg_crmd;
} else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
type = crm_msg_te;
} else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
type = crm_msg_pe;
} else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
type = crm_msg_lrmd;
} else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
type = crm_msg_stonithd;
} else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
type = crm_msg_stonith_ng;
} else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
type = crm_msg_attrd;
} else {
/* This will normally be a transient client rather than
* a cluster daemon. Set the type to the pid of the client
*/
int scan_rc = sscanf(text, "%d", &type);
if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
/* Ensure it's sane */
type = crm_msg_none;
}
}
return type;
}
diff --git a/lib/cluster/membership.c b/lib/cluster/membership.c
index 122ee99fd6..d5db518cbd 100644
--- a/lib/cluster/membership.c
+++ b/lib/cluster/membership.c
@@ -1,1315 +1,1328 @@
/*
* Copyright 2004-2023 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU Lesser General Public License
* version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#ifndef _GNU_SOURCE
# define _GNU_SOURCE
#endif
#include <sys/param.h>
#include <sys/types.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <glib.h>
#include <crm/common/ipc.h>
#include <crm/common/xml_internal.h>
#include <crm/cluster/internal.h>
#include <crm/msg_xml.h>
#include <crm/stonith-ng.h>
#include "crmcluster_private.h"
/* The peer cache remembers cluster nodes that have been seen.
* This is managed mostly automatically by libcluster, based on
* cluster membership events.
*
* Because cluster nodes can have conflicting names or UUIDs,
* the hash table key is a uniquely generated ID (see crm_get_peer()).
*/
GHashTable *crm_peer_cache = NULL;
/*
* The remote peer cache tracks pacemaker_remote nodes. While the
* value has the same type as the peer cache's, it is tracked separately for
* three reasons: pacemaker_remote nodes can't have conflicting names or UUIDs,
* so the name (which is also the UUID) is used as the hash table key; there
* is no equivalent of membership events, so management is not automatic; and
* most users of the peer cache need to exclude pacemaker_remote nodes.
*
* That said, using a single cache would be more logical and less error-prone,
* so it would be a good idea to merge them one day.
*
* libcluster provides two avenues for populating the cache:
* crm_remote_peer_get() and crm_remote_peer_cache_remove() directly manage it,
* while crm_remote_peer_cache_refresh() populates it via the CIB.
*/
GHashTable *crm_remote_peer_cache = NULL;
/*
* The known node cache tracks cluster and remote nodes that have been seen in
* the CIB. It is useful mainly when a caller needs to know about a node that
* may no longer be in the membership, but doesn't want to add the node to the
* main peer cache tables. Entries are keyed by a uniquely generated ID and are
* (re)populated by refresh_known_node_cache().
*/
static GHashTable *known_node_cache = NULL;
/* NOTE(review): not referenced in this file; presumably a cluster membership
* sequence number maintained by the cluster-layer code -- confirm. */
unsigned long long crm_peer_seq = 0;
/* NOTE(review): not referenced in this file; presumably whether the local
* partition currently has quorum -- confirm. */
gboolean crm_have_quorum = FALSE;
/* Whether lost cluster peers are automatically reaped from the peer cache;
* enabled by default and changed via crm_set_autoreap(). */
static gboolean crm_autoreap = TRUE;
/* Flag setting and clearing for crm_node_t:flags
*
* Both macros assign the result of pcmk__set_flags_as() /
* pcmk__clear_flags_as() back to (peer)->flags, passing the caller's function
* name and line, LOG_TRACE, the peer's uname, and the stringified flag
* expression (presumably so each change is trace-logged by name).
*/
#define set_peer_flags(peer, flags_to_set) do { \
(peer)->flags = pcmk__set_flags_as(__func__, __LINE__, LOG_TRACE, \
"Peer", (peer)->uname, \
(peer)->flags, (flags_to_set), \
#flags_to_set); \
} while (0)
#define clear_peer_flags(peer, flags_to_clear) do { \
(peer)->flags = pcmk__clear_flags_as(__func__, __LINE__, \
LOG_TRACE, \
"Peer", (peer)->uname, \
(peer)->flags, (flags_to_clear), \
#flags_to_clear); \
} while (0)
/* Defined later in this file, but needed by crm_remote_peer_get() */
static void update_peer_uname(crm_node_t *node, const char *uname);
/*!
 * \brief Get the number of entries in the remote peer cache
 *
 * \return Number of cached remote nodes (0 if the cache hasn't been created)
 */
int
crm_remote_peer_cache_size(void)
{
    return (crm_remote_peer_cache == NULL)? 0
           : g_hash_table_size(crm_remote_peer_cache);
}
/*!
 * \brief Get a remote node peer cache entry, creating it if necessary
 *
 * \param[in] node_name  Name of remote node
 *
 * \return Cache entry for node on success, NULL (and set errno) otherwise
 *
 * \note errno is set to EINVAL if \p node_name is NULL, and to ENOMEM if a
 *       new entry could not be allocated. (This previously set the negated
 *       values, which are not valid errno codes.)
 * \note When creating a new entry, this will leave the node state undetermined,
 *       so the caller should also call pcmk__update_peer_state() if the state
 *       is known.
 */
crm_node_t *
crm_remote_peer_get(const char *node_name)
{
    crm_node_t *node = NULL;

    if (node_name == NULL) {
        errno = EINVAL;
        return NULL;
    }

    /* Make sure the caches exist, so that a new entry is never "inserted"
     * into (and leaked by) a NULL table
     */
    crm_peer_init();

    // Return the existing cache entry, if there is one
    node = g_hash_table_lookup(crm_remote_peer_cache, node_name);
    if (node != NULL) {
        return node;
    }

    // Allocate a new entry
    node = calloc(1, sizeof(crm_node_t));
    if (node == NULL) {
        errno = ENOMEM;
        return NULL;
    }

    // Populate the essential information (a remote node's UUID is its name)
    set_peer_flags(node, crm_remote_node);
    node->uuid = strdup(node_name);
    if (node->uuid == NULL) {
        free(node);
        errno = ENOMEM;
        return NULL;
    }

    /* Add the new entry to the cache, keyed by its own uuid string (which is
     * freed along with the entry)
     */
    g_hash_table_replace(crm_remote_peer_cache, node->uuid, node);
    crm_trace("added %s to remote cache", node_name);

    // Update the entry's uname, ensuring peer status callbacks are called
    update_peer_uname(node, node_name);
    return node;
}
void
crm_remote_peer_cache_remove(const char *node_name)
{
if (g_hash_table_remove(crm_remote_peer_cache, node_name)) {
crm_trace("removed %s from remote peer cache", node_name);
}
}
/*!
 * \internal
 * \brief Derive a remote node's peer state from its CIB status entry
 *
 * \param[in] node_state  XML of node state
 *
 * \return CRM_NODE_LOST if XML_NODE_IN_CLUSTER is present and false in
 *         \p node_state, otherwise CRM_NODE_MEMBER
 *
 * \note Unlike most boolean XML attributes, this one defaults to true, for
 *       backward compatibility with older controllers that don't set it.
 */
static const char *
remote_state_from_cib(const xmlNode *node_state)
{
    bool in_cluster = false;
    int rc = pcmk__xe_get_bool_attr(node_state, XML_NODE_IN_CLUSTER,
                                    &in_cluster);

    if ((rc == pcmk_rc_ok) && !in_cluster) {
        return CRM_NODE_LOST;
    }
    return CRM_NODE_MEMBER;
}
// User data for remote_cache_refresh_helper() during XPath result iteration
struct refresh_data {
    const char *field;  // Name of the XML attribute that holds the node name
    gboolean has_state; // Whether the XML result also carries node state
};
/*!
 * \internal
 * \brief Add or refresh a remote peer cache entry from one XPath result
 *
 * New nodes get a cache entry. Existing entries still flagged dirty by the
 * current refresh are marked clean. In both cases the node's state is updated
 * if the result carries one. Entries already refreshed are left alone.
 *
 * \param[in] result     XML search result
 * \param[in] user_data  struct refresh_data describing what to look for
 */
static void
remote_cache_refresh_helper(xmlNode *result, void *user_data)
{
    const struct refresh_data *data = user_data;
    const char *remote = crm_element_value(result, data->field);
    const char *state = NULL;
    crm_node_t *node = NULL;

    CRM_CHECK(remote != NULL, return);

    if (data->has_state) {
        state = remote_state_from_cib(result);
    }

    node = g_hash_table_lookup(crm_remote_peer_cache, remote);
    if (node == NULL) {
        // Not cached yet: create a new entry
        node = crm_remote_peer_get(remote);
        CRM_ASSERT(node);

    } else if (pcmk_is_set(node->flags, crm_node_dirty)) {
        // Cached but not yet seen in this refresh: keep it
        clear_peer_flags(node, crm_node_dirty);

    } else {
        // Already handled earlier in this refresh
        return;
    }

    if (state != NULL) {
        pcmk__update_peer_state(__func__, node, state, 0);
    }
}
// GHFunc: flag a cache entry as not yet seen in the current refresh
static void
mark_dirty(gpointer key, gpointer value, gpointer user_data)
{
    crm_node_t *node = value;

    set_peer_flags(node, crm_node_dirty);
}
// GHRFunc: select cache entries that a refresh did not mark clean
static gboolean
is_dirty(gpointer key, gpointer value, gpointer user_data)
{
    const crm_node_t *node = value;

    return pcmk_is_set(node->flags, crm_node_dirty);
}
/*!
 * \brief Repopulate the remote peer cache based on CIB XML
 *
 * Guest and remote nodes found in the CIB are added to (or kept in) the cache,
 * and any cached node not found in the CIB is removed.
 *
 * \param[in] cib  CIB XML to parse
 */
void
crm_remote_peer_cache_refresh(xmlNode *cib)
{
    struct refresh_data data = { .field = "id", .has_state = TRUE };

    crm_peer_init();

    /* Flag every existing entry as dirty, so that any entry not seen in the CIB
     * can be dropped afterward. The cache isn't simply emptied, because state
     * changes need to be detected.
     */
    g_hash_table_foreach(crm_remote_peer_cache, mark_dirty, NULL);

    // Guest and remote nodes in the status section (state is available)
    crm_foreach_xpath_result(cib, PCMK__XP_REMOTE_NODE_STATUS,
                             remote_cache_refresh_helper, &data);

    /* Guest and remote nodes in the configuration section. These may have just
     * been added and not have a status entry yet, in which case the cached
     * state is left NULL so the peer status callback isn't called until the
     * node is known to have started successfully.
     */
    data.has_state = FALSE;

    data.field = "value";
    crm_foreach_xpath_result(cib, PCMK__XP_GUEST_NODE_CONFIG,
                             remote_cache_refresh_helper, &data);

    data.field = "id";
    crm_foreach_xpath_result(cib, PCMK__XP_REMOTE_NODE_CONFIG,
                             remote_cache_refresh_helper, &data);

    // Drop entries that weren't seen anywhere in the CIB
    g_hash_table_foreach_remove(crm_remote_peer_cache, is_dirty, NULL);
}
/*!
 * \brief Check whether a node is an active cluster member
 *
 * \param[in] node  Node to check
 *
 * \return TRUE if \p node is an active member of the (Corosync) cluster,
 *         otherwise FALSE (always FALSE for NULL and for remote nodes)
 */
gboolean
crm_is_peer_active(const crm_node_t * node)
{
    /* Remote nodes are never considered active members, which guarantees
     * they will never be considered for DC membership
     */
    if ((node == NULL) || pcmk_is_set(node->flags, crm_remote_node)) {
        return FALSE;
    }

#if SUPPORT_COROSYNC
    if (is_corosync_cluster()) {
        return crm_is_corosync_peer_active(node);
    }
#endif

    crm_err("Unhandled cluster type: %s", name_for_cluster_type(get_cluster_type()));
    return FALSE;
}
/*!
 * \internal
 * \brief GHRFunc selecting inactive peer cache entries that match a template
 *
 * \param[in] key        Ignored
 * \param[in] value      Peer cache entry to check
 * \param[in] user_data  Template node: matched by ID if its ID is nonzero,
 *                       otherwise by name (case-insensitively)
 *
 * \return TRUE if the entry matches and is not an active peer, else FALSE
 */
static gboolean
crm_reap_dead_member(gpointer key, gpointer value, gpointer user_data)
{
    crm_node_t *node = value;
    crm_node_t *search = user_data;
    bool matches = false;

    if (search == NULL) {
        return FALSE;
    }

    if (search->id != 0) {
        matches = (node->id == search->id);
    } else {
        matches = pcmk__str_eq(node->uname, search->uname, pcmk__str_casei);
    }

    if (!matches || crm_is_peer_active(value)) {
        return FALSE;
    }

    crm_info("Removing node with name %s and id %u from membership cache",
             (node->uname? node->uname : "unknown"), node->id);
    return TRUE;
}
/*!
 * \brief Remove all inactive peer cache entries matching a node ID or uname
 *
 * If \p id is nonzero, entries are matched by ID only; otherwise they are
 * matched by \p name (case-insensitively). Entries that are active peers are
 * never removed.
 *
 * \param[in] id    ID of node to remove (or 0 to ignore)
 * \param[in] name  Uname of node to remove (or NULL to ignore)
 *
 * \return Number of cache entries removed
 */
guint
reap_crm_member(uint32_t id, const char *name)
{
    int matches = 0;
    crm_node_t search = { 0, };
    const char *uname_label = "";
    const char *uname_value = "";

    if (crm_peer_cache == NULL) {
        crm_trace("Membership cache not initialized, ignoring purge request");
        return 0;
    }

    // Build a template node describing what to purge
    search.id = id;
    pcmk__str_update(&search.uname, name);
    if (search.uname != NULL) {
        uname_label = " and/or uname=";
        uname_value = search.uname;
    }

    matches = g_hash_table_foreach_remove(crm_peer_cache, crm_reap_dead_member,
                                          &search);
    if (matches == 0) {
        crm_info("No peers with id=%u%s%s to purge from the membership cache",
                 search.id, uname_label, uname_value);
    } else {
        crm_notice("Purged %d peer%s with id=%u%s%s from the membership cache",
                   matches, pcmk__plural_s(matches), search.id,
                   uname_label, uname_value);
    }

    free(search.uname);
    return matches;
}
// GHFunc: increment the guint counter in user_data for each active peer
static void
count_peer(gpointer key, gpointer value, gpointer user_data)
{
    if (crm_is_peer_active((crm_node_t *) value)) {
        guint *count = user_data;

        (*count)++;
    }
}
guint
crm_active_peers(void)
{
guint count = 0;
if (crm_peer_cache) {
g_hash_table_foreach(crm_peer_cache, count_peer, &count);
}
return count;
}
// GDestroyNotify for node cache values: free an entry and its strings
static void
destroy_crm_node(gpointer data)
{
    crm_node_t *peer = data;

    crm_trace("Destroying entry for node %u: %s", peer->id, peer->uname);

    free(peer->conn_host);
    free(peer->expected);
    free(peer->uuid);
    free(peer->state);
    free(peer->uname);
    free(peer);
}
/*!
 * \brief Create the node caches, if they don't already exist
 *
 * All three caches free their entries with destroy_crm_node(). The cluster
 * peer cache and known node cache pass free() as the first argument to
 * pcmk__strikey_table() (presumably the key destructor); the remote peer cache
 * passes NULL, because its keys are the entries' own uuid strings.
 */
void
crm_peer_init(void)
{
    if (crm_peer_cache == NULL) {
        crm_peer_cache = pcmk__strikey_table(free, destroy_crm_node);
    }

    if (crm_remote_peer_cache == NULL) {
        crm_remote_peer_cache = pcmk__strikey_table(NULL, destroy_crm_node);
    }

    if (known_node_cache == NULL) {
        known_node_cache = pcmk__strikey_table(free, destroy_crm_node);
    }
}
void
crm_peer_destroy(void)
{
if (crm_peer_cache != NULL) {
crm_trace("Destroying peer cache with %d members", g_hash_table_size(crm_peer_cache));
g_hash_table_destroy(crm_peer_cache);
crm_peer_cache = NULL;
}
if (crm_remote_peer_cache != NULL) {
crm_trace("Destroying remote peer cache with %d members", g_hash_table_size(crm_remote_peer_cache));
g_hash_table_destroy(crm_remote_peer_cache);
crm_remote_peer_cache = NULL;
}
if (known_node_cache != NULL) {
crm_trace("Destroying known node cache with %d members",
g_hash_table_size(known_node_cache));
g_hash_table_destroy(known_node_cache);
known_node_cache = NULL;
}
}
// Client function to call after peer status changes (NULL if none)
static void (*peer_status_callback)(enum crm_status_type, crm_node_t *,
                                    const void *) = NULL;

/*!
 * \brief Register a client function to call after peer status changes
 *
 * The callback receives the kind of change (name, processes, or node state),
 * the affected node, and, where applicable, a pointer to the previous value.
 *
 * \param[in] dispatch  Callback to use (NULL to disable notifications)
 *
 * \note The library manages the peer caches itself, so callbacks should do
 *       only client-specific handling. Callbacks MUST NOT add or remove
 *       entries in the peer caches.
 */
void
crm_set_status_callback(void (*dispatch) (enum crm_status_type, crm_node_t *, const void *))
{
    // Replaces any previously registered callback
    peer_status_callback = dispatch;
}
/*!
 * \brief Choose whether the library automatically reaps lost nodes
 *
 * When enabled (the default), crm_update_peer_proc() also sets the peer state
 * to CRM_NODE_MEMBER or CRM_NODE_LOST, and pcmk__update_peer_state() reaps
 * cluster peers whose state changes to anything other than CRM_NODE_MEMBER.
 * Leave this enabled unless the caller manages the cache on its own.
 *
 * \param[in] autoreap  TRUE to enable automatic reaping, FALSE to disable
 */
void
crm_set_autoreap(gboolean autoreap)
{
    crm_autoreap = autoreap;
}
static void
dump_peer_hash(int level, const char *caller)
{
GHashTableIter iter;
const char *id = NULL;
crm_node_t *node = NULL;
g_hash_table_iter_init(&iter, crm_peer_cache);
while (g_hash_table_iter_next(&iter, (gpointer *) &id, (gpointer *) &node)) {
do_crm_log(level, "%s: Node %u/%s = %p - %s", caller, node->id, node->uname, node, id);
}
}
// GHRFunc: select the entry whose value is the pointer given as user_data
static gboolean
hash_find_by_data(gpointer key, gpointer value, gpointer user_data)
{
    return (value == user_data)? TRUE : FALSE;
}
/*!
* \internal
* \brief Search caches for a node (cluster or Pacemaker Remote)
*
* The remote peer cache is checked first, but only if \p uname is given and
* \p flags includes CRM_GET_PEER_REMOTE. The cluster node cache is checked
* only if that found nothing and \p flags includes CRM_GET_PEER_CLUSTER.
* No new cache entry is created (unlike crm_get_peer_full()).
*
* \param[in] id If not 0, cluster node ID to search for
* \param[in] uname If not NULL, node name to search for
* \param[in] flags Bitmask of enum crm_get_peer_flags
*
* \return Node cache entry if found, otherwise NULL
*/
crm_node_t *
pcmk__search_node_caches(unsigned int id, const char *uname, uint32_t flags)
{
crm_node_t *node = NULL;
CRM_ASSERT(id > 0 || uname != NULL);
/* Create the cache tables if they don't exist yet */
crm_peer_init();
/* The remote cache is keyed by name, so it can only be searched by name */
if ((uname != NULL) && pcmk_is_set(flags, CRM_GET_PEER_REMOTE)) {
node = g_hash_table_lookup(crm_remote_peer_cache, uname);
}
/* Fall back to the cluster cache; no UUID is passed (NULL), so that search
* uses only the ID and/or name */
if ((node == NULL) && pcmk_is_set(flags, CRM_GET_PEER_CLUSTER)) {
- node = pcmk__search_cluster_node_cache(id, uname);
+ node = pcmk__search_cluster_node_cache(id, uname, NULL);
}
return node;
}
/*!
 * \brief Get a node cache entry (cluster or Pacemaker Remote)
 *
 * The remote peer cache is checked first (only when \p uname is given and
 * \p flags includes CRM_GET_PEER_REMOTE). If no entry is found there and
 * \p flags includes CRM_GET_PEER_CLUSTER, the cluster cache entry is obtained
 * via crm_get_peer(), which creates it if necessary.
 *
 * \param[in] id     If not 0, cluster node ID to search for
 * \param[in] uname  If not NULL, node name to search for
 * \param[in] flags  Bitmask of enum crm_get_peer_flags
 *
 * \return (Possibly newly created) node cache entry
 */
crm_node_t *
crm_get_peer_full(unsigned int id, const char *uname, int flags)
{
    crm_node_t *node = NULL;

    CRM_ASSERT(id > 0 || uname != NULL);

    crm_peer_init();

    /* The remote peer cache is a name-keyed string table, so it can only be
     * searched when a name was given; looking up a NULL key (possible when
     * only an ID is passed) is not safe. This matches the guard in
     * pcmk__search_node_caches().
     */
    if ((uname != NULL) && pcmk_is_set(flags, CRM_GET_PEER_REMOTE)) {
        node = g_hash_table_lookup(crm_remote_peer_cache, uname);
    }

    if ((node == NULL) && pcmk_is_set(flags, CRM_GET_PEER_CLUSTER)) {
        node = crm_get_peer(id, uname);
    }
    return node;
}
/*!
* \internal
* \brief Search cluster node cache
*
* \param[in] id If not 0, cluster node ID to search for
* \param[in] uname If not NULL, node name to search for
+ * \param[in] uuid If not NULL while id is 0, node UUID instead of cluster
+ * node ID to search for
*
* \return Cluster node cache entry if found, otherwise NULL
*
* The cache is searched by name and by ID (or by UUID when \p id is 0). The two
* results are then reconciled:
* - Same entry (or none) found by both: that result is returned.
* - Only the name matched: the name match is returned, unless \p id was given
*   and the name match has its own nonzero ID, in which case a critical
*   message is logged and NULL is returned (so the caller may create a new
*   entry).
* - Only the ID/UUID matched: that entry is returned; a critical message is
*   logged if \p uname was given and the entry has a different name.
* - Different entries matched:
*   - if the ID match has the requested name, the name-match entry is removed
*     from the cache (logged as an ID change) and the ID match is returned;
*   - else if both have names, a warning is logged, crm_abort() is called with
*     "member weirdness" (both trailing flags TRUE -- presumably dumping core
*     without exiting; confirm), and the ID match is returned;
*   - else if \p id was given and the name match has its own ID, a warning is
*     logged and the ID match is returned;
*   - otherwise the name-match entry is removed from the cache (a "merge")
*     and the ID match is returned.
*
* \note Entries removed from the cache are freed by the cache's value
*       destructor, so callers must not hold on to them.
* \note NOTE(review): the CRM_ASSERT below requires \p id > 0 or \p uname, so a
*       search by \p uuid alone (id 0, uname NULL) would fail the assertion.
*       Also, when the match was by UUID, several log messages print the
*       (zero) \p id rather than the UUID. Confirm intended usage.
*/
crm_node_t *
-pcmk__search_cluster_node_cache(unsigned int id, const char *uname)
+pcmk__search_cluster_node_cache(unsigned int id, const char *uname,
+ const char *uuid)
{
GHashTableIter iter;
crm_node_t *node = NULL;
crm_node_t *by_id = NULL;
crm_node_t *by_name = NULL;
CRM_ASSERT(id > 0 || uname != NULL);
crm_peer_init();
/* Find the first entry whose name matches (case-insensitively) */
if (uname != NULL) {
g_hash_table_iter_init(&iter, crm_peer_cache);
while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
if(node->uname && strcasecmp(node->uname, uname) == 0) {
crm_trace("Name match: %s = %p", node->uname, node);
by_name = node;
break;
}
}
}
/* Find the first entry with the given ID, or (if no ID was given) the given
* UUID; either result is treated as the "by_id" match below */
if (id > 0) {
g_hash_table_iter_init(&iter, crm_peer_cache);
while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
if(node->id == id) {
crm_trace("ID match: %u = %p", node->id, node);
by_id = node;
break;
}
}
+
+ } else if (uuid != NULL) {
+ g_hash_table_iter_init(&iter, crm_peer_cache);
+ while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
+ if (pcmk__str_eq(node->uuid, uuid, pcmk__str_casei)) {
+ crm_trace("UUID match: %s = %p", node->uuid, node);
+ by_id = node;
+ break;
+ }
+ }
}
/* Reconcile the two results (see the function description) */
node = by_id; /* Good default */
if(by_id == by_name) {
/* Nothing to do if they match (both NULL counts) */
crm_trace("Consistent: %p for %u/%s", by_id, id, uname);
} else if(by_id == NULL && by_name) {
crm_trace("Only one: %p for %u/%s", by_name, id, uname);
if(id && by_name->id) {
dump_peer_hash(LOG_WARNING, __func__);
crm_crit("Node %u and %u share the same name '%s'",
id, by_name->id, uname);
node = NULL; /* Create a new one */
} else {
node = by_name;
}
} else if(by_name == NULL && by_id) {
crm_trace("Only one: %p for %u/%s", by_id, id, uname);
if(uname && by_id->uname) {
dump_peer_hash(LOG_WARNING, __func__);
crm_crit("Node '%s' and '%s' share the same cluster nodeid %u: assuming '%s' is correct",
uname, by_id->uname, id, uname);
}
} else if(uname && by_id->uname) {
if(pcmk__str_eq(uname, by_id->uname, pcmk__str_casei)) {
/* Two entries with the same name: keep the ID match, drop (and free)
* the other one */
crm_notice("Node '%s' has changed its ID from %u to %u", by_id->uname, by_name->id, by_id->id);
g_hash_table_foreach_remove(crm_peer_cache, hash_find_by_data, by_name);
} else {
crm_warn("Node '%s' and '%s' share the same cluster nodeid: %u %s", by_id->uname, by_name->uname, id, uname);
dump_peer_hash(LOG_INFO, __func__);
crm_abort(__FILE__, __func__, __LINE__, "member weirdness", TRUE,
TRUE);
}
} else if(id && by_name->id) {
crm_warn("Node %u and %u share the same name: '%s'", by_id->id, by_name->id, uname);
} else {
/* Simple merge */
/* Only corosync-based clusters use node IDs. The functions that call
* pcmk__update_peer_state() and crm_update_peer_proc() only know
* nodeid, so 'by_id' is authoritative when merging.
*/
dump_peer_hash(LOG_DEBUG, __func__);
crm_info("Merging %p into %p", by_name, by_id);
g_hash_table_foreach_remove(crm_peer_cache, hash_find_by_data, by_name);
}
return node;
}
#if SUPPORT_COROSYNC
/*!
 * \internal
 * \brief Drop offline cache entries whose name conflicts with a given node
 *
 * Only applies when the node has both an ID and a name and Corosync has a
 * node list. An entry conflicts if it has a different nonzero ID but the same
 * name (case-insensitively). Active conflicting peers are kept.
 *
 * \param[in] node  Node whose name should be unique in the peer cache
 *
 * \return Number of entries removed
 */
static guint
remove_conflicting_peer(crm_node_t *node)
{
    int matches = 0;
    GHashTableIter iter;
    crm_node_t *existing_node = NULL;

    if ((node->id == 0) || (node->uname == NULL)
        || !pcmk__corosync_has_nodelist()) {
        return 0;
    }

    g_hash_table_iter_init(&iter, crm_peer_cache);
    while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &existing_node)) {
        bool conflicts = (existing_node->id > 0)
                         && (existing_node->id != node->id)
                         && (existing_node->uname != NULL)
                         && (strcasecmp(existing_node->uname, node->uname) == 0);

        if (!conflicts || crm_is_peer_active(existing_node)) {
            continue;
        }

        crm_warn("Removing cached offline node %u/%s which has conflicting uname with %u",
                 existing_node->id, existing_node->uname, node->id);
        g_hash_table_iter_remove(&iter);
        matches++;
    }
    return matches;
}
#endif
/*!
* \brief Get a cluster node cache entry
*
* \param[in] id If not 0, cluster node ID to search for
* \param[in] uname If not NULL, node name to search for
*
* \return (Possibly newly created) cluster node cache entry
*
* If no \p uname is given and the cached entry (if any) has no name, the name
* is looked up from the cluster membership via get_node_name(). If no entry
* is found, a new one is created in the peer cache under a freshly generated
* unique key. The ID and name are then filled in if they were unknown (setting
* the name via update_peer_uname(), which calls the peer status callback), and
* crm_peer_uuid() is called if the entry has no UUID yet.
*
* \note NOTE(review): if neither \p uname nor get_node_name() provides a name,
*       the "Created entry" log message passes a NULL uname to %s -- confirm
*       this is acceptable on all supported platforms.
*/
/* coverity[-alloc] Memory is referenced in one or both hashtables */
crm_node_t *
crm_get_peer(unsigned int id, const char *uname)
{
crm_node_t *node = NULL;
char *uname_lookup = NULL;
CRM_ASSERT(id > 0 || uname != NULL);
crm_peer_init();
- node = pcmk__search_cluster_node_cache(id, uname);
+ node = pcmk__search_cluster_node_cache(id, uname, NULL);
/* If no uname was provided, and the cache search didn't turn up an entry with
* a name, look up the node name from the cluster membership using the ID. */
if ((node == NULL || node->uname == NULL) && (uname == NULL)) {
uname_lookup = get_node_name(id);
}
if (uname_lookup) {
uname = uname_lookup;
crm_trace("Inferred a name of '%s' for node %u", uname, id);
/* try to turn up the node one more time now that we know the uname. */
if (node == NULL) {
- node = pcmk__search_cluster_node_cache(id, uname);
+ node = pcmk__search_cluster_node_cache(id, uname, NULL);
}
}
/* Still not found: create a new entry, keyed by a generated unique ID
* (the cache owns and later frees the key) */
if (node == NULL) {
char *uniqueid = crm_generate_uuid();
node = calloc(1, sizeof(crm_node_t));
CRM_ASSERT(node);
crm_info("Created entry %s/%p for node %s/%u (%d total)",
uniqueid, node, uname, id, 1 + g_hash_table_size(crm_peer_cache));
g_hash_table_replace(crm_peer_cache, uniqueid, node);
}
if(id > 0 && uname && (node->id == 0 || node->uname == NULL)) {
crm_info("Node %u is now known as %s", id, uname);
}
if(id > 0 && node->id == 0) {
node->id = id;
}
if (uname && (node->uname == NULL)) {
update_peer_uname(node, uname);
}
/* NOTE(review): the return value is only logged here; presumably
* crm_peer_uuid() also stores the UUID in node->uuid -- confirm. */
if(node->uuid == NULL) {
const char *uuid = crm_peer_uuid(node);
if (uuid) {
crm_info("Node %u has uuid %s", id, uuid);
} else {
crm_info("Cannot obtain a UUID for node %u/%s", id, node->uname);
}
}
free(uname_lookup);
return node;
}
/*!
 * \internal
 * \brief Update a node's uname
 *
 * Does nothing if the name is unchanged (case-insensitively). Otherwise it
 * warns about names containing ASCII capitals, stores the new name, notifies
 * the peer status callback, and (for Corosync cluster nodes) removes offline
 * cache entries with a conflicting name.
 *
 * \param[in,out] node   Node object to update
 * \param[in]     uname  New name to set
 *
 * \note This function should not be called within a peer cache iteration,
 *       because in some cases it can remove conflicting cache entries,
 *       which would invalidate the iterator.
 */
static void
update_peer_uname(crm_node_t *node, const char *uname)
{
    const char *ch = NULL;

    CRM_CHECK(uname != NULL,
              crm_err("Bug: can't update node name without name"); return);
    CRM_CHECK(node != NULL,
              crm_err("Bug: can't update node name to %s without node", uname);
              return);

    if (pcmk__str_eq(uname, node->uname, pcmk__str_casei)) {
        crm_debug("Node uname '%s' did not change", uname);
        return;
    }

    // Advance to the first ASCII capital letter, or to the terminator
    for (ch = uname; (*ch != '\0') && ((*ch < 'A') || (*ch > 'Z')); ++ch) {
        /* keep scanning */
    }
    if (*ch != '\0') {
        crm_warn("Node names with capitals are discouraged, consider changing '%s'",
                 uname);
    }

    pcmk__str_update(&node->uname, uname);

    if (peer_status_callback != NULL) {
        peer_status_callback(crm_status_uname, node, NULL);
    }

#if SUPPORT_COROSYNC
    if (is_corosync_cluster() && !pcmk_is_set(node->flags, crm_remote_node)) {
        remove_conflicting_peer(node);
    }
#endif
}
/*!
 * \internal
 * \brief Get a log-friendly name for a single process flag
 *
 * \param[in] proc  Process flag
 *
 * \return Daemon (or other) name for \p proc, or "unknown" for any value not
 *         listed (such as a combination of several flags)
 */
static inline const char *
proc2text(enum crm_proc_flag proc)
{
    switch (proc) {
        case crm_proc_none:
            return "none";
        case crm_proc_based:
            return "pacemaker-based";
        case crm_proc_controld:
            return "pacemaker-controld";
        case crm_proc_schedulerd:
            return "pacemaker-schedulerd";
        case crm_proc_execd:
            return "pacemaker-execd";
        case crm_proc_attrd:
            return "pacemaker-attrd";
        case crm_proc_fenced:
            return "pacemaker-fenced";
        case crm_proc_cpg:
            return "corosync-cpg";
    }
    return "unknown";
}
/*!
* \internal
* \brief Update a node's process information (and potentially state)
*
* How \p flag is applied depends on \p status:
* - NULL: the node's process bitmask is replaced by \p flag
* - ONLINESTATUS (case-insensitive): the bits in \p flag are set
* - anything else: the bits in \p flag are cleared
*
* Remote nodes are returned unchanged. If the bitmask changed, the node's
* when_online time is set to the current time if all bits returned by
* crm_get_cluster_proc() are now set (or cleared otherwise), the peer status
* callback is called with the previous bitmask, and -- if automatic reaping is
* enabled -- the node state is set to CRM_NODE_MEMBER or CRM_NODE_LOST the same
* way via pcmk__update_peer_state(), which may reap the node.
*
* \param[in] source Caller's function name (for log messages)
* \param[in,out] node Node object to update
* \param[in] flag Bitmask of new process information
* \param[in] status node status (online, offline, etc.)
*
* \return NULL if any node was reaped from peer caches (or the peer cache was
*         destroyed by the status callback), value of node otherwise
*
* \note If this function returns NULL, the supplied node object was likely
* freed and should not be used again. This function should not be
* called within a cache iteration if reaping is possible, otherwise
* reaping could invalidate the iterator.
* \note NOTE(review): when \p status is NULL and \p flag is nonzero, the
*       "%s is now %s" log message passes the NULL status to %s -- confirm.
*/
crm_node_t *
crm_update_peer_proc(const char *source, crm_node_t * node, uint32_t flag, const char *status)
{
uint32_t last = 0;
gboolean changed = FALSE;
CRM_CHECK(node != NULL, crm_err("%s: Could not set %s to %s for NULL",
source, proc2text(flag), status);
return NULL);
/* Pacemaker doesn't spawn processes on remote nodes */
if (pcmk_is_set(node->flags, crm_remote_node)) {
return node;
}
/* Remember the old bitmask for change detection and the callback */
last = node->processes;
if (status == NULL) {
node->processes = flag;
if (node->processes != last) {
changed = TRUE;
}
} else if (pcmk__str_eq(status, ONLINESTATUS, pcmk__str_casei)) {
if ((node->processes & flag) != flag) {
node->processes = pcmk__set_flags_as(__func__, __LINE__,
LOG_TRACE, "Peer process",
node->uname, node->processes,
flag, "processes");
changed = TRUE;
}
} else if (node->processes & flag) {
node->processes = pcmk__clear_flags_as(__func__, __LINE__,
LOG_TRACE, "Peer process",
node->uname, node->processes,
flag, "processes");
changed = TRUE;
}
if (changed) {
if (status == NULL && flag <= crm_proc_none) {
crm_info("%s: Node %s[%u] - all processes are now offline", source, node->uname,
node->id);
} else {
crm_info("%s: Node %s[%u] - %s is now %s", source, node->uname, node->id,
proc2text(flag), status);
}
if (pcmk_is_set(node->processes, crm_get_cluster_proc())) {
node->when_online = time(NULL);
} else {
node->when_online = 0;
}
/* Call the client callback first, then update the peer state,
* in case the node will be reaped
*/
if (peer_status_callback != NULL) {
peer_status_callback(crm_status_processes, node, &last);
}
/* The client callback shouldn't touch the peer caches,
* but as a safety net, bail if the peer cache was destroyed.
*/
if (crm_peer_cache == NULL) {
return NULL;
}
if (crm_autoreap) {
const char *peer_state = NULL;
if (pcmk_is_set(node->processes, crm_get_cluster_proc())) {
peer_state = CRM_NODE_MEMBER;
} else {
peer_state = CRM_NODE_LOST;
}
node = pcmk__update_peer_state(__func__, node, peer_state, 0);
}
} else {
crm_trace("%s: Node %s[%u] - %s is unchanged (%s)", source, node->uname, node->id,
proc2text(flag), status);
}
return node;
}
/*!
 * \internal
 * \brief Update a cluster node cache entry's expected join state
 *
 * Remote nodes are ignored, because they don't participate in joins. A NULL
 * \p expected leaves the current value unchanged.
 *
 * \param[in]     source    Caller's function name (for logging)
 * \param[in,out] node      Node to update
 * \param[in]     expected  Node's new join state
 */
void
pcmk__update_peer_expected(const char *source, crm_node_t *node,
                           const char *expected)
{
    char *last = NULL;

    /* Passing NULL to a %s conversion is undefined behavior, so possibly-NULL
     * strings are logged with an "unknown" placeholder (as elsewhere in this
     * file)
     */
    CRM_CHECK(node != NULL,
              crm_err("%s: Could not set 'expected' to %s", source,
                      (expected? expected : "unknown"));
              return);

    // Remote nodes don't participate in joins
    if (pcmk_is_set(node->flags, crm_remote_node)) {
        return;
    }

    if ((expected == NULL)
        || pcmk__str_eq(node->expected, expected, pcmk__str_casei)) {
        crm_trace("%s: Node %s[%u] - expected state is unchanged (%s)",
                  source, node->uname, node->id,
                  (expected? expected : "unknown"));
        return;
    }

    last = node->expected;
    node->expected = strdup(expected);
    crm_info("%s: Node %s[%u] - expected state is now %s (was %s)",
             source, node->uname, node->id, expected,
             (last? last : "unknown"));
    free(last);
}
/*!
* \internal
* \brief Update a node's state and membership information
*
* If \p state is CRM_NODE_MEMBER (case-insensitive), the node's when_lost time
* is cleared and a nonzero \p membership is recorded as last_seen, even if the
* state itself is unchanged. If \p state is non-NULL and differs from the
* current state, the new state is stored (with when_member set to the current
* time for members, or cleared otherwise) and the peer status callback is
* called with the previous state. Then, if automatic reaping is enabled, the
* new state is not a member, and the node is not a remote node, the node is
* reaped from the cluster peer cache: via \p iter if given (unconditionally),
* otherwise via reap_crm_member() (which skips active peers). NULL is
* returned in both reaping cases.
*
* \param[in] source Caller's function name (for log messages)
* \param[in,out] node Node object to update
* \param[in] state Node's new state
* \param[in] membership Node's new membership ID
* \param[in,out] iter If not NULL, pointer to node's peer cache iterator
*
* \return NULL if any node was reaped, value of node otherwise
*
* \note If this function returns NULL, the supplied node object was likely
* freed and should not be used again. This function may be called from
* within a peer cache iteration if the iterator is supplied.
*/
static crm_node_t *
update_peer_state_iter(const char *source, crm_node_t *node, const char *state,
uint64_t membership, GHashTableIter *iter)
{
gboolean is_member;
CRM_CHECK(node != NULL,
crm_err("Could not set state for unknown host to %s"
CRM_XS " source=%s", state, source);
return NULL);
is_member = pcmk__str_eq(state, CRM_NODE_MEMBER, pcmk__str_casei);
if (is_member) {
node->when_lost = 0;
if (membership) {
node->last_seen = membership;
}
}
if (state && !pcmk__str_eq(node->state, state, pcmk__str_casei)) {
/* Keep the old state until the callback has seen it */
char *last = node->state;
if (is_member) {
node->when_member = time(NULL);
} else {
node->when_member = 0;
}
node->state = strdup(state);
crm_notice("Node %s state is now %s " CRM_XS
" nodeid=%u previous=%s source=%s", node->uname, state,
node->id, (last? last : "unknown"), source);
if (peer_status_callback != NULL) {
peer_status_callback(crm_status_nstate, node, last);
}
free(last);
if (crm_autoreap && !is_member
&& !pcmk_is_set(node->flags, crm_remote_node)) {
/* We only autoreap from the peer cache, not the remote peer cache,
* because the latter should be managed only by
* crm_remote_peer_cache_refresh().
*/
if(iter) {
/* Removing through the iterator keeps the caller's iteration valid */
crm_notice("Purged 1 peer with id=%u and/or uname=%s from the membership cache", node->id, node->uname);
g_hash_table_iter_remove(iter);
} else {
reap_crm_member(node->id, node->uname);
}
node = NULL;
}
} else {
crm_trace("Node %s state is unchanged (%s) " CRM_XS
" nodeid=%u source=%s", node->uname, state, node->id, source);
}
return node;
}
/*!
 * \brief Update a node's state and membership information
 *
 * \param[in]     source      Caller's function name (for log messages)
 * \param[in,out] node        Node object to update
 * \param[in]     state       Node's new state
 * \param[in]     membership  Node's new membership ID
 *
 * \return NULL if any node was reaped, value of node otherwise
 *
 * \note If this function returns NULL, the supplied node object was likely
 *       freed and should not be used again. This function should not be
 *       called within a cache iteration if reaping is possible,
 *       otherwise reaping could invalidate the iterator.
 */
crm_node_t *
pcmk__update_peer_state(const char *source, crm_node_t *node,
                        const char *state, uint64_t membership)
{
    // No iteration is in progress, so any reaping uses reap_crm_member()
    GHashTableIter *no_iter = NULL;

    return update_peer_state_iter(source, node, state, membership, no_iter);
}
/*!
 * \internal
 * \brief Mark lost (and possibly reap) nodes not seen in a given membership
 *
 * Nodes whose last_seen differs from \p membership are set to CRM_NODE_LOST,
 * unless their state is still unknown, in which case that is only logged.
 *
 * \param[in] membership  Membership ID of nodes to keep
 */
void
pcmk__reap_unseen_nodes(uint64_t membership)
{
    GHashTableIter iter;
    crm_node_t *node = NULL;

    crm_trace("Reaping unseen nodes...");
    g_hash_table_iter_init(&iter, crm_peer_cache);
    while (g_hash_table_iter_next(&iter, NULL, (gpointer *)&node)) {
        if (node->last_seen == membership) {
            continue; // Seen in the current membership
        }

        if (node->state == NULL) {
            crm_info("State of node %s[%u] is still unknown",
                     node->uname, node->id);
            continue;
        }

        /* Passing the iterator lets update_peer_state_iter() remove the node
         * from crm_peer_cache without invalidating this loop
         */
        update_peer_state_iter(__func__, node, CRM_NODE_LOST, membership,
                               &iter);
    }
}
/*!
* \internal
* \brief Search the known node cache by UUID and/or name
*
* \param[in] id    If not NULL, node UUID to search for (case-insensitive)
* \param[in] uname If not NULL, node name to search for (case-insensitive)
*
* \return Matching known node cache entry, or NULL if none matches or the
*         matches cannot be reconciled
*
* Unlike the cluster node cache search, a match on only one criterion is
* rejected when both were given:
* - Same entry (or none) found by both: that result is returned.
* - Only the name matched: returned only if \p id is NULL.
* - Only the UUID matched: returned only if \p uname is NULL.
* - Different entries matched: the UUID match is returned if its name equals
*   \p uname; otherwise the name match is returned if its UUID equals \p id;
*   otherwise NULL.
*
* \note The UUID comparison assumes every entry has a non-NULL uuid, which
*       known_node_cache_refresh_helper() ensures when creating entries.
*/
static crm_node_t *
find_known_node(const char *id, const char *uname)
{
GHashTableIter iter;
crm_node_t *node = NULL;
crm_node_t *by_id = NULL;
crm_node_t *by_name = NULL;
if (uname) {
g_hash_table_iter_init(&iter, known_node_cache);
while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
if (node->uname && strcasecmp(node->uname, uname) == 0) {
crm_trace("Name match: %s = %p", node->uname, node);
by_name = node;
break;
}
}
}
if (id) {
g_hash_table_iter_init(&iter, known_node_cache);
while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
if(strcasecmp(node->uuid, id) == 0) {
crm_trace("ID match: %s= %p", id, node);
by_id = node;
break;
}
}
}
node = by_id; /* Good default */
if (by_id == by_name) {
/* Nothing to do if they match (both NULL counts) */
crm_trace("Consistent: %p for %s/%s", by_id, id, uname);
} else if (by_id == NULL && by_name) {
crm_trace("Only one: %p for %s/%s", by_name, id, uname);
/* Reject a name-only match when an ID was also requested */
if (id) {
node = NULL;
} else {
node = by_name;
}
} else if (by_name == NULL && by_id) {
crm_trace("Only one: %p for %s/%s", by_id, id, uname);
/* Reject an ID-only match when a name was also requested */
if (uname) {
node = NULL;
}
} else if (uname && by_id->uname
&& pcmk__str_eq(uname, by_id->uname, pcmk__str_casei)) {
/* Multiple nodes have the same uname in the CIB.
* Return by_id. */
} else if (id && by_name->uuid
&& pcmk__str_eq(id, by_name->uuid, pcmk__str_casei)) {
/* Multiple nodes have the same id in the CIB.
* Return by_name. */
node = by_name;
} else {
node = NULL;
}
if (node == NULL) {
crm_debug("Couldn't find node%s%s%s%s",
id? " " : "",
id? id : "",
uname? " with name " : "",
uname? uname : "");
}
return node;
}
/*!
 * \internal
 * \brief Add or refresh one known node cache entry from a CIB node element
 *
 * \param[in] xml_node   CIB node XML (its ID and uname attributes are used)
 * \param[in] user_data  Ignored
 */
static void
known_node_cache_refresh_helper(xmlNode *xml_node, void *user_data)
{
    const char *id = crm_element_value(xml_node, XML_ATTR_ID);
    const char *uname = crm_element_value(xml_node, XML_ATTR_UNAME);
    crm_node_t *node = NULL;
    char *uniqueid = NULL;

    CRM_CHECK(id != NULL && uname !=NULL, return);

    node = find_known_node(id, uname);
    if (node != NULL) {
        if (pcmk_is_set(node->flags, crm_node_dirty)) {
            /* Cached but not yet refreshed in this pass: take the CIB's name
             * and mark the entry clean so it survives the refresh
             */
            pcmk__str_update(&node->uname, uname);
            clear_peer_flags(node, crm_node_dirty);
        }
        return;
    }

    // Not cached yet: create an entry keyed by a freshly generated ID
    uniqueid = crm_generate_uuid();
    node = calloc(1, sizeof(crm_node_t));
    CRM_ASSERT(node != NULL);

    node->uname = strdup(uname);
    CRM_ASSERT(node->uname != NULL);

    node->uuid = strdup(id);
    CRM_ASSERT(node->uuid != NULL);

    g_hash_table_replace(known_node_cache, uniqueid, node);
}
/*!
 * \internal
 * \brief Repopulate the known node cache from the CIB's node configuration
 *
 * \param[in] cib  CIB XML to parse
 */
static void
refresh_known_node_cache(xmlNode *cib)
{
    crm_peer_init();

    // Flag every entry; any not re-seen in the CIB is dropped at the end
    g_hash_table_foreach(known_node_cache, mark_dirty, NULL);

    crm_foreach_xpath_result(cib, PCMK__XP_MEMBER_NODE_CONFIG,
                             known_node_cache_refresh_helper, NULL);

    g_hash_table_foreach_remove(known_node_cache, is_dirty, NULL);
}
/*!
 * \internal
 * \brief Refresh the remote peer cache and then the known node cache from CIB
 *
 * \param[in] cib  CIB XML to parse
 */
void
pcmk__refresh_node_caches_from_cib(xmlNode *cib)
{
    crm_remote_peer_cache_refresh(cib); // Guest and remote nodes
    refresh_known_node_cache(cib);      // All configured nodes
}
/*!
 * \internal
 * \brief Search the node caches, falling back to the known node cache
 *
 * The regular node caches are searched first. If nothing is found there and
 * \p flags includes CRM_GET_PEER_CLUSTER, the known node cache (nodes seen in
 * the CIB) is searched, using the decimal form of \p id as the UUID.
 *
 * \param[in] id     If not 0, cluster node ID to search for
 * \param[in] uname  If not NULL, node name to search for
 * \param[in] flags  Bitmask of enum crm_get_peer_flags
 *
 * \return Matching cache entry if found, otherwise NULL
 */
crm_node_t *
pcmk__search_known_node_cache(unsigned int id, const char *uname,
                              uint32_t flags)
{
    crm_node_t *node = NULL;
    char *id_str = NULL;

    CRM_ASSERT(id > 0 || uname != NULL);

    node = pcmk__search_node_caches(id, uname, flags);
    if ((node != NULL) || ((flags & CRM_GET_PEER_CLUSTER) == 0)) {
        return node;
    }

    id_str = (id > 0)? crm_strdup_printf("%u", id) : NULL;
    node = find_known_node(id_str, uname);
    free(id_str);
    return node;
}
// Deprecated functions kept only for backward API compatibility
// LCOV_EXCL_START
#include <crm/cluster/compat.h>
/*!
 * \brief Fence a node (deprecated)
 *
 * Delegates to stonith_api_kick() with 120 as the timeout argument and TRUE
 * as the final argument (presumably a 120-second power-off request; confirm
 * against stonith_api_kick()).
 *
 * \param[in] nodeid  Cluster node ID of node to fence
 * \param[in] uname   Name of node to fence
 * \param[in] unused  Ignored
 *
 * \return Result of stonith_api_kick()
 */
int
crm_terminate_member(int nodeid, const char *uname, void *unused)
{
    const int timeout = 120;

    return stonith_api_kick(nodeid, uname, timeout, TRUE);
}
/*!
 * \brief Fence a node without a main loop (deprecated)
 *
 * Identical to crm_terminate_member(): delegates to stonith_api_kick() with
 * 120 as the timeout argument and TRUE as the final argument.
 *
 * \param[in] nodeid      Cluster node ID of node to fence
 * \param[in] uname       Name of node to fence
 * \param[in] connection  Ignored
 *
 * \return Result of stonith_api_kick()
 */
int
crm_terminate_member_no_mainloop(int nodeid, const char *uname, int *connection)
{
    const int timeout = 120;

    return stonith_api_kick(nodeid, uname, timeout, TRUE);
}
// LCOV_EXCL_STOP
// End deprecated API
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Nov 23, 3:23 PM (15 h, 53 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1018846
Default Alt Text
(197 KB)
Attached To
Mode
rP Pacemaker
Attached
Detach File
Event Timeline
Log In to Comment