diff --git a/daemons/attrd/attrd_ipc.c b/daemons/attrd/attrd_ipc.c
index b8bf5989bf..94d2f30e4b 100644
--- a/daemons/attrd/attrd_ipc.c
+++ b/daemons/attrd/attrd_ipc.c
@@ -1,631 +1,632 @@
 /*
  * Copyright 2004-2024 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU General Public License version 2
  * or later (GPLv2+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 
 #include <errno.h>
 #include <stdint.h>
 #include <stdlib.h>
 #include <sys/types.h>
 
 #include <crm/cluster.h>
 #include <crm/cluster/internal.h>
 #include <crm/msg_xml.h>
 #include <crm/common/acl_internal.h>
 #include <crm/common/ipc_internal.h>
 #include <crm/common/logging.h>
 #include <crm/common/results.h>
 #include <crm/common/strings_internal.h>
 #include <crm/common/util.h>
 
 #include "pacemaker-attrd.h"
 
 static qb_ipcs_service_t *ipcs = NULL;
 
 /*!
  * \internal
  * \brief Build the XML reply to a client query
  *
  * param[in] attr Name of requested attribute
  * param[in] host Name of requested host (or NULL for all hosts)
  *
  * \return New XML reply
  * \note Caller is responsible for freeing the resulting XML
  */
 static xmlNode *build_query_reply(const char *attr, const char *host)
 {
     xmlNode *reply = create_xml_node(NULL, __func__);
     attribute_t *a;
 
     if (reply == NULL) {
         return NULL;
     }
     crm_xml_add(reply, PCMK__XA_T, T_ATTRD);
     crm_xml_add(reply, PCMK__XA_SUBT, PCMK__ATTRD_CMD_QUERY);
     crm_xml_add(reply, PCMK__XA_ATTR_VERSION, ATTRD_PROTOCOL_VERSION);
 
     /* If desired attribute exists, add its value(s) to the reply */
     a = g_hash_table_lookup(attributes, attr);
     if (a) {
         attribute_value_t *v;
         xmlNode *host_value;
 
         crm_xml_add(reply, PCMK__XA_ATTR_NAME, attr);
 
         /* Allow caller to use "localhost" to refer to local node */
         if (pcmk__str_eq(host, "localhost", pcmk__str_casei)) {
             host = attrd_cluster->uname;
             crm_trace("Mapped localhost to %s", host);
         }
 
         /* If a specific node was requested, add its value */
         if (host) {
             v = g_hash_table_lookup(a->values, host);
             host_value = create_xml_node(reply, XML_CIB_TAG_NODE);
             if (host_value == NULL) {
                 free_xml(reply);
                 return NULL;
             }
             pcmk__xe_add_node(host_value, host, 0);
             crm_xml_add(host_value, PCMK__XA_ATTR_VALUE,
                         (v? v->current : NULL));
 
         /* Otherwise, add all nodes' values */
         } else {
             GHashTableIter iter;
 
             g_hash_table_iter_init(&iter, a->values);
             while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &v)) {
                 host_value = create_xml_node(reply, XML_CIB_TAG_NODE);
                 if (host_value == NULL) {
                     free_xml(reply);
                     return NULL;
                 }
                 pcmk__xe_add_node(host_value, v->nodename, 0);
                 crm_xml_add(host_value, PCMK__XA_ATTR_VALUE, v->current);
             }
         }
     }
     return reply;
 }
 
 xmlNode *
 attrd_client_clear_failure(pcmk__request_t *request)
 {
     xmlNode *xml = request->xml;
     const char *rsc, *op, *interval_spec;
 
     if (minimum_protocol_version >= 2) {
         /* Propagate to all peers (including ourselves).
          * This ends up at attrd_peer_message().
          */
         attrd_send_message(NULL, xml, false);
         pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL);
         return NULL;
     }
 
     rsc = crm_element_value(xml, PCMK__XA_ATTR_RESOURCE);
     op = crm_element_value(xml, PCMK__XA_ATTR_OPERATION);
     interval_spec = crm_element_value(xml, PCMK__XA_ATTR_INTERVAL);
 
     /* Map this to an update */
     crm_xml_add(xml, PCMK__XA_TASK, PCMK__ATTRD_CMD_UPDATE);
 
     /* Add regular expression matching desired attributes */
 
     if (rsc) {
         char *pattern;
 
         if (op == NULL) {
             pattern = crm_strdup_printf(ATTRD_RE_CLEAR_ONE, rsc);
 
         } else {
             guint interval_ms = 0U;
 
             pcmk_parse_interval_spec(interval_spec, &interval_ms);
             pattern = crm_strdup_printf(ATTRD_RE_CLEAR_OP,
                                         rsc, op, interval_ms);
         }
 
         crm_xml_add(xml, PCMK__XA_ATTR_PATTERN, pattern);
         free(pattern);
 
     } else {
         crm_xml_add(xml, PCMK__XA_ATTR_PATTERN, ATTRD_RE_CLEAR_ALL);
     }
 
     /* Make sure attribute and value are not set, so we delete via regex */
     xml_remove_prop(xml, PCMK__XA_ATTR_NAME);
     xml_remove_prop(xml, PCMK__XA_ATTR_VALUE);
 
     return attrd_client_update(request);
 }
 
 xmlNode *
 attrd_client_peer_remove(pcmk__request_t *request)
 {
     xmlNode *xml = request->xml;
 
     // Host and ID are not used in combination, rather host has precedence
     const char *host = crm_element_value(xml, PCMK__XA_ATTR_NODE_NAME);
     char *host_alloc = NULL;
 
     attrd_send_ack(request->ipc_client, request->ipc_id, request->ipc_flags);
 
     if (host == NULL) {
         int nodeid = 0;
 
         crm_element_value_int(xml, PCMK__XA_ATTR_NODE_ID, &nodeid);
         if (nodeid > 0) {
-            crm_node_t *node = pcmk__search_cluster_node_cache(nodeid, NULL,
-                                                               NULL);
+            crm_node_t *node = NULL;
             char *host_alloc = NULL;
 
+            node = pcmk__search_node_caches(nodeid, NULL,
+                                            pcmk__node_search_cluster);
             if (node && node->uname) {
                 // Use cached name if available
                 host = node->uname;
             } else {
                 // Otherwise ask cluster layer
                 host_alloc = get_node_name(nodeid);
                 host = host_alloc;
             }
             pcmk__xe_add_node(xml, host, 0);
         }
     }
 
     if (host) {
         crm_info("Client %s is requesting all values for %s be removed",
                  pcmk__client_name(request->ipc_client), host);
         attrd_send_message(NULL, xml, false); /* ends up at attrd_peer_message() */
         free(host_alloc);
     } else {
         crm_info("Ignoring request by client %s to remove all peer values without specifying peer",
                  pcmk__client_name(request->ipc_client));
     }
 
     pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL);
     return NULL;
 }
 
 xmlNode *
 attrd_client_query(pcmk__request_t *request)
 {
     xmlNode *query = request->xml;
     xmlNode *reply = NULL;
     const char *attr = NULL;
 
     crm_debug("Query arrived from %s", pcmk__client_name(request->ipc_client));
 
     /* Request must specify attribute name to query */
     attr = crm_element_value(query, PCMK__XA_ATTR_NAME);
     if (attr == NULL) {
         pcmk__format_result(&request->result, CRM_EX_ERROR, PCMK_EXEC_ERROR,
                             "Ignoring malformed query from %s (no attribute name given)",
                             pcmk__client_name(request->ipc_client));
         return NULL;
     }
 
     /* Build the XML reply */
     reply = build_query_reply(attr, crm_element_value(query,
                                                       PCMK__XA_ATTR_NODE_NAME));
     if (reply == NULL) {
         pcmk__format_result(&request->result, CRM_EX_ERROR, PCMK_EXEC_ERROR,
                             "Could not respond to query from %s: could not create XML reply",
                             pcmk__client_name(request->ipc_client));
         return NULL;
     } else {
         pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL);
     }
 
     request->ipc_client->request_id = 0;
     return reply;
 }
 
 xmlNode *
 attrd_client_refresh(pcmk__request_t *request)
 {
     crm_info("Updating all attributes");
 
     attrd_send_ack(request->ipc_client, request->ipc_id, request->ipc_flags);
     attrd_write_attributes(attrd_write_all|attrd_write_no_delay);
 
     pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL);
     return NULL;
 }
 
 static void
 handle_missing_host(xmlNode *xml)
 {
     const char *host = crm_element_value(xml, PCMK__XA_ATTR_NODE_NAME);
 
     if (host == NULL) {
         crm_trace("Inferring host");
         pcmk__xe_add_node(xml, attrd_cluster->uname, attrd_cluster->nodeid);
     }
 }
 
 /* Convert a single IPC message with a regex into one with multiple children, one
  * for each regex match.
  */
 static int
 expand_regexes(xmlNode *xml, const char *attr, const char *value, const char *regex)
 {
     if (attr == NULL && regex) {
         bool matched = false;
         GHashTableIter aIter;
         regex_t r_patt;
 
         crm_debug("Setting %s to %s", regex, value);
         if (regcomp(&r_patt, regex, REG_EXTENDED|REG_NOSUB)) {
             return EINVAL;
         }
 
         g_hash_table_iter_init(&aIter, attributes);
         while (g_hash_table_iter_next(&aIter, (gpointer *) & attr, NULL)) {
             int status = regexec(&r_patt, attr, 0, NULL, 0);
 
             if (status == 0) {
                 xmlNode *child = create_xml_node(xml, PCMK_XE_OP);
 
                 crm_trace("Matched %s with %s", attr, regex);
                 matched = true;
 
                 /* Copy all the attributes from the parent over, but remove the
                  * regex and replace it with the name.
                  */
                 attrd_copy_xml_attributes(xml, child);
                 xml_remove_prop(child, PCMK__XA_ATTR_PATTERN);
                 crm_xml_add(child, PCMK__XA_ATTR_NAME, attr);
             }
         }
 
         regfree(&r_patt);
 
         /* Return a code if we never matched anything.  This should not be treated
          * as an error.  It indicates there was a regex, and it was a valid regex,
          * but simply did not match anything and the caller should not continue
          * doing any regex-related processing.
          */
         if (!matched) {
             return pcmk_rc_op_unsatisfied;
         }
 
     } else if (attr == NULL) {
         return pcmk_rc_bad_nvpair;
     }
 
     return pcmk_rc_ok;
 }
 
 static int
 handle_regexes(pcmk__request_t *request)
 {
     xmlNode *xml = request->xml;
     int rc = pcmk_rc_ok;
 
     const char *attr = crm_element_value(xml, PCMK__XA_ATTR_NAME);
     const char *value = crm_element_value(xml, PCMK__XA_ATTR_VALUE);
     const char *regex = crm_element_value(xml, PCMK__XA_ATTR_PATTERN);
 
     rc = expand_regexes(xml, attr, value, regex);
 
     if (rc == EINVAL) {
         pcmk__format_result(&request->result, CRM_EX_ERROR, PCMK_EXEC_ERROR,
                             "Bad regex '%s' for update from client %s", regex,
                             pcmk__client_name(request->ipc_client));
 
     } else if (rc == pcmk_rc_bad_nvpair) {
         crm_err("Update request did not specify attribute or regular expression");
         pcmk__format_result(&request->result, CRM_EX_ERROR, PCMK_EXEC_ERROR,
                             "Client %s update request did not specify attribute or regular expression",
                             pcmk__client_name(request->ipc_client));
     }
 
     return rc;
 }
 
 static int
 handle_value_expansion(const char **value, xmlNode *xml, const char *op,
                        const char *attr)
 {
     attribute_t *a = g_hash_table_lookup(attributes, attr);
 
     if (a == NULL && pcmk__str_eq(op, PCMK__ATTRD_CMD_UPDATE_DELAY, pcmk__str_none)) {
         return EINVAL;
     }
 
     if (*value && attrd_value_needs_expansion(*value)) {
         int int_value;
         attribute_value_t *v = NULL;
 
         if (a) {
             const char *host = crm_element_value(xml, PCMK__XA_ATTR_NODE_NAME);
             v = g_hash_table_lookup(a->values, host);
         }
 
         int_value = attrd_expand_value(*value, (v? v->current : NULL));
 
         crm_info("Expanded %s=%s to %d", attr, *value, int_value);
         crm_xml_add_int(xml, PCMK__XA_ATTR_VALUE, int_value);
 
         /* Replacing the value frees the previous memory, so re-query it */
         *value = crm_element_value(xml, PCMK__XA_ATTR_VALUE);
     }
 
     return pcmk_rc_ok;
 }
 
 static void
 send_update_msg_to_cluster(pcmk__request_t *request, xmlNode *xml)
 {
     if (pcmk__str_eq(attrd_request_sync_point(xml), PCMK__VALUE_CLUSTER, pcmk__str_none)) {
         /* The client is waiting on the cluster-wide sync point.  In this case,
          * the response ACK is not sent until this attrd broadcasts the update
          * and receives its own confirmation back from all peers.
          */
         attrd_expect_confirmations(request, attrd_cluster_sync_point_update);
         attrd_send_message(NULL, xml, true); /* ends up at attrd_peer_message() */
 
     } else {
         /* The client is either waiting on the local sync point or was not
          * waiting on any sync point at all.  For the local sync point, the
          * response ACK is sent in attrd_peer_update.  For clients not
          * waiting on any sync point, the response ACK is sent in
          * handle_update_request immediately before this function was called.
          */
         attrd_send_message(NULL, xml, false); /* ends up at attrd_peer_message() */
     }
 }
 
 static int
 send_child_update(xmlNode *child, void *data)
 {
     pcmk__request_t *request = (pcmk__request_t *) data;
 
     /* Calling pcmk__set_result is handled by one of these calls to
      * attrd_client_update, so no need to do it again here.
      */
     request->xml = child;
     attrd_client_update(request);
     return pcmk_rc_ok;
 }
 
 xmlNode *
 attrd_client_update(pcmk__request_t *request)
 {
     xmlNode *xml = NULL;
     const char *attr, *value, *regex;
 
     CRM_CHECK((request != NULL) && (request->xml != NULL), return NULL);
 
     xml = request->xml;
 
     /* If the message has children, that means it is a message from a newer
      * client that supports sending multiple operations at a time.  There are
      * two ways we can handle that.
      */
     if (xml->children != NULL) {
         if (ATTRD_SUPPORTS_MULTI_MESSAGE(minimum_protocol_version)) {
             /* First, if all peers support a certain protocol version, we can
              * just broadcast the big message and they'll handle it.  However,
              * we also need to apply all the transformations in this function
              * to the children since they don't happen anywhere else.
              */
             for (xmlNode *child = first_named_child(xml, PCMK_XE_OP);
                  child != NULL; child = crm_next_same_xml(child)) {
 
                 attr = crm_element_value(child, PCMK__XA_ATTR_NAME);
                 value = crm_element_value(child, PCMK__XA_ATTR_VALUE);
 
                 handle_missing_host(child);
 
                 if (handle_value_expansion(&value, child, request->op, attr) == EINVAL) {
                     pcmk__format_result(&request->result, CRM_EX_NOSUCH, PCMK_EXEC_ERROR,
                                         "Attribute %s does not exist", attr);
                     return NULL;
                 }
             }
 
             send_update_msg_to_cluster(request, xml);
             pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL);
 
         } else {
             /* Save the original xml node pointer so it can be restored after iterating
              * over all the children.
              */
             xmlNode *orig_xml = request->xml;
 
             /* Second, if they do not support that protocol version, split it
              * up into individual messages and call attrd_client_update on
              * each one.
              */
             pcmk__xe_foreach_child(xml, PCMK_XE_OP, send_child_update, request);
             request->xml = orig_xml;
         }
 
         return NULL;
     }
 
     attr = crm_element_value(xml, PCMK__XA_ATTR_NAME);
     value = crm_element_value(xml, PCMK__XA_ATTR_VALUE);
     regex = crm_element_value(xml, PCMK__XA_ATTR_PATTERN);
 
     if (handle_regexes(request) != pcmk_rc_ok) {
         /* Error handling was already dealt with in handle_regexes, so just return. */
         return NULL;
     } else if (regex) {
         /* Recursively call attrd_client_update on the new message with regexes
          * expanded.  If supported by the attribute daemon, this means that all
          * matches can also be handled atomically.
          */
         return attrd_client_update(request);
     }
 
     handle_missing_host(xml);
 
     if (handle_value_expansion(&value, xml, request->op, attr) == EINVAL) {
         pcmk__format_result(&request->result, CRM_EX_NOSUCH, PCMK_EXEC_ERROR,
                             "Attribute %s does not exist", attr);
         return NULL;
     }
 
     crm_debug("Broadcasting %s[%s]=%s%s", attr, crm_element_value(xml, PCMK__XA_ATTR_NODE_NAME),
               value, (attrd_election_won()? " (writer)" : ""));
 
     send_update_msg_to_cluster(request, xml);
     pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL);
     return NULL;
 }
 
 /*!
  * \internal
  * \brief Accept a new client IPC connection
  *
  * \param[in,out] c    New connection
  * \param[in]     uid  Client user id
  * \param[in]     gid  Client group id
  *
  * \return pcmk_ok on success, -errno otherwise
  */
 static int32_t
 attrd_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid)
 {
     crm_trace("New client connection %p", c);
     if (attrd_shutting_down(false)) {
         crm_info("Ignoring new connection from pid %d during shutdown",
                  pcmk__client_pid(c));
         return -EPERM;
     }
 
     if (pcmk__new_client(c, uid, gid) == NULL) {
         return -EIO;
     }
     return pcmk_ok;
 }
 
 /*!
  * \internal
  * \brief Destroy a client IPC connection
  *
  * \param[in] c  Connection to destroy
  *
  * \return FALSE (i.e. do not re-run this callback)
  */
 static int32_t
 attrd_ipc_closed(qb_ipcs_connection_t *c)
 {
     pcmk__client_t *client = pcmk__find_client(c);
 
     if (client == NULL) {
         crm_trace("Ignoring request to clean up unknown connection %p", c);
     } else {
         crm_trace("Cleaning up closed client connection %p", c);
 
         /* Remove the client from the sync point waitlist if it's present. */
         attrd_remove_client_from_waitlist(client);
 
         /* And no longer wait for confirmations from any peers. */
         attrd_do_not_wait_for_client(client);
 
         pcmk__free_client(client);
     }
 
     return FALSE;
 }
 
 /*!
  * \internal
  * \brief Destroy a client IPC connection
  *
  * \param[in,out] c  Connection to destroy
  *
  * \note We handle a destroyed connection the same as a closed one,
  *       but we need a separate handler because the return type is different.
  */
 static void
 attrd_ipc_destroy(qb_ipcs_connection_t *c)
 {
     crm_trace("Destroying client connection %p", c);
     attrd_ipc_closed(c);
 }
 
 static int32_t
 attrd_ipc_dispatch(qb_ipcs_connection_t * c, void *data, size_t size)
 {
     uint32_t id = 0;
     uint32_t flags = 0;
     pcmk__client_t *client = pcmk__find_client(c);
     xmlNode *xml = NULL;
 
     // Sanity-check, and parse XML from IPC data
     CRM_CHECK((c != NULL) && (client != NULL), return 0);
     if (data == NULL) {
         crm_debug("No IPC data from PID %d", pcmk__client_pid(c));
         return 0;
     }
 
     xml = pcmk__client_data2xml(client, data, &id, &flags);
 
     if (xml == NULL) {
         crm_debug("Unrecognizable IPC data from PID %d", pcmk__client_pid(c));
         pcmk__ipc_send_ack(client, id, flags, "ack", NULL, CRM_EX_PROTOCOL);
         return 0;
 
     } else {
         pcmk__request_t request = {
             .ipc_client     = client,
             .ipc_id         = id,
             .ipc_flags      = flags,
             .peer           = NULL,
             .xml            = xml,
             .call_options   = 0,
             .result         = PCMK__UNKNOWN_RESULT,
         };
 
         CRM_ASSERT(client->user != NULL);
         pcmk__update_acl_user(xml, PCMK__XA_ATTR_USER, client->user);
 
         request.op = crm_element_value_copy(request.xml, PCMK__XA_TASK);
         CRM_CHECK(request.op != NULL, return 0);
 
         attrd_handle_request(&request);
         pcmk__reset_request(&request);
     }
 
     free_xml(xml);
     return 0;
 }
 
 static struct qb_ipcs_service_handlers ipc_callbacks = {
     .connection_accept = attrd_ipc_accept,
     .connection_created = NULL,
     .msg_process = attrd_ipc_dispatch,
     .connection_closed = attrd_ipc_closed,
     .connection_destroyed = attrd_ipc_destroy
 };
 
 void
 attrd_ipc_fini(void)
 {
     if (ipcs != NULL) {
         pcmk__drop_all_clients(ipcs);
         qb_ipcs_destroy(ipcs);
         ipcs = NULL;
     }
 }
 
 /*!
  * \internal
  * \brief Set up attrd IPC communication
  */
 void
 attrd_init_ipc(void)
 {
     pcmk__serve_attrd_ipc(&ipcs, &ipc_callbacks);
 }
diff --git a/daemons/based/based_messages.c b/daemons/based/based_messages.c
index a31a8f2ae6..e7d83ec593 100644
--- a/daemons/based/based_messages.c
+++ b/daemons/based/based_messages.c
@@ -1,529 +1,532 @@
 /*
  * Copyright 2004-2024 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU General Public License version 2
  * or later (GPLv2+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 
 #include <stdio.h>
 #include <unistd.h>
 #include <stdlib.h>
 #include <errno.h>
 #include <fcntl.h>
 #include <time.h>
 
 #include <sys/param.h>
 #include <sys/types.h>
 
 #include <glib.h>
 #include <libxml/tree.h>
 
 #include <crm/crm.h>
 #include <crm/cib/internal.h>
 #include <crm/msg_xml.h>
 
 #include <crm/common/xml.h>
 #include <crm/common/ipc_internal.h>
 #include <crm/common/xml_internal.h>
 #include <crm/cluster/internal.h>
 
 #include <pacemaker-based.h>
 
 /* Maximum number of diffs to ignore while waiting for a resync */
 #define MAX_DIFF_RETRY 5
 
 bool based_is_primary = false;
 
 xmlNode *the_cib = NULL;
 
 int
 cib_process_shutdown_req(const char *op, int options, const char *section, xmlNode * req,
                          xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                          xmlNode ** answer)
 {
     const char *host = crm_element_value(req, PCMK__XA_SRC);
 
     *answer = NULL;
 
     if (crm_element_value(req, F_CIB_ISREPLY) == NULL) {
         crm_info("Peer %s is requesting to shut down", host);
         return pcmk_ok;
     }
 
     if (cib_shutdown_flag == FALSE) {
         crm_err("Peer %s mistakenly thinks we wanted to shut down", host);
         return -EINVAL;
     }
 
     crm_info("Peer %s has acknowledged our shutdown request", host);
     terminate_cib(__func__, 0);
     return pcmk_ok;
 }
 
 // @COMPAT: Remove when PCMK__CIB_REQUEST_NOOP is removed
 int
 cib_process_noop(const char *op, int options, const char *section, xmlNode *req,
                  xmlNode *input, xmlNode *existing_cib, xmlNode **result_cib,
                  xmlNode **answer)
 {
     crm_trace("Processing \"%s\" event", op);
     *answer = NULL;
     return pcmk_ok;
 }
 
 int
 cib_process_readwrite(const char *op, int options, const char *section, xmlNode * req,
                       xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                       xmlNode ** answer)
 {
     int result = pcmk_ok;
 
     crm_trace("Processing \"%s\" event", op);
 
     if (pcmk__str_eq(op, PCMK__CIB_REQUEST_IS_PRIMARY, pcmk__str_none)) {
         if (based_is_primary) {
             result = pcmk_ok;
         } else {
             result = -EPERM;
         }
         return result;
     }
 
     if (pcmk__str_eq(op, PCMK__CIB_REQUEST_PRIMARY, pcmk__str_none)) {
         if (!based_is_primary) {
             crm_info("We are now in R/W mode");
             based_is_primary = true;
         } else {
             crm_debug("We are still in R/W mode");
         }
 
     } else if (based_is_primary) {
         crm_info("We are now in R/O mode");
         based_is_primary = false;
     }
 
     return result;
 }
 
 /* Set to 1 when a sync is requested, incremented when a diff is ignored,
  * reset to 0 when a sync is received
  */
 static int sync_in_progress = 0;
 
 void
 send_sync_request(const char *host)
 {
     xmlNode *sync_me = create_xml_node(NULL, "sync-me");
     crm_node_t *peer = NULL;
 
     crm_info("Requesting re-sync from %s", (host? host : "all peers"));
     sync_in_progress = 1;
 
     crm_xml_add(sync_me, PCMK__XA_T, T_CIB);
     crm_xml_add(sync_me, F_CIB_OPERATION, PCMK__CIB_REQUEST_SYNC_TO_ONE);
     crm_xml_add(sync_me, F_CIB_DELEGATED,
                 stand_alone? "localhost" : crm_cluster->uname);
 
     if (host != NULL) {
         peer = pcmk__get_node(0, host, NULL, pcmk__node_search_cluster);
     }
     send_cluster_message(peer, crm_msg_cib, sync_me, FALSE);
     free_xml(sync_me);
 }
 
 int
 cib_process_ping(const char *op, int options, const char *section, xmlNode * req, xmlNode * input,
                  xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer)
 {
     const char *host = crm_element_value(req, PCMK__XA_SRC);
     const char *seq = crm_element_value(req, F_CIB_PING_ID);
     char *digest = calculate_xml_versioned_digest(the_cib, FALSE, TRUE, CRM_FEATURE_SET);
 
     crm_trace("Processing \"%s\" event %s from %s", op, seq, host);
     *answer = create_xml_node(NULL, XML_CRM_TAG_PING);
 
     crm_xml_add(*answer, PCMK_XA_CRM_FEATURE_SET, CRM_FEATURE_SET);
     crm_xml_add(*answer, PCMK__XA_DIGEST, digest);
     crm_xml_add(*answer, F_CIB_PING_ID, seq);
 
     pcmk__if_tracing(
         {
             // Append additional detail so the receiver can log the differences
             add_message_xml(*answer, F_CIB_CALLDATA, the_cib);
         },
         if (the_cib != NULL) {
             // Always include at least the version details
             xmlNode *shallow = create_xml_node(NULL,
                                                (const char *) the_cib->name);
 
             copy_in_properties(shallow, the_cib);
             add_message_xml(*answer, F_CIB_CALLDATA, shallow);
             free_xml(shallow);
         }
     );
 
     crm_info("Reporting our current digest to %s: %s for %s.%s.%s",
              host, digest,
              crm_element_value(existing_cib, PCMK_XA_ADMIN_EPOCH),
              crm_element_value(existing_cib, PCMK_XA_EPOCH),
              crm_element_value(existing_cib, PCMK_XA_NUM_UPDATES));
 
     free(digest);
 
     return pcmk_ok;
 }
 
 int
 cib_process_sync(const char *op, int options, const char *section, xmlNode * req, xmlNode * input,
                  xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer)
 {
     return sync_our_cib(req, TRUE);
 }
 
 int
 cib_process_upgrade_server(const char *op, int options, const char *section, xmlNode * req, xmlNode * input,
                            xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer)
 {
     int rc = pcmk_ok;
 
     *answer = NULL;
 
     if(crm_element_value(req, F_CIB_SCHEMA_MAX)) {
         /* The originator of an upgrade request sends it to the DC, without
          * F_CIB_SCHEMA_MAX. If an upgrade is needed, the DC re-broadcasts the
          * request with F_CIB_SCHEMA_MAX, and each node performs the upgrade
          * (and notifies its local clients) here.
          */
         return cib_process_upgrade(
             op, options, section, req, input, existing_cib, result_cib, answer);
 
     } else {
         int new_version = 0;
         int current_version = 0;
         xmlNode *scratch = copy_xml(existing_cib);
         const char *host = crm_element_value(req, PCMK__XA_SRC);
         const char *value = crm_element_value(existing_cib,
                                               PCMK_XA_VALIDATE_WITH);
         const char *client_id = crm_element_value(req, F_CIB_CLIENTID);
         const char *call_opts = crm_element_value(req, F_CIB_CALLOPTS);
         const char *call_id = crm_element_value(req, F_CIB_CALLID);
 
         crm_trace("Processing \"%s\" event", op);
         if (value != NULL) {
             current_version = get_schema_version(value);
         }
 
         rc = update_validation(&scratch, &new_version, 0, TRUE, TRUE);
         if (new_version > current_version) {
             xmlNode *up = create_xml_node(NULL, __func__);
 
             rc = pcmk_ok;
             crm_notice("Upgrade request from %s verified", host);
 
             crm_xml_add(up, PCMK__XA_T, T_CIB);
             crm_xml_add(up, F_CIB_OPERATION, PCMK__CIB_REQUEST_UPGRADE);
             crm_xml_add(up, F_CIB_SCHEMA_MAX, get_schema_name(new_version));
             crm_xml_add(up, F_CIB_DELEGATED, host);
             crm_xml_add(up, F_CIB_CLIENTID, client_id);
             crm_xml_add(up, F_CIB_CALLOPTS, call_opts);
             crm_xml_add(up, F_CIB_CALLID, call_id);
 
             if (cib_legacy_mode() && based_is_primary) {
                 rc = cib_process_upgrade(
                     op, options, section, up, input, existing_cib, result_cib, answer);
 
             } else {
                 send_cluster_message(NULL, crm_msg_cib, up, FALSE);
             }
 
             free_xml(up);
 
         } else if(rc == pcmk_ok) {
             rc = -pcmk_err_schema_unchanged;
         }
 
         if (rc != pcmk_ok) {
             // Notify originating peer so it can notify its local clients
-            crm_node_t *origin = pcmk__search_cluster_node_cache(0, host, NULL);
+            crm_node_t *origin = NULL;
+
+            origin = pcmk__search_node_caches(0, host,
+                                              pcmk__node_search_cluster);
 
             crm_info("Rejecting upgrade request from %s: %s "
                      CRM_XS " rc=%d peer=%s", host, pcmk_strerror(rc), rc,
                      (origin? origin->uname : "lost"));
 
             if (origin) {
                 xmlNode *up = create_xml_node(NULL, __func__);
 
                 crm_xml_add(up, PCMK__XA_T, T_CIB);
                 crm_xml_add(up, F_CIB_OPERATION, PCMK__CIB_REQUEST_UPGRADE);
                 crm_xml_add(up, F_CIB_DELEGATED, host);
                 crm_xml_add(up, F_CIB_ISREPLY, host);
                 crm_xml_add(up, F_CIB_CLIENTID, client_id);
                 crm_xml_add(up, F_CIB_CALLOPTS, call_opts);
                 crm_xml_add(up, F_CIB_CALLID, call_id);
                 crm_xml_add_int(up, F_CIB_UPGRADE_RC, rc);
                 if (send_cluster_message(origin, crm_msg_cib, up, TRUE)
                     == FALSE) {
                     crm_warn("Could not send CIB upgrade result to %s", host);
                 }
                 free_xml(up);
             }
         }
         free_xml(scratch);
     }
     return rc;
 }
 
 int
 cib_process_sync_one(const char *op, int options, const char *section, xmlNode * req,
                      xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                      xmlNode ** answer)
 {
     return sync_our_cib(req, FALSE);
 }
 
 int
 cib_server_process_diff(const char *op, int options, const char *section, xmlNode * req,
                         xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                         xmlNode ** answer)
 {
     int rc = pcmk_ok;
 
     if (sync_in_progress > MAX_DIFF_RETRY) {
         /* Don't ignore diffs forever; the last request may have been lost.
          * If the diff fails, we'll ask for another full resync.
          */
         sync_in_progress = 0;
     }
 
     // The primary instance should never ignore a diff
     if (sync_in_progress && !based_is_primary) {
         int diff_add_updates = 0;
         int diff_add_epoch = 0;
         int diff_add_admin_epoch = 0;
 
         int diff_del_updates = 0;
         int diff_del_epoch = 0;
         int diff_del_admin_epoch = 0;
 
         cib_diff_version_details(input,
                                  &diff_add_admin_epoch, &diff_add_epoch, &diff_add_updates,
                                  &diff_del_admin_epoch, &diff_del_epoch, &diff_del_updates);
 
         sync_in_progress++;
         crm_notice("Not applying diff %d.%d.%d -> %d.%d.%d (sync in progress)",
                    diff_del_admin_epoch, diff_del_epoch, diff_del_updates,
                    diff_add_admin_epoch, diff_add_epoch, diff_add_updates);
         return -pcmk_err_diff_resync;
     }
 
     rc = cib_process_diff(op, options, section, req, input, existing_cib, result_cib, answer);
     crm_trace("result: %s (%d), %s", pcmk_strerror(rc), rc,
               (based_is_primary? "primary": "secondary"));
 
     if ((rc == -pcmk_err_diff_resync) && !based_is_primary) {
         free_xml(*result_cib);
         *result_cib = NULL;
         send_sync_request(NULL);
 
     } else if (rc == -pcmk_err_diff_resync) {
         rc = -pcmk_err_diff_failed;
         if (options & cib_force_diff) {
             crm_warn("Not requesting full refresh in R/W mode");
         }
 
     } else if ((rc != pcmk_ok) && !based_is_primary && cib_legacy_mode()) {
         crm_warn("Requesting full CIB refresh because update failed: %s"
                  CRM_XS " rc=%d", pcmk_strerror(rc), rc);
 
         pcmk__log_xml_patchset(LOG_INFO, input);
         free_xml(*result_cib);
         *result_cib = NULL;
         send_sync_request(NULL);
     }
 
     return rc;
 }
 
 int
 cib_process_replace_svr(const char *op, int options, const char *section, xmlNode * req,
                         xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                         xmlNode ** answer)
 {
     int rc =
         cib_process_replace(op, options, section, req, input, existing_cib, result_cib, answer);
 
     if ((rc == pcmk_ok) && pcmk__xe_is(input, XML_TAG_CIB)) {
         sync_in_progress = 0;
     }
     return rc;
 }
 
 // @COMPAT: Remove when PCMK__CIB_REQUEST_ABS_DELETE is removed
 int
 cib_process_delete_absolute(const char *op, int options, const char *section, xmlNode * req,
                             xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                             xmlNode ** answer)
 {
     return -EINVAL;
 }
 
 static xmlNode *
 cib_msg_copy(xmlNode *msg)
 {
     static const char *field_list[] = {
         PCMK__XA_T,
         F_CIB_CLIENTID,
         F_CIB_CALLOPTS,
         F_CIB_CALLID,
         F_CIB_OPERATION,
         F_CIB_ISREPLY,
         F_CIB_SECTION,
         F_CIB_HOST,
         F_CIB_RC,
         F_CIB_DELEGATED,
         F_CIB_OBJID,
         F_CIB_OBJTYPE,
         F_CIB_EXISTING,
         F_CIB_SEENCOUNT,
         F_CIB_TIMEOUT,
         F_CIB_GLOBAL_UPDATE,
         F_CIB_CLIENTNAME,
         F_CIB_USER,
         F_CIB_NOTIFY_TYPE,
         F_CIB_NOTIFY_ACTIVATE
     };
 
     xmlNode *copy = create_xml_node(NULL, "copy");
 
     CRM_ASSERT(copy != NULL);
 
     for (int lpc = 0; lpc < PCMK__NELEM(field_list); lpc++) {
         const char *field = field_list[lpc];
         const char *value = crm_element_value(msg, field);
 
         if (value != NULL) {
             crm_xml_add(copy, field, value);
         }
     }
 
     return copy;
 }
 
 int
 sync_our_cib(xmlNode * request, gboolean all)
 {
     int result = pcmk_ok;
     char *digest = NULL;
     const char *host = crm_element_value(request, PCMK__XA_SRC);
     const char *op = crm_element_value(request, F_CIB_OPERATION);
     crm_node_t *peer = NULL;
     xmlNode *replace_request = NULL;
 
     CRM_CHECK(the_cib != NULL, return -EINVAL);
     CRM_CHECK(all || (host != NULL), return -EINVAL);
 
     crm_debug("Syncing CIB to %s", all ? "all peers" : host);
 
     replace_request = cib_msg_copy(request);
 
     if (host != NULL) {
         crm_xml_add(replace_request, F_CIB_ISREPLY, host);
     }
     if (all) {
         xml_remove_prop(replace_request, F_CIB_HOST);
     }
 
     crm_xml_add(replace_request, F_CIB_OPERATION, PCMK__CIB_REQUEST_REPLACE);
     crm_xml_add(replace_request, "original_" F_CIB_OPERATION, op);
     pcmk__xe_set_bool_attr(replace_request, F_CIB_GLOBAL_UPDATE, true);
 
     crm_xml_add(replace_request, PCMK_XA_CRM_FEATURE_SET, CRM_FEATURE_SET);
     digest = calculate_xml_versioned_digest(the_cib, FALSE, TRUE, CRM_FEATURE_SET);
     crm_xml_add(replace_request, PCMK__XA_DIGEST, digest);
 
     add_message_xml(replace_request, F_CIB_CALLDATA, the_cib);
 
     if (!all) {
         peer = pcmk__get_node(0, host, NULL, pcmk__node_search_cluster);
     }
     if (!send_cluster_message(peer, crm_msg_cib, replace_request, FALSE)) {
         result = -ENOTCONN;
     }
     free_xml(replace_request);
     free(digest);
     return result;
 }
 
 int
 cib_process_commit_transaction(const char *op, int options, const char *section,
                                xmlNode *req, xmlNode *input,
                                xmlNode *existing_cib, xmlNode **result_cib,
                                xmlNode **answer)
 {
     /* On success, our caller will activate *result_cib locally, trigger a
      * replace notification if appropriate, and sync *result_cib to all nodes.
      * On failure, our caller will free *result_cib.
      */
     int rc = pcmk_rc_ok;
     const char *client_id = crm_element_value(req, F_CIB_CLIENTID);
     const char *origin = crm_element_value(req, PCMK__XA_SRC);
     pcmk__client_t *client = pcmk__find_client_by_id(client_id);
 
     rc = based_commit_transaction(input, client, origin, result_cib);
 
     if (rc != pcmk_rc_ok) {
         char *source = based_transaction_source_str(client, origin);
 
         crm_err("Could not commit transaction for %s: %s",
                 source, pcmk_rc_str(rc));
         free(source);
     }
     return pcmk_rc2legacy(rc);
 }
 
 int
 cib_process_schemas(const char *op, int options, const char *section, xmlNode *req,
                     xmlNode *input, xmlNode *existing_cib, xmlNode **result_cib,
                     xmlNode **answer)
 {
     xmlNode *data = NULL;
     const char *after_ver = NULL;
     GList *schemas = NULL;
     GList *already_included = NULL;
 
     *answer = create_xml_node(NULL, PCMK__XA_SCHEMAS);
 
     data = get_message_xml(req, F_CIB_CALLDATA);
     if (data == NULL) {
         crm_warn("No data specified in request");
         return -EPROTO;
     }
 
     after_ver = crm_element_value(data, PCMK_XA_VERSION);
     if (after_ver == NULL) {
         crm_warn("No version specified in request");
         return -EPROTO;
     }
 
     /* The client requested all schemas after the latest one we know about, which
      * means the client is fully up-to-date.  Return a properly formatted reply
      * with no schemas.
      */
     if (pcmk__str_eq(after_ver, xml_latest_schema(), pcmk__str_none)) {
         return pcmk_ok;
     }
 
     schemas = pcmk__schema_files_later_than(after_ver);
 
     for (GList *iter = schemas; iter != NULL; iter = iter->next) {
         pcmk__build_schema_xml_node(*answer, iter->data, &already_included);
     }
 
     g_list_free_full(schemas, free);
     g_list_free_full(already_included, free);
     return pcmk_ok;
 }
diff --git a/daemons/controld/controld_corosync.c b/daemons/controld/controld_corosync.c
index 7022d65c22..f4bff2a3fd 100644
--- a/daemons/controld/controld_corosync.c
+++ b/daemons/controld/controld_corosync.c
@@ -1,161 +1,161 @@
 /*
  * Copyright 2004-2024 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU General Public License version 2
  * or later (GPLv2+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 
 #include <sys/param.h>
 #include <sys/types.h>
 #include <sys/stat.h>
 
 #include <crm/crm.h>
 #include <crm/cluster/internal.h>
 #include <crm/common/xml.h>
 
 #include <pacemaker-controld.h>
 
 #if SUPPORT_COROSYNC
 
 extern void post_cache_update(int seq);
 
 /*	 A_HA_CONNECT	*/
 
 static void
 crmd_cs_dispatch(cpg_handle_t handle, const struct cpg_name *groupName,
                  uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
 {
     uint32_t kind = 0;
     const char *from = NULL;
     char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from);
 
     if(data == NULL) {
         return;
     }
     if (kind == crm_class_cluster) {
         crm_node_t *peer = NULL;
         xmlNode *xml = string2xml(data);
 
         if (xml == NULL) {
             crm_err("Could not parse message content (%d): %.100s", kind, data);
             free(data);
             return;
         }
 
         crm_xml_add(xml, PCMK__XA_SRC, from);
 
         peer = pcmk__get_node(0, from, NULL, pcmk__node_search_cluster);
         if (!pcmk_is_set(peer->processes, crm_proc_cpg)) {
             /* If we can still talk to our peer process on that node,
              * then it must be part of the corosync membership
              */
             crm_warn("Receiving messages from a node we think is dead: %s[%d]",
                      peer->uname, peer->id);
             crm_update_peer_proc(__func__, peer, crm_proc_cpg,
                                  ONLINESTATUS);
         }
         crmd_ha_msg_filter(xml);
         free_xml(xml);
     } else {
         crm_err("Invalid message class (%d): %.100s", kind, data);
     }
     free(data);
 }
 
 static gboolean
 crmd_quorum_callback(unsigned long long seq, gboolean quorate)
 {
     crm_update_quorum(quorate, FALSE);
     post_cache_update(seq);
     return TRUE;
 }
 
 static void
 crmd_cs_destroy(gpointer user_data)
 {
     if (!pcmk_is_set(controld_globals.fsa_input_register, R_HA_DISCONNECTED)) {
         crm_crit("Lost connection to cluster layer, shutting down");
         crmd_exit(CRM_EX_DISCONNECT);
     }
 }
 
 /*!
  * \brief Handle a Corosync notification of a CPG configuration change
  *
  * \param[in] handle               CPG connection
  * \param[in] cpg_name             CPG group name
  * \param[in] member_list          List of current CPG members
  * \param[in] member_list_entries  Number of entries in \p member_list
  * \param[in] left_list            List of CPG members that left
  * \param[in] left_list_entries    Number of entries in \p left_list
  * \param[in] joined_list          List of CPG members that joined
  * \param[in] joined_list_entries  Number of entries in \p joined_list
  */
 static void
 cpg_membership_callback(cpg_handle_t handle, const struct cpg_name *cpg_name,
                         const struct cpg_address *member_list,
                         size_t member_list_entries,
                         const struct cpg_address *left_list,
                         size_t left_list_entries,
                         const struct cpg_address *joined_list,
                         size_t joined_list_entries)
 {
     /* When nodes leave CPG, the DC clears their transient node attributes.
      *
      * However if there is no DC, or the DC is among the nodes that left, each
      * remaining node needs to do the clearing, to ensure it gets done.
      * Otherwise, the attributes would persist when the nodes rejoin, which
      * could have serious consequences for unfencing, agents that use attributes
      * for internal logic, etc.
      *
      * Here, we set a global boolean if the DC is among the nodes that left, for
      * use by the peer callback.
      */
     if (controld_globals.dc_name != NULL) {
         crm_node_t *peer = NULL;
 
-        peer = pcmk__search_cluster_node_cache(0, controld_globals.dc_name,
-                                               NULL);
+        peer = pcmk__search_node_caches(0, controld_globals.dc_name,
+                                        pcmk__node_search_cluster);
         if (peer != NULL) {
             for (int i = 0; i < left_list_entries; ++i) {
                 if (left_list[i].nodeid == peer->id) {
                     controld_set_global_flags(controld_dc_left);
                     break;
                 }
             }
         }
     }
 
     // Process the change normally, which will call the peer callback as needed
     pcmk_cpg_membership(handle, cpg_name, member_list, member_list_entries,
                         left_list, left_list_entries,
                         joined_list, joined_list_entries);
 
     controld_clear_global_flags(controld_dc_left);
 }
 
 extern gboolean crm_connect_corosync(crm_cluster_t * cluster);
 
 gboolean
 crm_connect_corosync(crm_cluster_t * cluster)
 {
     if (is_corosync_cluster()) {
         crm_set_status_callback(&peer_update_callback);
         cluster->cpg.cpg_deliver_fn = crmd_cs_dispatch;
         cluster->cpg.cpg_confchg_fn = cpg_membership_callback;
         cluster->destroy = crmd_cs_destroy;
 
         if (crm_cluster_connect(cluster)) {
             pcmk__corosync_quorum_connect(crmd_quorum_callback,
                                           crmd_cs_destroy);
             return TRUE;
         }
     }
     return FALSE;
 }
 
 #endif
diff --git a/daemons/controld/controld_messages.c b/daemons/controld/controld_messages.c
index 3c0d36893e..11379a876d 100644
--- a/daemons/controld/controld_messages.c
+++ b/daemons/controld/controld_messages.c
@@ -1,1337 +1,1339 @@
 /*
  * Copyright 2004-2024 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU General Public License version 2
  * or later (GPLv2+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 
 #include <sys/param.h>
 #include <string.h>
 #include <time.h>
 
 #include <crm/crm.h>
 #include <crm/msg_xml.h>
 #include <crm/common/xml.h>
 #include <crm/cluster/internal.h>
 #include <crm/cib.h>
 #include <crm/common/ipc_internal.h>
 
 #include <pacemaker-controld.h>
 
 extern void crm_shutdown(int nsig);
 
 static enum crmd_fsa_input handle_message(xmlNode *msg,
                                           enum crmd_fsa_cause cause);
 static void handle_response(xmlNode *stored_msg);
 static enum crmd_fsa_input handle_request(xmlNode *stored_msg,
                                           enum crmd_fsa_cause cause);
 static enum crmd_fsa_input handle_shutdown_request(xmlNode *stored_msg);
 static void send_msg_via_ipc(xmlNode * msg, const char *sys);
 
 /* debug only, can wrap all it likes */
 static int last_data_id = 0;
 
 void
 register_fsa_error_adv(enum crmd_fsa_cause cause, enum crmd_fsa_input input,
                        fsa_data_t * cur_data, void *new_data, const char *raised_from)
 {
     /* save the current actions if any */
     if (controld_globals.fsa_actions != A_NOTHING) {
         register_fsa_input_adv(cur_data ? cur_data->fsa_cause : C_FSA_INTERNAL,
                                I_NULL, cur_data ? cur_data->data : NULL,
                                controld_globals.fsa_actions, TRUE, __func__);
     }
 
     /* reset the action list */
     crm_info("Resetting the current action list");
     fsa_dump_actions(controld_globals.fsa_actions, "Drop");
     controld_globals.fsa_actions = A_NOTHING;
 
     /* register the error */
     register_fsa_input_adv(cause, input, new_data, A_NOTHING, TRUE, raised_from);
 }
 
 void
 register_fsa_input_adv(enum crmd_fsa_cause cause, enum crmd_fsa_input input,
                        void *data, uint64_t with_actions,
                        gboolean prepend, const char *raised_from)
 {
     unsigned old_len = g_list_length(controld_globals.fsa_message_queue);
     fsa_data_t *fsa_data = NULL;
 
     if (raised_from == NULL) {
         raised_from = "<unknown>";
     }
 
     if (input == I_NULL && with_actions == A_NOTHING /* && data == NULL */ ) {
         /* no point doing anything */
         crm_err("Cannot add entry to queue: no input and no action");
         return;
     }
 
     if (input == I_WAIT_FOR_EVENT) {
         controld_set_global_flags(controld_fsa_is_stalled);
         crm_debug("Stalling the FSA pending further input: source=%s cause=%s data=%p queue=%d",
                   raised_from, fsa_cause2string(cause), data, old_len);
 
         if (old_len > 0) {
             fsa_dump_queue(LOG_TRACE);
             prepend = FALSE;
         }
 
         if (data == NULL) {
             controld_set_fsa_action_flags(with_actions);
             fsa_dump_actions(with_actions, "Restored");
             return;
         }
 
         /* Store everything in the new event and reset
          * controld_globals.fsa_actions
          */
         with_actions |= controld_globals.fsa_actions;
         controld_globals.fsa_actions = A_NOTHING;
     }
 
     last_data_id++;
     crm_trace("%s %s FSA input %d (%s) due to %s, %s data",
               raised_from, (prepend? "prepended" : "appended"), last_data_id,
               fsa_input2string(input), fsa_cause2string(cause),
               (data? "with" : "without"));
 
     fsa_data = calloc(1, sizeof(fsa_data_t));
     fsa_data->id = last_data_id;
     fsa_data->fsa_input = input;
     fsa_data->fsa_cause = cause;
     fsa_data->origin = raised_from;
     fsa_data->data = NULL;
     fsa_data->data_type = fsa_dt_none;
     fsa_data->actions = with_actions;
 
     if (with_actions != A_NOTHING) {
         crm_trace("Adding actions %.16llx to input",
                   (unsigned long long) with_actions);
     }
 
     if (data != NULL) {
         switch (cause) {
             case C_FSA_INTERNAL:
             case C_CRMD_STATUS_CALLBACK:
             case C_IPC_MESSAGE:
             case C_HA_MESSAGE:
                 CRM_CHECK(((ha_msg_input_t *) data)->msg != NULL,
                           crm_err("Bogus data from %s", raised_from));
                 crm_trace("Copying %s data from %s as cluster message data",
                           fsa_cause2string(cause), raised_from);
                 fsa_data->data = copy_ha_msg_input(data);
                 fsa_data->data_type = fsa_dt_ha_msg;
                 break;
 
             case C_LRM_OP_CALLBACK:
                 crm_trace("Copying %s data from %s as lrmd_event_data_t",
                           fsa_cause2string(cause), raised_from);
                 fsa_data->data = lrmd_copy_event((lrmd_event_data_t *) data);
                 fsa_data->data_type = fsa_dt_lrm;
                 break;
 
             case C_TIMER_POPPED:
             case C_SHUTDOWN:
             case C_UNKNOWN:
             case C_STARTUP:
                 crm_crit("Copying %s data (from %s) is not yet implemented",
                          fsa_cause2string(cause), raised_from);
                 crmd_exit(CRM_EX_SOFTWARE);
                 break;
         }
     }
 
     /* make sure to free it properly later */
     if (prepend) {
         controld_globals.fsa_message_queue
             = g_list_prepend(controld_globals.fsa_message_queue, fsa_data);
     } else {
         controld_globals.fsa_message_queue
             = g_list_append(controld_globals.fsa_message_queue, fsa_data);
     }
 
     crm_trace("FSA message queue length is %d",
               g_list_length(controld_globals.fsa_message_queue));
 
     /* fsa_dump_queue(LOG_TRACE); */
 
     if (old_len == g_list_length(controld_globals.fsa_message_queue)) {
         crm_err("Couldn't add message to the queue");
     }
 
     if (input != I_WAIT_FOR_EVENT) {
         controld_trigger_fsa();
     }
 }
 
 void
 fsa_dump_queue(int log_level)
 {
     int offset = 0;
 
     for (GList *iter = controld_globals.fsa_message_queue; iter != NULL;
          iter = iter->next) {
         fsa_data_t *data = (fsa_data_t *) iter->data;
 
         do_crm_log_unlikely(log_level,
                             "queue[%d.%d]: input %s raised by %s(%p.%d)\t(cause=%s)",
                             offset++, data->id, fsa_input2string(data->fsa_input),
                             data->origin, data->data, data->data_type,
                             fsa_cause2string(data->fsa_cause));
     }
 }
 
 ha_msg_input_t *
 copy_ha_msg_input(ha_msg_input_t * orig)
 {
     ha_msg_input_t *copy = calloc(1, sizeof(ha_msg_input_t));
 
     CRM_ASSERT(copy != NULL);
     copy->msg = (orig && orig->msg)? copy_xml(orig->msg) : NULL;
     copy->xml = get_message_xml(copy->msg, F_CRM_DATA);
     return copy;
 }
 
 void
 delete_fsa_input(fsa_data_t * fsa_data)
 {
     lrmd_event_data_t *op = NULL;
     xmlNode *foo = NULL;
 
     if (fsa_data == NULL) {
         return;
     }
     crm_trace("About to free %s data", fsa_cause2string(fsa_data->fsa_cause));
 
     if (fsa_data->data != NULL) {
         switch (fsa_data->data_type) {
             case fsa_dt_ha_msg:
                 delete_ha_msg_input(fsa_data->data);
                 break;
 
             case fsa_dt_xml:
                 foo = fsa_data->data;
                 free_xml(foo);
                 break;
 
             case fsa_dt_lrm:
                 op = (lrmd_event_data_t *) fsa_data->data;
                 lrmd_free_event(op);
                 break;
 
             case fsa_dt_none:
                 if (fsa_data->data != NULL) {
                     crm_err("Don't know how to free %s data from %s",
                             fsa_cause2string(fsa_data->fsa_cause), fsa_data->origin);
                     crmd_exit(CRM_EX_SOFTWARE);
                 }
                 break;
         }
         crm_trace("%s data freed", fsa_cause2string(fsa_data->fsa_cause));
     }
 
     free(fsa_data);
 }
 
 /* returns the next message */
 fsa_data_t *
 get_message(void)
 {
     fsa_data_t *message
         = (fsa_data_t *) controld_globals.fsa_message_queue->data;
 
     controld_globals.fsa_message_queue
         = g_list_remove(controld_globals.fsa_message_queue, message);
     crm_trace("Processing input %d", message->id);
     return message;
 }
 
 void *
 fsa_typed_data_adv(fsa_data_t * fsa_data, enum fsa_data_type a_type, const char *caller)
 {
     void *ret_val = NULL;
 
     if (fsa_data == NULL) {
         crm_err("%s: No FSA data available", caller);
 
     } else if (fsa_data->data == NULL) {
         crm_err("%s: No message data available. Origin: %s", caller, fsa_data->origin);
 
     } else if (fsa_data->data_type != a_type) {
         crm_crit("%s: Message data was the wrong type! %d vs. requested=%d.  Origin: %s",
                  caller, fsa_data->data_type, a_type, fsa_data->origin);
         CRM_ASSERT(fsa_data->data_type == a_type);
     } else {
         ret_val = fsa_data->data;
     }
 
     return ret_val;
 }
 
 /*	A_MSG_ROUTE	*/
 void
 do_msg_route(long long action,
              enum crmd_fsa_cause cause,
              enum crmd_fsa_state cur_state,
              enum crmd_fsa_input current_input, fsa_data_t * msg_data)
 {
     ha_msg_input_t *input = fsa_typed_data(fsa_dt_ha_msg);
 
     route_message(msg_data->fsa_cause, input->msg);
 }
 
 void
 route_message(enum crmd_fsa_cause cause, xmlNode * input)
 {
     ha_msg_input_t fsa_input;
     enum crmd_fsa_input result = I_NULL;
 
     fsa_input.msg = input;
     CRM_CHECK(cause == C_IPC_MESSAGE || cause == C_HA_MESSAGE, return);
 
     /* try passing the buck first */
     if (relay_message(input, cause == C_IPC_MESSAGE)) {
         return;
     }
 
     /* handle locally */
     result = handle_message(input, cause);
 
     /* done or process later? */
     switch (result) {
         case I_NULL:
         case I_CIB_OP:
         case I_ROUTER:
         case I_NODE_JOIN:
         case I_JOIN_REQUEST:
         case I_JOIN_RESULT:
             break;
         default:
             /* Defering local processing of message */
             register_fsa_input_later(cause, result, &fsa_input);
             return;
     }
 
     if (result != I_NULL) {
         /* add to the front of the queue */
         register_fsa_input(cause, result, &fsa_input);
     }
 }
 
 gboolean
 relay_message(xmlNode * msg, gboolean originated_locally)
 {
     enum crm_ais_msg_types dest = crm_msg_ais;
     bool is_for_dc = false;
     bool is_for_dcib = false;
     bool is_for_te = false;
     bool is_for_crm = false;
     bool is_for_cib = false;
     bool is_local = false;
     bool broadcast = false;
     const char *host_to = NULL;
     const char *sys_to = NULL;
     const char *sys_from = NULL;
     const char *type = NULL;
     const char *task = NULL;
     const char *ref = NULL;
     crm_node_t *node_to = NULL;
 
     CRM_CHECK(msg != NULL, return TRUE);
 
     host_to = crm_element_value(msg, F_CRM_HOST_TO);
     sys_to = crm_element_value(msg, F_CRM_SYS_TO);
     sys_from = crm_element_value(msg, F_CRM_SYS_FROM);
     type = crm_element_value(msg, PCMK__XA_T);
     task = crm_element_value(msg, F_CRM_TASK);
     ref = crm_element_value(msg, PCMK_XA_REFERENCE);
 
     broadcast = pcmk__str_empty(host_to);
 
     if (ref == NULL) {
         ref = "without reference ID";
     }
 
     if (pcmk__str_eq(task, CRM_OP_HELLO, pcmk__str_casei)) {
         crm_trace("Received hello %s from %s (no processing needed)",
                   ref, pcmk__s(sys_from, "unidentified source"));
         crm_log_xml_trace(msg, "hello");
         return TRUE;
     }
 
     // Require message type (set by create_request())
     if (!pcmk__str_eq(type, T_CRM, pcmk__str_casei)) {
         crm_warn("Ignoring invalid message %s with type '%s' (not '" T_CRM "')",
                  ref, pcmk__s(type, ""));
         crm_log_xml_trace(msg, "ignored");
         return TRUE;
     }
 
     // Require a destination subsystem (also set by create_request())
     if (sys_to == NULL) {
         crm_warn("Ignoring invalid message %s with no " F_CRM_SYS_TO, ref);
         crm_log_xml_trace(msg, "ignored");
         return TRUE;
     }
 
     // Get the message type appropriate to the destination subsystem
     if (is_corosync_cluster()) {
         dest = text2msg_type(sys_to);
         if ((dest < crm_msg_ais) || (dest > crm_msg_stonith_ng)) {
             /* Unrecognized value, use a sane default
              *
              * @TODO Maybe we should bail instead
              */
             dest = crm_msg_crmd;
         }
     }
 
     is_for_dc = (strcasecmp(CRM_SYSTEM_DC, sys_to) == 0);
     is_for_dcib = (strcasecmp(CRM_SYSTEM_DCIB, sys_to) == 0);
     is_for_te = (strcasecmp(CRM_SYSTEM_TENGINE, sys_to) == 0);
     is_for_cib = (strcasecmp(CRM_SYSTEM_CIB, sys_to) == 0);
     is_for_crm = (strcasecmp(CRM_SYSTEM_CRMD, sys_to) == 0);
 
     // Check whether message should be processed locally
     is_local = false;
     if (broadcast) {
         if (is_for_dc || is_for_te) {
             is_local = false;
 
         } else if (is_for_crm) {
             if (pcmk__strcase_any_of(task, CRM_OP_NODE_INFO,
                                      PCMK__CONTROLD_CMD_NODES, NULL)) {
                 /* Node info requests do not specify a host, which is normally
                  * treated as "all hosts", because the whole point is that the
                  * client may not know the local node name. Always handle these
                  * requests locally.
                  */
                 is_local = true;
             } else {
                 is_local = !originated_locally;
             }
 
         } else {
             is_local = true;
         }
 
     } else if (pcmk__str_eq(controld_globals.our_nodename, host_to,
                             pcmk__str_casei)) {
         is_local = true;
 
     } else if (is_for_crm && pcmk__str_eq(task, CRM_OP_LRM_DELETE, pcmk__str_casei)) {
         xmlNode *msg_data = get_message_xml(msg, F_CRM_DATA);
         const char *mode = crm_element_value(msg_data, PCMK__XA_MODE);
 
         if (pcmk__str_eq(mode, XML_TAG_CIB, pcmk__str_casei)) {
             // Local delete of an offline node's resource history
             is_local = true;
         }
     }
 
     // Check whether message should be relayed
 
     if (is_for_dc || is_for_dcib || is_for_te) {
         if (AM_I_DC) {
             if (is_for_te) {
                 crm_trace("Route message %s locally as transition request",
                           ref);
                 crm_log_xml_trace(msg, sys_to);
                 send_msg_via_ipc(msg, sys_to);
                 return TRUE; // No further processing of message is needed
             }
             crm_trace("Route message %s locally as DC request", ref);
             return FALSE; // More to be done by caller
         }
 
         if (originated_locally
             && !pcmk__strcase_any_of(sys_from, CRM_SYSTEM_PENGINE,
                                      CRM_SYSTEM_TENGINE, NULL)) {
             crm_trace("Relay message %s to DC (via %s)",
                       ref, pcmk__s(host_to, "broadcast"));
             crm_log_xml_trace(msg, "relayed");
             if (!broadcast) {
                 node_to = pcmk__get_node(0, host_to, NULL,
                                          pcmk__node_search_cluster);
             }
             send_cluster_message(node_to, dest, msg, TRUE);
             return TRUE;
         }
 
         /* Transition engine and scheduler messages are sent only to the DC on
          * the same node. If we are no longer the DC, discard this message.
          */
         crm_trace("Ignoring message %s because we are no longer DC", ref);
         crm_log_xml_trace(msg, "ignored");
         return TRUE; // No further processing of message is needed
     }
 
     if (is_local) {
         if (is_for_crm || is_for_cib) {
             crm_trace("Route message %s locally as controller request", ref);
             return FALSE; // More to be done by caller
         }
         crm_trace("Relay message %s locally to %s", ref, sys_to);
         crm_log_xml_trace(msg, "IPC-relay");
         send_msg_via_ipc(msg, sys_to);
         return TRUE;
     }
 
     if (!broadcast) {
-        node_to = pcmk__search_cluster_node_cache(0, host_to, NULL);
+        node_to = pcmk__search_node_caches(0, host_to,
+                                           pcmk__node_search_cluster);
         if (node_to == NULL) {
             crm_warn("Ignoring message %s because node %s is unknown",
                      ref, host_to);
             crm_log_xml_trace(msg, "ignored");
             return TRUE;
         }
     }
 
     crm_trace("Relay message %s to %s",
               ref, pcmk__s(host_to, "all peers"));
     crm_log_xml_trace(msg, "relayed");
     send_cluster_message(node_to, dest, msg, TRUE);
     return TRUE;
 }
 
 // Return true if field contains a positive integer
 static bool
 authorize_version(xmlNode *message_data, const char *field,
                   const char *client_name, const char *ref, const char *uuid)
 {
     const char *version = crm_element_value(message_data, field);
     long long version_num;
 
     if ((pcmk__scan_ll(version, &version_num, -1LL) != pcmk_rc_ok)
         || (version_num < 0LL)) {
 
         crm_warn("Rejected IPC hello from %s: '%s' is not a valid protocol %s "
                  CRM_XS " ref=%s uuid=%s",
                  client_name, ((version == NULL)? "" : version),
                  field, (ref? ref : "none"), uuid);
         return false;
     }
     return true;
 }
 
 /*!
  * \internal
  * \brief Check whether a client IPC message is acceptable
  *
  * If a given client IPC message is a hello, "authorize" it by ensuring it has
  * valid information such as a protocol version, and return false indicating
  * that nothing further needs to be done with the message. If the message is not
  * a hello, just return true to indicate it needs further processing.
  *
  * \param[in]     client_msg     XML of IPC message
  * \param[in,out] curr_client    If IPC is not proxied, client that sent message
  * \param[in]     proxy_session  If IPC is proxied, the session ID
  *
  * \return true if message needs further processing, false if it doesn't
  */
 bool
 controld_authorize_ipc_message(const xmlNode *client_msg, pcmk__client_t *curr_client,
                                const char *proxy_session)
 {
     xmlNode *message_data = NULL;
     const char *client_name = NULL;
     const char *op = crm_element_value(client_msg, F_CRM_TASK);
     const char *ref = crm_element_value(client_msg, PCMK_XA_REFERENCE);
     const char *uuid = (curr_client? curr_client->id : proxy_session);
 
     if (uuid == NULL) {
         crm_warn("IPC message from client rejected: No client identifier "
                  CRM_XS " ref=%s", (ref? ref : "none"));
         goto rejected;
     }
 
     if (!pcmk__str_eq(CRM_OP_HELLO, op, pcmk__str_casei)) {
         // Only hello messages need to be authorized
         return true;
     }
 
     message_data = get_message_xml(client_msg, F_CRM_DATA);
 
     client_name = crm_element_value(message_data, "client_name");
     if (pcmk__str_empty(client_name)) {
         crm_warn("IPC hello from client rejected: No client name",
                  CRM_XS " ref=%s uuid=%s", (ref? ref : "none"), uuid);
         goto rejected;
     }
     if (!authorize_version(message_data, "major_version", client_name, ref,
                            uuid)) {
         goto rejected;
     }
     if (!authorize_version(message_data, "minor_version", client_name, ref,
                            uuid)) {
         goto rejected;
     }
 
     crm_trace("Validated IPC hello from client %s", client_name);
     crm_log_xml_trace(client_msg, "hello");
     if (curr_client) {
         curr_client->userdata = strdup(client_name);
     }
     controld_trigger_fsa();
     return false;
 
 rejected:
     crm_log_xml_trace(client_msg, "rejected");
     if (curr_client) {
         qb_ipcs_disconnect(curr_client->ipcs);
     }
     return false;
 }
 
 static enum crmd_fsa_input
 handle_message(xmlNode *msg, enum crmd_fsa_cause cause)
 {
     const char *type = NULL;
 
     CRM_CHECK(msg != NULL, return I_NULL);
 
     type = crm_element_value(msg, PCMK__XA_SUBT);
     if (pcmk__str_eq(type, PCMK__VALUE_REQUEST, pcmk__str_none)) {
         return handle_request(msg, cause);
     }
 
     if (pcmk__str_eq(type, PCMK__VALUE_RESPONSE, pcmk__str_none)) {
         handle_response(msg);
         return I_NULL;
     }
 
     crm_warn("Ignoring message with unknown " PCMK__XA_SUBT" '%s'",
              pcmk__s(type, ""));
     crm_log_xml_trace(msg, "bad");
     return I_NULL;
 }
 
 static enum crmd_fsa_input
 handle_failcount_op(xmlNode * stored_msg)
 {
     const char *rsc = NULL;
     const char *uname = NULL;
     const char *op = NULL;
     char *interval_spec = NULL;
     guint interval_ms = 0;
     gboolean is_remote_node = FALSE;
     xmlNode *xml_op = get_message_xml(stored_msg, F_CRM_DATA);
 
     if (xml_op) {
         xmlNode *xml_rsc = first_named_child(xml_op, XML_CIB_TAG_RESOURCE);
         xmlNode *xml_attrs = first_named_child(xml_op, XML_TAG_ATTRS);
 
         if (xml_rsc) {
             rsc = ID(xml_rsc);
         }
         if (xml_attrs) {
             op = crm_element_value(xml_attrs,
                                    CRM_META "_" PCMK__META_CLEAR_FAILURE_OP);
             crm_element_value_ms(xml_attrs,
                                  CRM_META "_" PCMK__META_CLEAR_FAILURE_INTERVAL,
                                  &interval_ms);
         }
     }
     uname = crm_element_value(xml_op, XML_LRM_ATTR_TARGET);
 
     if ((rsc == NULL) || (uname == NULL)) {
         crm_log_xml_warn(stored_msg, "invalid failcount op");
         return I_NULL;
     }
 
     if (crm_element_value(xml_op, XML_LRM_ATTR_ROUTER_NODE)) {
         is_remote_node = TRUE;
     }
 
     crm_debug("Clearing failures for %s-interval %s on %s "
               "from attribute manager, CIB, and executor state",
               pcmk__readable_interval(interval_ms), rsc, uname);
 
     if (interval_ms) {
         interval_spec = crm_strdup_printf("%ums", interval_ms);
     }
     update_attrd_clear_failures(uname, rsc, op, interval_spec, is_remote_node);
     free(interval_spec);
 
     controld_cib_delete_last_failure(rsc, uname, op, interval_ms);
 
     lrm_clear_last_failure(rsc, uname, op, interval_ms);
 
     return I_NULL;
 }
 
 static enum crmd_fsa_input
 handle_lrm_delete(xmlNode *stored_msg)
 {
     const char *mode = NULL;
     xmlNode *msg_data = get_message_xml(stored_msg, F_CRM_DATA);
 
     CRM_CHECK(msg_data != NULL, return I_NULL);
 
     /* CRM_OP_LRM_DELETE has two distinct modes. The default behavior is to
      * relay the operation to the affected node, which will unregister the
      * resource from the local executor, clear the resource's history from the
      * CIB, and do some bookkeeping in the controller.
      *
      * However, if the affected node is offline, the client will specify
      * mode="cib" which means the controller receiving the operation should
      * clear the resource's history from the CIB and nothing else. This is used
      * to clear shutdown locks.
      */
     mode = crm_element_value(msg_data, PCMK__XA_MODE);
     if ((mode == NULL) || strcmp(mode, XML_TAG_CIB)) {
         // Relay to affected node
         crm_xml_add(stored_msg, F_CRM_SYS_TO, CRM_SYSTEM_LRMD);
         return I_ROUTER;
 
     } else {
         // Delete CIB history locally (compare with do_lrm_delete())
         const char *from_sys = NULL;
         const char *user_name = NULL;
         const char *rsc_id = NULL;
         const char *node = NULL;
         xmlNode *rsc_xml = NULL;
         int rc = pcmk_rc_ok;
 
         rsc_xml = first_named_child(msg_data, XML_CIB_TAG_RESOURCE);
         CRM_CHECK(rsc_xml != NULL, return I_NULL);
 
         rsc_id = ID(rsc_xml);
         from_sys = crm_element_value(stored_msg, F_CRM_SYS_FROM);
         node = crm_element_value(msg_data, XML_LRM_ATTR_TARGET);
         user_name = pcmk__update_acl_user(stored_msg, F_CRM_USER, NULL);
         crm_debug("Handling " CRM_OP_LRM_DELETE " for %s on %s locally%s%s "
                   "(clearing CIB resource history only)", rsc_id, node,
                   (user_name? " for user " : ""), (user_name? user_name : ""));
         rc = controld_delete_resource_history(rsc_id, node, user_name,
                                               cib_dryrun|cib_sync_call);
         if (rc == pcmk_rc_ok) {
             rc = controld_delete_resource_history(rsc_id, node, user_name,
                                                   crmd_cib_smart_opt());
         }
 
         //Notify client and tengine.(Only notify tengine if mode = "cib" and CRM_OP_LRM_DELETE.)
         if (from_sys) {
             lrmd_event_data_t *op = NULL;
             const char *from_host = crm_element_value(stored_msg, PCMK__XA_SRC);
             const char *transition;
 
             if (strcmp(from_sys, CRM_SYSTEM_TENGINE)) {
                 transition = crm_element_value(msg_data,
                                                PCMK__XA_TRANSITION_KEY);
             } else {
                 transition = crm_element_value(stored_msg,
                                                PCMK__XA_TRANSITION_KEY);
             }
 
             crm_info("Notifying %s on %s that %s was%s deleted",
                      from_sys, (from_host? from_host : "local node"), rsc_id,
                      ((rc == pcmk_rc_ok)? "" : " not"));
             op = lrmd_new_event(rsc_id, PCMK_ACTION_DELETE, 0);
             op->type = lrmd_event_exec_complete;
             op->user_data = strdup(transition? transition : FAKE_TE_ID);
             op->params = pcmk__strkey_table(free, free);
             g_hash_table_insert(op->params, strdup(PCMK_XA_CRM_FEATURE_SET),
                                 strdup(CRM_FEATURE_SET));
             controld_rc2event(op, rc);
             controld_ack_event_directly(from_host, from_sys, NULL, op, rsc_id);
             lrmd_free_event(op);
             controld_trigger_delete_refresh(from_sys, rsc_id);
         }
         return I_NULL;
     }
 }
 
 /*!
  * \brief Handle a CRM_OP_REMOTE_STATE message by updating remote peer cache
  *
  * \param[in] msg  Message XML
  *
  * \return Next FSA input
  */
 static enum crmd_fsa_input
 handle_remote_state(const xmlNode *msg)
 {
     const char *conn_host = NULL;
     const char *remote_uname = ID(msg);
     crm_node_t *remote_peer;
     bool remote_is_up = false;
     int rc = pcmk_rc_ok;
 
     rc = pcmk__xe_get_bool_attr(msg, PCMK__XA_IN_CCM, &remote_is_up);
 
     CRM_CHECK(remote_uname && rc == pcmk_rc_ok, return I_NULL);
 
     remote_peer = crm_remote_peer_get(remote_uname);
     CRM_CHECK(remote_peer, return I_NULL);
 
     pcmk__update_peer_state(__func__, remote_peer,
                             remote_is_up ? CRM_NODE_MEMBER : CRM_NODE_LOST,
                             0);
 
     conn_host = crm_element_value(msg, PCMK__XA_CONN_HOST);
     if (conn_host) {
         pcmk__str_update(&remote_peer->conn_host, conn_host);
     } else if (remote_peer->conn_host) {
         free(remote_peer->conn_host);
         remote_peer->conn_host = NULL;
     }
 
     return I_NULL;
 }
 
 /*!
  * \brief Handle a CRM_OP_PING message
  *
  * \param[in] msg  Message XML
  *
  * \return Next FSA input
  */
 static enum crmd_fsa_input
 handle_ping(const xmlNode *msg)
 {
     const char *value = NULL;
     xmlNode *ping = NULL;
     xmlNode *reply = NULL;
 
     // Build reply
 
     ping = create_xml_node(NULL, XML_CRM_TAG_PING);
     value = crm_element_value(msg, F_CRM_SYS_TO);
     crm_xml_add(ping, XML_PING_ATTR_SYSFROM, value);
 
     // Add controller state
     value = fsa_state2string(controld_globals.fsa_state);
     crm_xml_add(ping, XML_PING_ATTR_CRMDSTATE, value);
     crm_notice("Current ping state: %s", value); // CTS needs this
 
     // Add controller health
     // @TODO maybe do some checks to determine meaningful status
     crm_xml_add(ping, XML_PING_ATTR_STATUS, "ok");
 
     // Send reply
     reply = create_reply(msg, ping);
     free_xml(ping);
     if (reply != NULL) {
         (void) relay_message(reply, TRUE);
         free_xml(reply);
     }
 
     // Nothing further to do
     return I_NULL;
 }
 
 /*!
  * \brief Handle a PCMK__CONTROLD_CMD_NODES message
  *
  * \param[in] request  Message XML
  *
  * \return Next FSA input
  */
 static enum crmd_fsa_input
 handle_node_list(const xmlNode *request)
 {
     GHashTableIter iter;
     crm_node_t *node = NULL;
     xmlNode *reply = NULL;
     xmlNode *reply_data = NULL;
 
     // Create message data for reply
     reply_data = create_xml_node(NULL, XML_CIB_TAG_NODES);
     g_hash_table_iter_init(&iter, crm_peer_cache);
     while (g_hash_table_iter_next(&iter, NULL, (gpointer *) & node)) {
         xmlNode *xml = create_xml_node(reply_data, XML_CIB_TAG_NODE);
 
         crm_xml_add_ll(xml, PCMK_XA_ID, (long long) node->id); // uint32_t
         crm_xml_add(xml, PCMK_XA_UNAME, node->uname);
         crm_xml_add(xml, PCMK__XA_IN_CCM, node->state);
     }
 
     // Create and send reply
     reply = create_reply(request, reply_data);
     free_xml(reply_data);
     if (reply) {
         (void) relay_message(reply, TRUE);
         free_xml(reply);
     }
 
     // Nothing further to do
     return I_NULL;
 }
 
 /*!
  * \brief Handle a CRM_OP_NODE_INFO request
  *
  * \param[in] msg  Message XML
  *
  * \return Next FSA input
  */
 static enum crmd_fsa_input
 handle_node_info_request(const xmlNode *msg)
 {
     const char *value = NULL;
     crm_node_t *node = NULL;
     int node_id = 0;
     xmlNode *reply = NULL;
     xmlNode *reply_data = NULL;
 
     // Build reply
 
     reply_data = create_xml_node(NULL, XML_CIB_TAG_NODE);
     crm_xml_add(reply_data, XML_PING_ATTR_SYSFROM, CRM_SYSTEM_CRMD);
 
     // Add whether current partition has quorum
     pcmk__xe_set_bool_attr(reply_data, PCMK_XA_HAVE_QUORUM,
                            pcmk_is_set(controld_globals.flags,
                                        controld_has_quorum));
 
     // Check whether client requested node info by ID and/or name
     crm_element_value_int(msg, PCMK_XA_ID, &node_id);
     if (node_id < 0) {
         node_id = 0;
     }
     value = crm_element_value(msg, PCMK_XA_UNAME);
 
     // Default to local node if none given
     if ((node_id == 0) && (value == NULL)) {
         value = controld_globals.our_nodename;
     }
 
     node = pcmk__search_node_caches(node_id, value, pcmk__node_search_any);
     if (node) {
         crm_xml_add(reply_data, PCMK_XA_ID, node->uuid);
         crm_xml_add(reply_data, PCMK_XA_UNAME, node->uname);
         crm_xml_add(reply_data, PCMK__XA_CRMD, node->state);
         pcmk__xe_set_bool_attr(reply_data, XML_NODE_IS_REMOTE,
                                pcmk_is_set(node->flags, crm_remote_node));
     }
 
     // Send reply
     reply = create_reply(msg, reply_data);
     free_xml(reply_data);
     if (reply != NULL) {
         (void) relay_message(reply, TRUE);
         free_xml(reply);
     }
 
     // Nothing further to do
     return I_NULL;
 }
 
 static void
 verify_feature_set(xmlNode *msg)
 {
     const char *dc_version = crm_element_value(msg, PCMK_XA_CRM_FEATURE_SET);
 
     if (dc_version == NULL) {
         /* All we really know is that the DC feature set is older than 3.1.0,
          * but that's also all that really matters.
          */
         dc_version = "3.0.14";
     }
 
     if (feature_set_compatible(dc_version, CRM_FEATURE_SET)) {
         crm_trace("Local feature set (%s) is compatible with DC's (%s)",
                   CRM_FEATURE_SET, dc_version);
     } else {
         crm_err("Local feature set (%s) is incompatible with DC's (%s)",
                 CRM_FEATURE_SET, dc_version);
 
         // Nothing is likely to improve without administrator involvement
         controld_set_fsa_input_flags(R_STAYDOWN);
         crmd_exit(CRM_EX_FATAL);
     }
 }
 
 // DC gets own shutdown all-clear
 static enum crmd_fsa_input
 handle_shutdown_self_ack(xmlNode *stored_msg)
 {
     const char *host_from = crm_element_value(stored_msg, PCMK__XA_SRC);
 
     if (pcmk_is_set(controld_globals.fsa_input_register, R_SHUTDOWN)) {
         // The expected case -- we initiated own shutdown sequence
         crm_info("Shutting down controller");
         return I_STOP;
     }
 
     if (pcmk__str_eq(host_from, controld_globals.dc_name, pcmk__str_casei)) {
         // Must be logic error -- DC confirming its own unrequested shutdown
         crm_err("Shutting down controller immediately due to "
                 "unexpected shutdown confirmation");
         return I_TERMINATE;
     }
 
     if (controld_globals.fsa_state != S_STOPPING) {
         // Shouldn't happen -- non-DC confirming unrequested shutdown
         crm_err("Starting new DC election because %s is "
                 "confirming shutdown we did not request",
                 (host_from? host_from : "another node"));
         return I_ELECTION;
     }
 
     // Shouldn't happen, but we are already stopping anyway
     crm_debug("Ignoring unexpected shutdown confirmation from %s",
               (host_from? host_from : "another node"));
     return I_NULL;
 }
 
 // Non-DC gets shutdown all-clear from DC
 static enum crmd_fsa_input
 handle_shutdown_ack(xmlNode *stored_msg)
 {
     const char *host_from = crm_element_value(stored_msg, PCMK__XA_SRC);
 
     if (host_from == NULL) {
         crm_warn("Ignoring shutdown request without origin specified");
         return I_NULL;
     }
 
     if (pcmk__str_eq(host_from, controld_globals.dc_name,
                      pcmk__str_null_matches|pcmk__str_casei)) {
 
         if (pcmk_is_set(controld_globals.fsa_input_register, R_SHUTDOWN)) {
             crm_info("Shutting down controller after confirmation from %s",
                      host_from);
         } else {
             crm_err("Shutting down controller after unexpected "
                     "shutdown request from %s", host_from);
             controld_set_fsa_input_flags(R_STAYDOWN);
         }
         return I_STOP;
     }
 
     crm_warn("Ignoring shutdown request from %s because DC is %s",
              host_from, controld_globals.dc_name);
     return I_NULL;
 }
 
 static enum crmd_fsa_input
 handle_request(xmlNode *stored_msg, enum crmd_fsa_cause cause)
 {
     xmlNode *msg = NULL;
     const char *op = crm_element_value(stored_msg, F_CRM_TASK);
 
     /* Optimize this for the DC - it has the most to do */
 
     crm_log_xml_trace(stored_msg, "request");
     if (op == NULL) {
         crm_warn("Ignoring request without " F_CRM_TASK);
         return I_NULL;
     }
 
     if (strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0) {
         const char *from = crm_element_value(stored_msg, PCMK__XA_SRC);
-        crm_node_t *node = pcmk__search_cluster_node_cache(0, from, NULL);
+        crm_node_t *node = pcmk__search_node_caches(0, from,
+                                                    pcmk__node_search_cluster);
 
         pcmk__update_peer_expected(__func__, node, CRMD_JOINSTATE_DOWN);
         if(AM_I_DC == FALSE) {
             return I_NULL; /* Done */
         }
     }
 
     /*========== DC-Only Actions ==========*/
     if (AM_I_DC) {
         if (strcmp(op, CRM_OP_JOIN_ANNOUNCE) == 0) {
             return I_NODE_JOIN;
 
         } else if (strcmp(op, CRM_OP_JOIN_REQUEST) == 0) {
             return I_JOIN_REQUEST;
 
         } else if (strcmp(op, CRM_OP_JOIN_CONFIRM) == 0) {
             return I_JOIN_RESULT;
 
         } else if (strcmp(op, CRM_OP_SHUTDOWN) == 0) {
             return handle_shutdown_self_ack(stored_msg);
 
         } else if (strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0) {
             // Another controller wants to shut down its node
             return handle_shutdown_request(stored_msg);
         }
     }
 
     /*========== common actions ==========*/
     if (strcmp(op, CRM_OP_NOVOTE) == 0) {
         ha_msg_input_t fsa_input;
 
         fsa_input.msg = stored_msg;
         register_fsa_input_adv(C_HA_MESSAGE, I_NULL, &fsa_input,
                                A_ELECTION_COUNT | A_ELECTION_CHECK, FALSE,
                                __func__);
 
     } else if (strcmp(op, CRM_OP_REMOTE_STATE) == 0) {
         /* a remote connection host is letting us know the node state */
         return handle_remote_state(stored_msg);
 
     } else if (strcmp(op, CRM_OP_THROTTLE) == 0) {
         throttle_update(stored_msg);
         if (AM_I_DC && (controld_globals.transition_graph != NULL)
             && !controld_globals.transition_graph->complete) {
 
             crm_debug("The throttle changed. Trigger a graph.");
             trigger_graph();
         }
         return I_NULL;
 
     } else if (strcmp(op, CRM_OP_CLEAR_FAILCOUNT) == 0) {
         return handle_failcount_op(stored_msg);
 
     } else if (strcmp(op, CRM_OP_VOTE) == 0) {
         /* count the vote and decide what to do after that */
         ha_msg_input_t fsa_input;
 
         fsa_input.msg = stored_msg;
         register_fsa_input_adv(C_HA_MESSAGE, I_NULL, &fsa_input,
                                A_ELECTION_COUNT | A_ELECTION_CHECK, FALSE,
                                __func__);
 
         /* Sometimes we _must_ go into S_ELECTION */
         if (controld_globals.fsa_state == S_HALT) {
             crm_debug("Forcing an election from S_HALT");
             return I_ELECTION;
         }
 
     } else if (strcmp(op, CRM_OP_JOIN_OFFER) == 0) {
         verify_feature_set(stored_msg);
         crm_debug("Raising I_JOIN_OFFER: join-%s", crm_element_value(stored_msg, F_CRM_JOIN_ID));
         return I_JOIN_OFFER;
 
     } else if (strcmp(op, CRM_OP_JOIN_ACKNAK) == 0) {
         crm_debug("Raising I_JOIN_RESULT: join-%s", crm_element_value(stored_msg, F_CRM_JOIN_ID));
         return I_JOIN_RESULT;
 
     } else if (strcmp(op, CRM_OP_LRM_DELETE) == 0) {
         return handle_lrm_delete(stored_msg);
 
     } else if ((strcmp(op, CRM_OP_LRM_FAIL) == 0)
                || (strcmp(op, CRM_OP_LRM_REFRESH) == 0) // @COMPAT
                || (strcmp(op, CRM_OP_REPROBE) == 0)) {
 
         crm_xml_add(stored_msg, F_CRM_SYS_TO, CRM_SYSTEM_LRMD);
         return I_ROUTER;
 
     } else if (strcmp(op, CRM_OP_NOOP) == 0) {
         return I_NULL;
 
     } else if (strcmp(op, CRM_OP_LOCAL_SHUTDOWN) == 0) {
 
         crm_shutdown(SIGTERM);
         /*return I_SHUTDOWN; */
         return I_NULL;
 
     } else if (strcmp(op, CRM_OP_PING) == 0) {
         return handle_ping(stored_msg);
 
     } else if (strcmp(op, CRM_OP_NODE_INFO) == 0) {
         return handle_node_info_request(stored_msg);
 
     } else if (strcmp(op, CRM_OP_RM_NODE_CACHE) == 0) {
         int id = 0;
         const char *name = NULL;
 
         crm_element_value_int(stored_msg, PCMK_XA_ID, &id);
         name = crm_element_value(stored_msg, PCMK_XA_UNAME);
 
         if(cause == C_IPC_MESSAGE) {
             msg = create_request(CRM_OP_RM_NODE_CACHE, NULL, NULL, CRM_SYSTEM_CRMD, CRM_SYSTEM_CRMD, NULL);
             if (send_cluster_message(NULL, crm_msg_crmd, msg, TRUE) == FALSE) {
                 crm_err("Could not instruct peers to remove references to node %s/%u", name, id);
             } else {
                 crm_notice("Instructing peers to remove references to node %s/%u", name, id);
             }
             free_xml(msg);
 
         } else {
             reap_crm_member(id, name);
 
             /* If we're forgetting this node, also forget any failures to fence
              * it, so we don't carry that over to any node added later with the
              * same name.
              */
             st_fail_count_reset(name);
         }
 
     } else if (strcmp(op, CRM_OP_MAINTENANCE_NODES) == 0) {
         xmlNode *xml = get_message_xml(stored_msg, F_CRM_DATA);
 
         remote_ra_process_maintenance_nodes(xml);
 
     } else if (strcmp(op, PCMK__CONTROLD_CMD_NODES) == 0) {
         return handle_node_list(stored_msg);
 
         /*========== (NOT_DC)-Only Actions ==========*/
     } else if (!AM_I_DC) {
 
         if (strcmp(op, CRM_OP_SHUTDOWN) == 0) {
             return handle_shutdown_ack(stored_msg);
         }
 
     } else {
         crm_err("Unexpected request (%s) sent to %s", op, AM_I_DC ? "the DC" : "non-DC node");
         crm_log_xml_err(stored_msg, "Unexpected");
     }
 
     return I_NULL;
 }
 
 static void
 handle_response(xmlNode *stored_msg)
 {
     const char *op = crm_element_value(stored_msg, F_CRM_TASK);
 
     crm_log_xml_trace(stored_msg, "reply");
     if (op == NULL) {
         crm_warn("Ignoring reply without " F_CRM_TASK);
 
     } else if (AM_I_DC && strcmp(op, CRM_OP_PECALC) == 0) {
         // Check whether scheduler answer been superseded by subsequent request
         const char *msg_ref = crm_element_value(stored_msg, PCMK_XA_REFERENCE);
 
         if (msg_ref == NULL) {
             crm_err("%s - Ignoring calculation with no reference", op);
 
         } else if (pcmk__str_eq(msg_ref, controld_globals.fsa_pe_ref,
                                 pcmk__str_none)) {
             ha_msg_input_t fsa_input;
 
             controld_stop_sched_timer();
             fsa_input.msg = stored_msg;
             register_fsa_input_later(C_IPC_MESSAGE, I_PE_SUCCESS, &fsa_input);
 
         } else {
             crm_info("%s calculation %s is obsolete", op, msg_ref);
         }
 
     } else if (strcmp(op, CRM_OP_VOTE) == 0
                || strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0 || strcmp(op, CRM_OP_SHUTDOWN) == 0) {
 
     } else {
         const char *host_from = crm_element_value(stored_msg, PCMK__XA_SRC);
 
         crm_err("Unexpected response (op=%s, src=%s) sent to the %s",
                 op, host_from, AM_I_DC ? "DC" : "controller");
     }
 }
 
 static enum crmd_fsa_input
 handle_shutdown_request(xmlNode * stored_msg)
 {
     /* handle here to avoid potential version issues
      *   where the shutdown message/procedure may have
      *   been changed in later versions.
      *
      * This way the DC is always in control of the shutdown
      */
 
     char *now_s = NULL;
     const char *host_from = crm_element_value(stored_msg, PCMK__XA_SRC);
 
     if (host_from == NULL) {
         /* we're shutting down and the DC */
         host_from = controld_globals.our_nodename;
     }
 
     crm_info("Creating shutdown request for %s (state=%s)", host_from,
              fsa_state2string(controld_globals.fsa_state));
     crm_log_xml_trace(stored_msg, "message");
 
     now_s = pcmk__ttoa(time(NULL));
     update_attrd(host_from, XML_CIB_ATTR_SHUTDOWN, now_s, NULL, FALSE);
     free(now_s);
 
     /* will be picked up by the TE as long as its running */
     return I_NULL;
 }
 
 static void
 send_msg_via_ipc(xmlNode * msg, const char *sys)
 {
     pcmk__client_t *client_channel = NULL;
 
     CRM_CHECK(sys != NULL, return);
 
     client_channel = pcmk__find_client_by_id(sys);
 
     if (crm_element_value(msg, PCMK__XA_SRC) == NULL) {
         crm_xml_add(msg, PCMK__XA_SRC, controld_globals.our_nodename);
     }
 
     if (client_channel != NULL) {
         /* Transient clients such as crmadmin */
         pcmk__ipc_send_xml(client_channel, 0, msg, crm_ipc_server_event);
 
     } else if (pcmk__str_eq(sys, CRM_SYSTEM_TENGINE, pcmk__str_none)) {
         xmlNode *data = get_message_xml(msg, F_CRM_DATA);
 
         process_te_message(msg, data);
 
     } else if (pcmk__str_eq(sys, CRM_SYSTEM_LRMD, pcmk__str_none)) {
         fsa_data_t fsa_data;
         ha_msg_input_t fsa_input;
 
         fsa_input.msg = msg;
         fsa_input.xml = get_message_xml(msg, F_CRM_DATA);
 
         fsa_data.id = 0;
         fsa_data.actions = 0;
         fsa_data.data = &fsa_input;
         fsa_data.fsa_input = I_MESSAGE;
         fsa_data.fsa_cause = C_IPC_MESSAGE;
         fsa_data.origin = __func__;
         fsa_data.data_type = fsa_dt_ha_msg;
 
         do_lrm_invoke(A_LRM_INVOKE, C_IPC_MESSAGE, controld_globals.fsa_state,
                       I_MESSAGE, &fsa_data);
 
     } else if (crmd_is_proxy_session(sys)) {
         crmd_proxy_send(sys, msg);
 
     } else {
         crm_info("Received invalid request: unknown subsystem '%s'", sys);
     }
 }
 
 void
 delete_ha_msg_input(ha_msg_input_t * orig)
 {
     if (orig == NULL) {
         return;
     }
     free_xml(orig->msg);
     free(orig);
 }
 
 /*!
  * \internal
  * \brief Notify the cluster of a remote node state change
  *
  * \param[in] node_name  Node's name
  * \param[in] node_up    true if node is up, false if down
  */
 void
 broadcast_remote_state_message(const char *node_name, bool node_up)
 {
     xmlNode *msg = create_request(CRM_OP_REMOTE_STATE, NULL, NULL,
                                   CRM_SYSTEM_CRMD, CRM_SYSTEM_CRMD, NULL);
 
     crm_info("Notifying cluster of Pacemaker Remote node %s %s",
              node_name, node_up? "coming up" : "going down");
 
     crm_xml_add(msg, PCMK_XA_ID, node_name);
     pcmk__xe_set_bool_attr(msg, PCMK__XA_IN_CCM, node_up);
 
     if (node_up) {
         crm_xml_add(msg, PCMK__XA_CONN_HOST, controld_globals.our_nodename);
     }
 
     send_cluster_message(NULL, crm_msg_crmd, msg, TRUE);
     free_xml(msg);
 }
 
diff --git a/lib/cluster/cluster.c b/lib/cluster/cluster.c
index f2cd428b57..1cdc2047ae 100644
--- a/lib/cluster/cluster.c
+++ b/lib/cluster/cluster.c
@@ -1,406 +1,407 @@
 /*
  * Copyright 2004-2023 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU Lesser General Public License
  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 #include <dlfcn.h>
 
 #include <stdio.h>
 #include <unistd.h>
 #include <string.h>
 #include <stdlib.h>
 #include <time.h>
 #include <sys/param.h>
 #include <sys/types.h>
 
 #include <crm/crm.h>
 #include <crm/msg_xml.h>
 
 #include <crm/common/ipc.h>
 #include <crm/cluster/internal.h>
 #include "crmcluster_private.h"
 
 CRM_TRACE_INIT_DATA(cluster);
 
 /*!
  * \brief Get (and set if needed) a node's UUID
  *
  * \param[in,out] peer  Node to check
  *
  * \return Node UUID of \p peer, or NULL if unknown
  */
 const char *
 crm_peer_uuid(crm_node_t *peer)
 {
     char *uuid = NULL;
 
     // Check simple cases first, to avoid any calls that might block
     if (peer == NULL) {
         return NULL;
     }
     if (peer->uuid != NULL) {
         return peer->uuid;
     }
 
     switch (get_cluster_type()) {
         case pcmk_cluster_corosync:
 #if SUPPORT_COROSYNC
             uuid = pcmk__corosync_uuid(peer);
 #endif
             break;
 
         case pcmk_cluster_unknown:
         case pcmk_cluster_invalid:
             crm_err("Unsupported cluster type");
             break;
     }
 
     peer->uuid = uuid;
     return peer->uuid;
 }
 
 /*!
  * \brief Connect to the cluster layer
  *
  * \param[in,out] Initialized cluster object to connect
  *
  * \return TRUE on success, otherwise FALSE
  */
 gboolean
 crm_cluster_connect(crm_cluster_t *cluster)
 {
     enum cluster_type_e type = get_cluster_type();
 
     crm_notice("Connecting to %s cluster infrastructure",
                name_for_cluster_type(type));
     switch (type) {
         case pcmk_cluster_corosync:
 #if SUPPORT_COROSYNC
             crm_peer_init();
             return pcmk__corosync_connect(cluster);
 #else
             break;
 #endif // SUPPORT_COROSYNC
         default:
             break;
     }
     return FALSE;
 }
 
 /*!
  * \brief Disconnect from the cluster layer
  *
  * \param[in,out] cluster  Cluster object to disconnect
  */
 void
 crm_cluster_disconnect(crm_cluster_t *cluster)
 {
     enum cluster_type_e type = get_cluster_type();
 
     crm_info("Disconnecting from %s cluster infrastructure",
              name_for_cluster_type(type));
     switch (type) {
         case pcmk_cluster_corosync:
 #if SUPPORT_COROSYNC
             crm_peer_destroy();
             pcmk__corosync_disconnect(cluster);
 #endif // SUPPORT_COROSYNC
             break;
         default:
             break;
     }
 }
 
 /*!
  * \brief Allocate a new \p crm_cluster_t object
  *
  * \return A newly allocated \p crm_cluster_t object (guaranteed not \p NULL)
  * \note The caller is responsible for freeing the return value using
  *       \p pcmk_cluster_free().
  */
 crm_cluster_t *
 pcmk_cluster_new(void)
 {
     crm_cluster_t *cluster = calloc(1, sizeof(crm_cluster_t));
 
     CRM_ASSERT(cluster != NULL);
     return cluster;
 }
 
 /*!
  * \brief Free a \p crm_cluster_t object and its dynamically allocated members
  *
  * \param[in,out] cluster  Cluster object to free
  */
 void
 pcmk_cluster_free(crm_cluster_t *cluster)
 {
     if (cluster == NULL) {
         return;
     }
     free(cluster->uuid);
     free(cluster->uname);
     free(cluster);
 }
 
 /*!
  * \brief Send an XML message via the cluster messaging layer
  *
  * \param[in] node     Cluster node to send message to
  * \param[in] service  Message type to use in message host info
  * \param[in] data     XML message to send
  * \param[in] ordered  Ignored for currently supported messaging layers
  *
  * \return TRUE on success, otherwise FALSE
  */
 gboolean
 send_cluster_message(const crm_node_t *node, enum crm_ais_msg_types service,
                      const xmlNode *data, gboolean ordered)
 {
     switch (get_cluster_type()) {
         case pcmk_cluster_corosync:
 #if SUPPORT_COROSYNC
             return pcmk__cpg_send_xml(data, node, service);
 #endif
             break;
         default:
             break;
     }
     return FALSE;
 }
 
 /*!
  * \brief Get the local node's name
  *
  * \return Local node's name
  * \note This will fatally exit if local node name cannot be known.
  */
 const char *
 get_local_node_name(void)
 {
     static char *name = NULL;
 
     if (name == NULL) {
         name = get_node_name(0);
     }
     return name;
 }
 
 /*!
  * \brief Get the node name corresponding to a cluster node ID
  *
  * \param[in] nodeid  Node ID to check (or 0 for local node)
  *
  * \return Node name corresponding to \p nodeid
  * \note This will fatally exit if \p nodeid is 0 and local node name cannot be
  *       known.
  */
 char *
 get_node_name(uint32_t nodeid)
 {
     char *name = NULL;
     enum cluster_type_e stack = get_cluster_type();
 
     switch (stack) {
         case pcmk_cluster_corosync:
 #if SUPPORT_COROSYNC
             name = pcmk__corosync_name(0, nodeid);
             break;
 #endif // SUPPORT_COROSYNC
 
         default:
             crm_err("Unknown cluster type: %s (%d)", name_for_cluster_type(stack), stack);
     }
 
     if ((name == NULL) && (nodeid == 0)) {
         name = pcmk_hostname();
         if (name == NULL) {
             // @TODO Maybe let the caller decide what to do
             crm_err("Could not obtain the local %s node name",
                     name_for_cluster_type(stack));
             crm_exit(CRM_EX_FATAL);
         }
         crm_notice("Defaulting to uname -n for the local %s node name",
                    name_for_cluster_type(stack));
     }
 
     if (name == NULL) {
         crm_notice("Could not obtain a node name for %s node with id %u",
                    name_for_cluster_type(stack), nodeid);
     }
     return name;
 }
 
 /*!
  * \brief Get the node name corresponding to a node UUID
  *
  * \param[in] uuid  UUID of desired node
  *
  * \return name of desired node
  *
  * \note This relies on the remote peer cache being populated with all
  *       remote nodes in the cluster, so callers should maintain that cache.
  */
 const char *
 crm_peer_uname(const char *uuid)
 {
     GHashTableIter iter;
     crm_node_t *node = NULL;
 
     CRM_CHECK(uuid != NULL, return NULL);
 
     /* remote nodes have the same uname and uuid */
     if (g_hash_table_lookup(crm_remote_peer_cache, uuid)) {
         return uuid;
     }
 
     /* avoid blocking calls where possible */
     g_hash_table_iter_init(&iter, crm_peer_cache);
     while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
         if (pcmk__str_eq(node->uuid, uuid, pcmk__str_casei)) {
             if (node->uname != NULL) {
                 return node->uname;
             }
             break;
         }
     }
     node = NULL;
 
     if (is_corosync_cluster()) {
         long long id;
 
         if ((pcmk__scan_ll(uuid, &id, 0LL) != pcmk_rc_ok)
             || (id < 1LL) || (id > UINT32_MAX))  {
             crm_err("Invalid Corosync node ID '%s'", uuid);
             return NULL;
         }
 
-        node = pcmk__search_cluster_node_cache((uint32_t) id, NULL, NULL);
+        node = pcmk__search_node_caches((uint32_t) id, NULL,
+                                        pcmk__node_search_cluster);
         if (node != NULL) {
             crm_info("Setting uuid for node %s[%u] to %s",
                      node->uname, node->id, uuid);
             node->uuid = strdup(uuid);
             return node->uname;
         }
         return NULL;
     }
 
     return NULL;
 }
 
 /*!
  * \brief  Get a log-friendly string equivalent of a cluster type
  *
  * \param[in] type  Cluster type
  *
  * \return Log-friendly string corresponding to \p type
  */
 const char *
 name_for_cluster_type(enum cluster_type_e type)
 {
     switch (type) {
         case pcmk_cluster_corosync:
             return "corosync";
         case pcmk_cluster_unknown:
             return "unknown";
         case pcmk_cluster_invalid:
             return "invalid";
     }
     crm_err("Invalid cluster type: %d", type);
     return "invalid";
 }
 
 /*!
  * \brief Get (and validate) the local cluster type
  *
  * \return Local cluster type
  * \note This will fatally exit if the local cluster type is invalid.
  */
 enum cluster_type_e
 get_cluster_type(void)
 {
     bool detected = false;
     const char *cluster = NULL;
     static enum cluster_type_e cluster_type = pcmk_cluster_unknown;
 
     /* Return the previous calculation, if any */
     if (cluster_type != pcmk_cluster_unknown) {
         return cluster_type;
     }
 
     cluster = pcmk__env_option(PCMK__ENV_CLUSTER_TYPE);
 
 #if SUPPORT_COROSYNC
     /* If nothing is defined in the environment, try corosync (if supported) */
     if (cluster == NULL) {
         crm_debug("Testing with Corosync");
         cluster_type = pcmk__corosync_detect();
         if (cluster_type != pcmk_cluster_unknown) {
             detected = true;
             goto done;
         }
     }
 #endif
 
     /* Something was defined in the environment, test it against what we support */
     crm_info("Verifying cluster type: '%s'",
              ((cluster == NULL)? "-unspecified-" : cluster));
     if (cluster == NULL) {
 
 #if SUPPORT_COROSYNC
     } else if (pcmk__str_eq(cluster, "corosync", pcmk__str_casei)) {
         cluster_type = pcmk_cluster_corosync;
 #endif
 
     } else {
         cluster_type = pcmk_cluster_invalid;
         goto done; /* Keep the compiler happy when no stacks are supported */
     }
 
   done:
     if (cluster_type == pcmk_cluster_unknown) {
         crm_notice("Could not determine the current cluster type");
 
     } else if (cluster_type == pcmk_cluster_invalid) {
         crm_notice("This installation does not support the '%s' cluster infrastructure: terminating.",
                    cluster);
         crm_exit(CRM_EX_FATAL);
 
     } else {
         crm_info("%s an active '%s' cluster",
                  (detected? "Detected" : "Assuming"),
                  name_for_cluster_type(cluster_type));
     }
 
     return cluster_type;
 }
 
 /*!
  * \brief Check whether the local cluster is a Corosync cluster
  *
  * \return TRUE if the local cluster is a Corosync cluster, otherwise FALSE
  */
 gboolean
 is_corosync_cluster(void)
 {
     return get_cluster_type() == pcmk_cluster_corosync;
 }
 
 // Deprecated functions kept only for backward API compatibility
 // LCOV_EXCL_START
 
 #include <crm/cluster/compat.h>
 
 void
 set_uuid(xmlNode *xml, const char *attr, crm_node_t *node)
 {
     crm_xml_add(xml, attr, crm_peer_uuid(node));
 }
 
 // LCOV_EXCL_STOP
 // End deprecated API
diff --git a/lib/cluster/cpg.c b/lib/cluster/cpg.c
index b5f28841cc..4f3e81c511 100644
--- a/lib/cluster/cpg.c
+++ b/lib/cluster/cpg.c
@@ -1,1095 +1,1095 @@
 /*
  * Copyright 2004-2024 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU Lesser General Public License
  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 #include <bzlib.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include <netdb.h>
 
 #include <crm/common/ipc.h>
 #include <crm/cluster/internal.h>
 #include <crm/common/mainloop.h>
 #include <sys/utsname.h>
 
 #include <qb/qbipc_common.h>
 #include <qb/qbipcc.h>
 #include <qb/qbutil.h>
 
 #include <corosync/corodefs.h>
 #include <corosync/corotypes.h>
 #include <corosync/hdb.h>
 #include <corosync/cpg.h>
 
 #include <crm/msg_xml.h>
 
 #include <crm/common/ipc_internal.h>  /* PCMK__SPECIAL_PID* */
 #include "crmcluster_private.h"
 
 /* @TODO Once we can update the public API to require crm_cluster_t* in more
  *       functions, we can ditch this in favor of cluster->cpg_handle.
  */
 static cpg_handle_t pcmk_cpg_handle = 0;
 
 // @TODO These could be moved to crm_cluster_t* at that time as well
 static bool cpg_evicted = false;
 static GList *cs_message_queue = NULL;
 static int cs_message_timer = 0;
 
 struct pcmk__cpg_host_s {
     uint32_t id;
     uint32_t pid;
     gboolean local;
     enum crm_ais_msg_types type;
     uint32_t size;
     char uname[MAX_NAME];
 } __attribute__ ((packed));
 
 typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
 
 struct pcmk__cpg_msg_s {
     struct qb_ipc_response_header header __attribute__ ((aligned(8)));
     uint32_t id;
     gboolean is_compressed;
 
     pcmk__cpg_host_t host;
     pcmk__cpg_host_t sender;
 
     uint32_t size;
     uint32_t compressed_size;
     /* 584 bytes */
     char data[0];
 
 } __attribute__ ((packed));
 
 typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
 
 static void crm_cs_flush(gpointer data);
 
 #define msg_data_len(msg) (msg->is_compressed?msg->compressed_size:msg->size)
 
 #define cs_repeat(rc, counter, max, code) do {                          \
         rc = code;                                                      \
         if ((rc == CS_ERR_TRY_AGAIN) || (rc == CS_ERR_QUEUE_FULL)) {    \
             counter++;                                                  \
             crm_debug("Retrying operation after %ds", counter);         \
             sleep(counter);                                             \
         } else {                                                        \
             break;                                                      \
         }                                                               \
     } while (counter < max)
 
 /*!
  * \brief Disconnect from Corosync CPG
  *
  * \param[in,out] cluster  Cluster to disconnect
  */
 void
 cluster_disconnect_cpg(crm_cluster_t *cluster)
 {
     pcmk_cpg_handle = 0;
     if (cluster->cpg_handle) {
         crm_trace("Disconnecting CPG");
         cpg_leave(cluster->cpg_handle, &cluster->group);
         cpg_finalize(cluster->cpg_handle);
         cluster->cpg_handle = 0;
 
     } else {
         crm_info("No CPG connection");
     }
 }
 
 /*!
  * \brief Get the local Corosync node ID (via CPG)
  *
  * \param[in] handle  CPG connection to use (or 0 to use new connection)
  *
  * \return Corosync ID of local node (or 0 if not known)
  */
 uint32_t
 get_local_nodeid(cpg_handle_t handle)
 {
     cs_error_t rc = CS_OK;
     int retries = 0;
     static uint32_t local_nodeid = 0;
     cpg_handle_t local_handle = handle;
     cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
     int fd = -1;
     uid_t found_uid = 0;
     gid_t found_gid = 0;
     pid_t found_pid = 0;
     int rv;
 
     if(local_nodeid != 0) {
         return local_nodeid;
     }
 
     if(handle == 0) {
         crm_trace("Creating connection");
         cs_repeat(rc, retries, 5, cpg_model_initialize(&local_handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
         if (rc != CS_OK) {
             crm_err("Could not connect to the CPG API: %s (%d)",
                     cs_strerror(rc), rc);
             return 0;
         }
 
         rc = cpg_fd_get(local_handle, &fd);
         if (rc != CS_OK) {
             crm_err("Could not obtain the CPG API connection: %s (%d)",
                     cs_strerror(rc), rc);
             goto bail;
         }
 
         /* CPG provider run as root (in given user namespace, anyway)? */
         if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
                                                 &found_uid, &found_gid))) {
             crm_err("CPG provider is not authentic:"
                     " process %lld (uid: %lld, gid: %lld)",
                     (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
                     (long long) found_uid, (long long) found_gid);
             goto bail;
         } else if (rv < 0) {
             crm_err("Could not verify authenticity of CPG provider: %s (%d)",
                     strerror(-rv), -rv);
             goto bail;
         }
     }
 
     if (rc == CS_OK) {
         retries = 0;
         crm_trace("Performing lookup");
         cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
     }
 
     if (rc != CS_OK) {
         crm_err("Could not get local node id from the CPG API: %s (%d)",
                 pcmk__cs_err_str(rc), rc);
     }
 
 bail:
     if(handle == 0) {
         crm_trace("Closing connection");
         cpg_finalize(local_handle);
     }
     crm_debug("Local nodeid is %u", local_nodeid);
     return local_nodeid;
 }
 
 /*!
  * \internal
  * \brief Callback function for Corosync message queue timer
  *
  * \param[in] data  CPG handle
  *
  * \return FALSE (to indicate to glib that timer should not be removed)
  */
 static gboolean
 crm_cs_flush_cb(gpointer data)
 {
     cs_message_timer = 0;
     crm_cs_flush(data);
     return FALSE;
 }
 
 // Send no more than this many CPG messages in one flush
 #define CS_SEND_MAX 200
 
 /*!
  * \internal
  * \brief Send messages in Corosync CPG message queue
  *
  * \param[in] data   CPG handle
  */
 static void
 crm_cs_flush(gpointer data)
 {
     unsigned int sent = 0;
     guint queue_len = 0;
     cs_error_t rc = 0;
     cpg_handle_t *handle = (cpg_handle_t *) data;
 
     if (*handle == 0) {
         crm_trace("Connection is dead");
         return;
     }
 
     queue_len = g_list_length(cs_message_queue);
     if (((queue_len % 1000) == 0) && (queue_len > 1)) {
         crm_err("CPG queue has grown to %d", queue_len);
 
     } else if (queue_len == CS_SEND_MAX) {
         crm_warn("CPG queue has grown to %d", queue_len);
     }
 
     if (cs_message_timer != 0) {
         /* There is already a timer, wait until it goes off */
         crm_trace("Timer active %d", cs_message_timer);
         return;
     }
 
     while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
         struct iovec *iov = cs_message_queue->data;
 
         rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
         if (rc != CS_OK) {
             break;
         }
 
         sent++;
         crm_trace("CPG message sent, size=%llu",
                   (unsigned long long) iov->iov_len);
 
         cs_message_queue = g_list_remove(cs_message_queue, iov);
         free(iov->iov_base);
         free(iov);
     }
 
     queue_len -= sent;
     do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE,
                "Sent %u CPG message%s (%d still queued): %s (rc=%d)",
                sent, pcmk__plural_s(sent), queue_len, pcmk__cs_err_str(rc),
                (int) rc);
 
     if (cs_message_queue) {
         uint32_t delay_ms = 100;
         if (rc != CS_OK) {
             /* Proportionally more if sending failed but cap at 1s */
             delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
         }
         cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
     }
 }
 
 /*!
  * \internal
  * \brief Dispatch function for CPG handle
  *
  * \param[in,out] user_data  Cluster object
  *
  * \return 0 on success, -1 on error (per mainloop_io_t interface)
  */
 static int
 pcmk_cpg_dispatch(gpointer user_data)
 {
     cs_error_t rc = CS_OK;
     crm_cluster_t *cluster = (crm_cluster_t *) user_data;
 
     rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
     if (rc != CS_OK) {
         crm_err("Connection to the CPG API failed: %s (%d)",
                 pcmk__cs_err_str(rc), rc);
         cpg_finalize(cluster->cpg_handle);
         cluster->cpg_handle = 0;
         return -1;
 
     } else if (cpg_evicted) {
         crm_err("Evicted from CPG membership");
         return -1;
     }
     return 0;
 }
 
 static inline const char *
 ais_dest(const pcmk__cpg_host_t *host)
 {
     if (host->local) {
         return "local";
     } else if (host->size > 0) {
         return host->uname;
     } else {
         return "<all>";
     }
 }
 
 static inline const char *
 msg_type2text(enum crm_ais_msg_types type)
 {
     const char *text = "unknown";
 
     switch (type) {
         case crm_msg_none:
             text = "unknown";
             break;
         case crm_msg_ais:
             text = "ais";
             break;
         case crm_msg_cib:
             text = "cib";
             break;
         case crm_msg_crmd:
             text = "crmd";
             break;
         case crm_msg_pe:
             text = "pengine";
             break;
         case crm_msg_te:
             text = "tengine";
             break;
         case crm_msg_lrmd:
             text = "lrmd";
             break;
         case crm_msg_attrd:
             text = "attrd";
             break;
         case crm_msg_stonithd:
             text = "stonithd";
             break;
         case crm_msg_stonith_ng:
             text = "stonith-ng";
             break;
     }
     return text;
 }
 
 /*!
  * \internal
  * \brief Check whether a Corosync CPG message is valid
  *
  * \param[in] msg   Corosync CPG message to check
  *
  * \return true if \p msg is valid, otherwise false
  */
 static bool
 check_message_sanity(const pcmk__cpg_msg_t *msg)
 {
     int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
 
     if (payload_size < 1) {
         crm_err("%sCPG message %d from %s invalid: "
                 "Claimed size of %d bytes is too small "
                 CRM_XS " from %s[%u] to %s@%s",
                 (msg->is_compressed? "Compressed " : ""),
                 msg->id, ais_dest(&(msg->sender)),
                 (int) msg->header.size,
                 msg_type2text(msg->sender.type), msg->sender.pid,
                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
         return false;
     }
 
     if (msg->header.error != CS_OK) {
         crm_err("%sCPG message %d from %s invalid: "
                 "Sender indicated error %d "
                 CRM_XS " from %s[%u] to %s@%s",
                 (msg->is_compressed? "Compressed " : ""),
                 msg->id, ais_dest(&(msg->sender)),
                 msg->header.error,
                 msg_type2text(msg->sender.type), msg->sender.pid,
                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
         return false;
     }
 
     if (msg_data_len(msg) != payload_size) {
         crm_err("%sCPG message %d from %s invalid: "
                 "Total size %d inconsistent with payload size %d "
                 CRM_XS " from %s[%u] to %s@%s",
                 (msg->is_compressed? "Compressed " : ""),
                 msg->id, ais_dest(&(msg->sender)),
                 (int) msg->header.size, (int) msg_data_len(msg),
                 msg_type2text(msg->sender.type), msg->sender.pid,
                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
         return false;
     }
 
     if (!msg->is_compressed &&
         /* msg->size != (strlen(msg->data) + 1) would be a stronger check,
          * but checking the last byte or two should be quick
          */
         (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
          || (msg->data[msg->size - 1] != '\0'))) {
         crm_err("CPG message %d from %s invalid: "
                 "Payload does not end at byte %llu "
                 CRM_XS " from %s[%u] to %s@%s",
                 msg->id, ais_dest(&(msg->sender)),
                 (unsigned long long) msg->size,
                 msg_type2text(msg->sender.type), msg->sender.pid,
                 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
         return false;
     }
 
     crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
               (int) msg->header.size, (msg->is_compressed? "compressed " : ""),
               msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
               ais_dest(&(msg->sender)),
               msg_type2text(msg->host.type), ais_dest(&(msg->host)));
     return true;
 }
 
 /*!
  * \brief Extract text data from a Corosync CPG message
  *
  * \param[in]     handle   CPG connection (to get local node ID if not known)
  * \param[in]     nodeid   Corosync ID of node that sent message
  * \param[in]     pid      Process ID of message sender (for logging only)
  * \param[in,out] content  CPG message
  * \param[out]    kind     If not NULL, will be set to CPG header ID
  *                         (which should be an enum crm_ais_msg_class value,
  *                         currently always crm_class_cluster)
  * \param[out]    from     If not NULL, will be set to sender uname
  *                         (valid for the lifetime of \p content)
  *
  * \return Newly allocated string with message data
  * \note It is the caller's responsibility to free the return value with free().
  */
 char *
 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
                         uint32_t *kind, const char **from)
 {
     char *data = NULL;
     pcmk__cpg_msg_t *msg = (pcmk__cpg_msg_t *) content;
 
     if(handle) {
         // Do filtering and field massaging
         uint32_t local_nodeid = get_local_nodeid(handle);
         const char *local_name = get_local_node_name();
 
         if (msg->sender.id > 0 && msg->sender.id != nodeid) {
             crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
             return NULL;
 
         } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
             /* Not for us */
             crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
             return NULL;
         } else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
             /* Not for us */
             crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
             return NULL;
         }
 
         msg->sender.id = nodeid;
         if (msg->sender.size == 0) {
             crm_node_t *peer = pcmk__get_node(nodeid, NULL, NULL,
                                               pcmk__node_search_cluster);
 
             if (peer == NULL) {
                 crm_err("Peer with nodeid=%u is unknown", nodeid);
 
             } else if (peer->uname == NULL) {
                 crm_err("No uname for peer with nodeid=%u", nodeid);
 
             } else {
                 crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
                 msg->sender.size = strlen(peer->uname);
                 memset(msg->sender.uname, 0, MAX_NAME);
                 memcpy(msg->sender.uname, peer->uname, msg->sender.size);
             }
         }
     }
 
     crm_trace("Got new%s message (size=%d, %d, %d)",
               msg->is_compressed ? " compressed" : "",
               msg_data_len(msg), msg->size, msg->compressed_size);
 
     if (kind != NULL) {
         *kind = msg->header.id;
     }
     if (from != NULL) {
         *from = msg->sender.uname;
     }
 
     if (msg->is_compressed && msg->size > 0) {
         int rc = BZ_OK;
         char *uncompressed = NULL;
         unsigned int new_size = msg->size + 1;
 
         if (!check_message_sanity(msg)) {
             goto badmsg;
         }
 
         crm_trace("Decompressing message data");
         uncompressed = calloc(1, new_size);
         rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
 
         rc = pcmk__bzlib2rc(rc);
 
         if (rc != pcmk_rc_ok) {
             crm_err("Decompression failed: %s " CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
             free(uncompressed);
             goto badmsg;
         }
 
         CRM_ASSERT(new_size == msg->size);
 
         data = uncompressed;
 
     } else if (!check_message_sanity(msg)) {
         goto badmsg;
 
     } else {
         data = strdup(msg->data);
     }
 
     // Is this necessary?
     pcmk__get_node(msg->sender.id, msg->sender.uname, NULL,
                    pcmk__node_search_cluster);
 
     crm_trace("Payload: %.200s", data);
     return data;
 
   badmsg:
     crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
             " min=%d, total=%d, size=%d, bz2_size=%d",
             msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
             ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
             msg->sender.pid, (int)sizeof(pcmk__cpg_msg_t),
             msg->header.size, msg->size, msg->compressed_size);
 
     free(data);
     return NULL;
 }
 
 /*!
  * \internal
  * \brief Compare cpg_address objects by node ID
  *
  * \param[in] first   First cpg_address structure to compare
  * \param[in] second  Second cpg_address structure to compare
  *
  * \return Negative number if first's node ID is lower,
  *         positive number if first's node ID is greater,
  *         or 0 if both node IDs are equal
  */
 static int
 cmp_member_list_nodeid(const void *first, const void *second)
 {
     const struct cpg_address *const a = *((const struct cpg_address **) first),
                              *const b = *((const struct cpg_address **) second);
     if (a->nodeid < b->nodeid) {
         return -1;
     } else if (a->nodeid > b->nodeid) {
         return 1;
     }
     /* don't bother with "reason" nor "pid" */
     return 0;
 }
 
 /*!
  * \internal
  * \brief Get a readable string equivalent of a cpg_reason_t value
  *
  * \param[in] reason  CPG reason value
  *
  * \return Readable string suitable for logging
  */
 static const char *
 cpgreason2str(cpg_reason_t reason)
 {
     switch (reason) {
         case CPG_REASON_JOIN:       return " via cpg_join";
         case CPG_REASON_LEAVE:      return " via cpg_leave";
         case CPG_REASON_NODEDOWN:   return " via cluster exit";
         case CPG_REASON_NODEUP:     return " via cluster join";
         case CPG_REASON_PROCDOWN:   return " for unknown reason";
         default:                    break;
     }
     return "";
 }
 
 /*!
  * \internal
  * \brief Get a log-friendly node name
  *
  * \param[in] peer  Node to check
  *
  * \return Node's uname, or readable string if not known
  */
 static inline const char *
 peer_name(const crm_node_t *peer)
 {
     if (peer == NULL) {
         return "unknown node";
     } else if (peer->uname == NULL) {
         return "peer node";
     } else {
         return peer->uname;
     }
 }
 
 /*!
  * \internal
  * \brief Process a CPG peer's leaving the cluster
  *
  * \param[in] cpg_group_name      CPG group name (for logging)
  * \param[in] event_counter       Event number (for logging)
  * \param[in] local_nodeid        Node ID of local node
  * \param[in] cpg_peer            CPG peer that left
  * \param[in] sorted_member_list  List of remaining members, qsort()-ed by ID
  * \param[in] member_list_entries Number of entries in \p sorted_member_list
  */
 static void
 node_left(const char *cpg_group_name, int event_counter,
           uint32_t local_nodeid, const struct cpg_address *cpg_peer,
           const struct cpg_address **sorted_member_list,
           size_t member_list_entries)
 {
-    crm_node_t *peer = pcmk__search_cluster_node_cache(cpg_peer->nodeid,
-                                                       NULL, NULL);
+    crm_node_t *peer = pcmk__search_node_caches(cpg_peer->nodeid, NULL,
+                                                pcmk__node_search_cluster);
     const struct cpg_address **rival = NULL;
 
     /* Most CPG-related Pacemaker code assumes that only one process on a node
      * can be in the process group, but Corosync does not impose this
      * limitation, and more than one can be a member in practice due to a
      * daemon attempting to start while another instance is already running.
      *
      * Check for any such duplicate instances, because we don't want to process
      * their leaving as if our actual peer left. If the peer that left still has
      * an entry in sorted_member_list (with a different PID), we will ignore the
      * leaving.
      *
      * @TODO Track CPG members' PIDs so we can tell exactly who left.
      */
     if (peer != NULL) {
         rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
                         sizeof(const struct cpg_address *),
                         cmp_member_list_nodeid);
     }
 
     if (rival == NULL) {
         crm_info("Group %s event %d: %s (node %u pid %u) left%s",
                  cpg_group_name, event_counter, peer_name(peer),
                  cpg_peer->nodeid, cpg_peer->pid,
                  cpgreason2str(cpg_peer->reason));
         if (peer != NULL) {
             crm_update_peer_proc(__func__, peer, crm_proc_cpg,
                                  OFFLINESTATUS);
         }
     } else if (cpg_peer->nodeid == local_nodeid) {
         crm_warn("Group %s event %d: duplicate local pid %u left%s",
                  cpg_group_name, event_counter,
                  cpg_peer->pid, cpgreason2str(cpg_peer->reason));
     } else {
         crm_warn("Group %s event %d: "
                  "%s (node %u) duplicate pid %u left%s (%u remains)",
                  cpg_group_name, event_counter, peer_name(peer),
                  cpg_peer->nodeid, cpg_peer->pid,
                  cpgreason2str(cpg_peer->reason), (*rival)->pid);
     }
 }
 
 /*!
  * \brief Handle a CPG configuration change event
  *
  * \param[in] handle               CPG connection
  * \param[in] cpg_name             CPG group name
  * \param[in] member_list          List of current CPG members
  * \param[in] member_list_entries  Number of entries in \p member_list
  * \param[in] left_list            List of CPG members that left
  * \param[in] left_list_entries    Number of entries in \p left_list
  * \param[in] joined_list          List of CPG members that joined
  * \param[in] joined_list_entries  Number of entries in \p joined_list
  */
 void
 pcmk_cpg_membership(cpg_handle_t handle,
                     const struct cpg_name *groupName,
                     const struct cpg_address *member_list, size_t member_list_entries,
                     const struct cpg_address *left_list, size_t left_list_entries,
                     const struct cpg_address *joined_list, size_t joined_list_entries)
 {
     int i;
     gboolean found = FALSE;
     static int counter = 0;
     uint32_t local_nodeid = get_local_nodeid(handle);
     const struct cpg_address **sorted;
 
     sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
     CRM_ASSERT(sorted != NULL);
 
     for (size_t iter = 0; iter < member_list_entries; iter++) {
         sorted[iter] = member_list + iter;
     }
     /* so that the cross-matching multiply-subscribed nodes is then cheap */
     qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
           cmp_member_list_nodeid);
 
     for (i = 0; i < left_list_entries; i++) {
         node_left(groupName->value, counter, local_nodeid, &left_list[i],
                   sorted, member_list_entries);
     }
     free(sorted);
     sorted = NULL;
 
     for (i = 0; i < joined_list_entries; i++) {
         crm_info("Group %s event %d: node %u pid %u joined%s",
                  groupName->value, counter, joined_list[i].nodeid,
                  joined_list[i].pid, cpgreason2str(joined_list[i].reason));
     }
 
     for (i = 0; i < member_list_entries; i++) {
         crm_node_t *peer = pcmk__get_node(member_list[i].nodeid, NULL, NULL,
                                           pcmk__node_search_cluster);
 
         if (member_list[i].nodeid == local_nodeid
                 && member_list[i].pid != getpid()) {
             // See the note in node_left()
             crm_warn("Group %s event %d: detected duplicate local pid %u",
                      groupName->value, counter, member_list[i].pid);
             continue;
         }
         crm_info("Group %s event %d: %s (node %u pid %u) is member",
                  groupName->value, counter, peer_name(peer),
                  member_list[i].nodeid, member_list[i].pid);
 
         /* If the caller left auto-reaping enabled, this will also update the
          * state to member.
          */
         peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
                                     ONLINESTATUS);
 
         if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
             /* The node is a CPG member, but we currently think it's not a
              * cluster member. This is possible only if auto-reaping was
              * disabled. The node may be joining, and we happened to get the CPG
              * notification before the quorum notification; or the node may have
              * just died, and we are processing its final messages; or a bug
              * has affected the peer cache.
              */
             time_t now = time(NULL);
 
             if (peer->when_lost == 0) {
                 // Track when we first got into this contradictory state
                 peer->when_lost = now;
 
             } else if (now > (peer->when_lost + 60)) {
                 // If it persists for more than a minute, update the state
                 crm_warn("Node %u is member of group %s but was believed offline",
                          member_list[i].nodeid, groupName->value);
                 pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
             }
         }
 
         if (local_nodeid == member_list[i].nodeid) {
             found = TRUE;
         }
     }
 
     if (!found) {
         crm_err("Local node was evicted from group %s", groupName->value);
         cpg_evicted = true;
     }
 
     counter++;
 }
 
 /*!
  * \brief Connect to Corosync CPG
  *
  * \param[in,out] cluster  Cluster object
  *
  * \return TRUE on success, otherwise FALSE
  */
 gboolean
 cluster_connect_cpg(crm_cluster_t *cluster)
 {
     cs_error_t rc;
     int fd = -1;
     int retries = 0;
     uint32_t id = 0;
     crm_node_t *peer = NULL;
     cpg_handle_t handle = 0;
     const char *message_name = pcmk__message_name(crm_system_name);
     uid_t found_uid = 0;
     gid_t found_gid = 0;
     pid_t found_pid = 0;
     int rv;
 
     struct mainloop_fd_callbacks cpg_fd_callbacks = {
         .dispatch = pcmk_cpg_dispatch,
         .destroy = cluster->destroy,
     };
 
     cpg_model_v1_data_t cpg_model_info = {
 	    .model = CPG_MODEL_V1,
 	    .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
 	    .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
 	    .cpg_totem_confchg_fn = NULL,
 	    .flags = 0,
     };
 
     cpg_evicted = false;
     cluster->group.length = 0;
     cluster->group.value[0] = 0;
 
     /* group.value is char[128] */
     strncpy(cluster->group.value, message_name, 127);
     cluster->group.value[127] = 0;
     cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
 
     cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
     if (rc != CS_OK) {
         crm_err("Could not connect to the CPG API: %s (%d)",
                 cs_strerror(rc), rc);
         goto bail;
     }
 
     rc = cpg_fd_get(handle, &fd);
     if (rc != CS_OK) {
         crm_err("Could not obtain the CPG API connection: %s (%d)",
                 cs_strerror(rc), rc);
         goto bail;
     }
 
     /* CPG provider run as root (in given user namespace, anyway)? */
     if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
                                             &found_uid, &found_gid))) {
         crm_err("CPG provider is not authentic:"
                 " process %lld (uid: %lld, gid: %lld)",
                 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
                 (long long) found_uid, (long long) found_gid);
         rc = CS_ERR_ACCESS;
         goto bail;
     } else if (rv < 0) {
         crm_err("Could not verify authenticity of CPG provider: %s (%d)",
                 strerror(-rv), -rv);
         rc = CS_ERR_ACCESS;
         goto bail;
     }
 
     id = get_local_nodeid(handle);
     if (id == 0) {
         crm_err("Could not get local node id from the CPG API");
         goto bail;
 
     }
     cluster->nodeid = id;
 
     retries = 0;
     cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->group));
     if (rc != CS_OK) {
         crm_err("Could not join the CPG group '%s': %d", message_name, rc);
         goto bail;
     }
 
     pcmk_cpg_handle = handle;
     cluster->cpg_handle = handle;
     mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
 
   bail:
     if (rc != CS_OK) {
         cpg_finalize(handle);
         return FALSE;
     }
 
     peer = pcmk__get_node(id, NULL, NULL, pcmk__node_search_cluster);
     crm_update_peer_proc(__func__, peer, crm_proc_cpg, ONLINESTATUS);
     return TRUE;
 }
 
 /*!
  * \internal
  * \brief Send an XML message via Corosync CPG
  *
  * \param[in] msg   XML message to send
  * \param[in] node  Cluster node to send message to
  * \param[in] dest  Type of message to send
  *
  * \return TRUE on success, otherwise FALSE
  */
 bool
 pcmk__cpg_send_xml(const xmlNode *msg, const crm_node_t *node,
                    enum crm_ais_msg_types dest)
 {
     bool rc = true;
     char *data = NULL;
 
     data = dump_xml_unformatted(msg);
     rc = send_cluster_text(crm_class_cluster, data, FALSE, node, dest);
     free(data);
     return rc;
 }
 
 /*!
  * \internal
  * \brief Send string data via Corosync CPG
  *
  * \param[in] msg_class  Message class (to set as CPG header ID)
  * \param[in] data       Data to send
  * \param[in] local      What to set as host "local" value (which is never used)
  * \param[in] node       Cluster node to send message to
  * \param[in] dest       Type of message to send
  *
  * \return TRUE on success, otherwise FALSE
  */
 gboolean
 send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
                   gboolean local, const crm_node_t *node,
                   enum crm_ais_msg_types dest)
 {
     static int msg_id = 0;
     static int local_pid = 0;
     static int local_name_len = 0;
     static const char *local_name = NULL;
 
     char *target = NULL;
     struct iovec *iov;
     pcmk__cpg_msg_t *msg = NULL;
     enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
 
     switch (msg_class) {
         case crm_class_cluster:
             break;
         default:
             crm_err("Invalid message class: %d", msg_class);
             return FALSE;
     }
 
     CRM_CHECK(dest != crm_msg_ais, return FALSE);
 
     if (local_name == NULL) {
         local_name = get_local_node_name();
     }
     if ((local_name_len == 0) && (local_name != NULL)) {
         local_name_len = strlen(local_name);
     }
 
     if (data == NULL) {
         data = "";
     }
 
     if (local_pid == 0) {
         local_pid = getpid();
     }
 
     if (sender == crm_msg_none) {
         sender = local_pid;
     }
 
     msg = calloc(1, sizeof(pcmk__cpg_msg_t));
 
     msg_id++;
     msg->id = msg_id;
     msg->header.id = msg_class;
     msg->header.error = CS_OK;
 
     msg->host.type = dest;
     msg->host.local = local;
 
     if (node) {
         if (node->uname) {
             target = strdup(node->uname);
             msg->host.size = strlen(node->uname);
             memset(msg->host.uname, 0, MAX_NAME);
             memcpy(msg->host.uname, node->uname, msg->host.size);
         } else {
             target = crm_strdup_printf("%u", node->id);
         }
         msg->host.id = node->id;
     } else {
         target = strdup("all");
     }
 
     msg->sender.id = 0;
     msg->sender.type = sender;
     msg->sender.pid = local_pid;
     msg->sender.size = local_name_len;
     memset(msg->sender.uname, 0, MAX_NAME);
     if ((local_name != NULL) && (msg->sender.size != 0)) {
         memcpy(msg->sender.uname, local_name, msg->sender.size);
     }
 
     msg->size = 1 + strlen(data);
     msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
 
     if (msg->size < CRM_BZ2_THRESHOLD) {
         msg = pcmk__realloc(msg, msg->header.size);
         memcpy(msg->data, data, msg->size);
 
     } else {
         char *compressed = NULL;
         unsigned int new_size = 0;
         char *uncompressed = strdup(data);
 
         if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0,
                            &compressed, &new_size) == pcmk_rc_ok) {
 
             msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
             msg = pcmk__realloc(msg, msg->header.size);
             memcpy(msg->data, compressed, new_size);
 
             msg->is_compressed = TRUE;
             msg->compressed_size = new_size;
 
         } else {
             // cppcheck seems not to understand the abort logic in pcmk__realloc
             // cppcheck-suppress memleak
             msg = pcmk__realloc(msg, msg->header.size);
             memcpy(msg->data, data, msg->size);
         }
 
         free(uncompressed);
         free(compressed);
     }
 
     iov = calloc(1, sizeof(struct iovec));
     iov->iov_base = msg;
     iov->iov_len = msg->header.size;
 
     if (msg->compressed_size) {
         crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
                   msg->id, target, (unsigned long long) iov->iov_len,
                   msg->compressed_size, data);
     } else {
         crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
                   msg->id, target, (unsigned long long) iov->iov_len,
                   msg->size, data);
     }
     free(target);
 
     cs_message_queue = g_list_append(cs_message_queue, iov);
     crm_cs_flush(&pcmk_cpg_handle);
 
     return TRUE;
 }
 
 /*!
  * \brief Get the message type equivalent of a string
  *
  * \param[in] text  String of message type
  *
  * \return Message type equivalent of \p text
  */
 enum crm_ais_msg_types
 text2msg_type(const char *text)
 {
     int type = crm_msg_none;
 
     CRM_CHECK(text != NULL, return type);
     text = pcmk__message_name(text);
     if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
         type = crm_msg_ais;
     } else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
         type = crm_msg_cib;
     } else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
         type = crm_msg_crmd;
     } else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
         type = crm_msg_te;
     } else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
         type = crm_msg_pe;
     } else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
         type = crm_msg_lrmd;
     } else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
         type = crm_msg_stonithd;
     } else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
         type = crm_msg_stonith_ng;
     } else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
         type = crm_msg_attrd;
 
     } else {
         /* This will normally be a transient client rather than
          * a cluster daemon.  Set the type to the pid of the client
          */
         int scan_rc = sscanf(text, "%d", &type);
 
         if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
             /* Ensure it's sane */
             type = crm_msg_none;
         }
     }
     return type;
 }
diff --git a/lib/cluster/membership.c b/lib/cluster/membership.c
index 32aab39364..9a01cf0769 100644
--- a/lib/cluster/membership.c
+++ b/lib/cluster/membership.c
@@ -1,1385 +1,1385 @@
 /*
  * Copyright 2004-2024 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU Lesser General Public License
  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 
 #ifndef _GNU_SOURCE
 #  define _GNU_SOURCE
 #endif
 
 #include <sys/param.h>
 #include <sys/types.h>
 #include <stdio.h>
 #include <unistd.h>
 #include <string.h>
 #include <glib.h>
 #include <crm/common/ipc.h>
 #include <crm/common/xml_internal.h>
 #include <crm/cluster/internal.h>
 #include <crm/msg_xml.h>
 #include <crm/stonith-ng.h>
 #include "crmcluster_private.h"
 
 /* The peer cache remembers cluster nodes that have been seen.
  * This is managed mostly automatically by libcluster, based on
  * cluster membership events.
  *
  * Because cluster nodes can have conflicting names or UUIDs,
  * the hash table key is a uniquely generated ID.
  */
 GHashTable *crm_peer_cache = NULL;
 
 /*
  * The remote peer cache tracks pacemaker_remote nodes. While the
  * value has the same type as the peer cache's, it is tracked separately for
  * three reasons: pacemaker_remote nodes can't have conflicting names or UUIDs,
  * so the name (which is also the UUID) is used as the hash table key; there
  * is no equivalent of membership events, so management is not automatic; and
  * most users of the peer cache need to exclude pacemaker_remote nodes.
  *
  * That said, using a single cache would be more logical and less error-prone,
  * so it would be a good idea to merge them one day.
  *
  * libcluster provides two avenues for populating the cache:
  * crm_remote_peer_get() and crm_remote_peer_cache_remove() directly manage it,
  * while crm_remote_peer_cache_refresh() populates it via the CIB.
  */
 GHashTable *crm_remote_peer_cache = NULL;
 
 /*
  * The known node cache tracks cluster and remote nodes that have been seen in
  * the CIB. It is useful mainly when a caller needs to know about a node that
  * may no longer be in the membership, but doesn't want to add the node to the
  * main peer cache tables.
  */
 static GHashTable *known_node_cache = NULL;
 
 unsigned long long crm_peer_seq = 0;
 gboolean crm_have_quorum = FALSE;
 static gboolean crm_autoreap  = TRUE;
 
 // Flag setting and clearing for crm_node_t:flags
 
 #define set_peer_flags(peer, flags_to_set) do {                               \
         (peer)->flags = pcmk__set_flags_as(__func__, __LINE__, LOG_TRACE,     \
                                            "Peer", (peer)->uname,             \
                                            (peer)->flags, (flags_to_set),     \
                                            #flags_to_set);                    \
     } while (0)
 
 #define clear_peer_flags(peer, flags_to_clear) do {                           \
         (peer)->flags = pcmk__clear_flags_as(__func__, __LINE__,              \
                                              LOG_TRACE,                       \
                                              "Peer", (peer)->uname,           \
                                              (peer)->flags, (flags_to_clear), \
                                              #flags_to_clear);                \
     } while (0)
 
 static void update_peer_uname(crm_node_t *node, const char *uname);
 static crm_node_t *find_known_node(const char *id, const char *uname);
 
 int
 crm_remote_peer_cache_size(void)
 {
     if (crm_remote_peer_cache == NULL) {
         return 0;
     }
     return g_hash_table_size(crm_remote_peer_cache);
 }
 
 /*!
  * \brief Get a remote node peer cache entry, creating it if necessary
  *
  * \param[in] node_name  Name of remote node
  *
  * \return Cache entry for node on success, NULL (and set errno) otherwise
  *
  * \note When creating a new entry, this will leave the node state undetermined,
  *       so the caller should also call pcmk__update_peer_state() if the state
  *       is known.
  * \note Because this can add and remove cache entries, callers should not
  *       assume any previously obtained cache entry pointers remain valid.
  */
 crm_node_t *
 crm_remote_peer_get(const char *node_name)
 {
     crm_node_t *node;
     char *node_name_copy = NULL;
 
     if (node_name == NULL) {
         errno = EINVAL;
         return NULL;
     }
 
     /* It's theoretically possible that the node was added to the cluster peer
      * cache before it was known to be a Pacemaker Remote node. Remove that
      * entry unless it has a node ID, which means the name actually is
      * associated with a cluster node. (@TODO return an error in that case?)
      */
-    node = pcmk__search_cluster_node_cache(0, node_name, NULL);
+    node = pcmk__search_node_caches(0, node_name, pcmk__node_search_cluster);
     if ((node != NULL) && (node->uuid == NULL)) {
         /* node_name could be a pointer into the cache entry being removed, so
          * reassign it to a copy before the original gets freed
          */
         node_name_copy = strdup(node_name);
         if (node_name_copy == NULL) {
             errno = ENOMEM;
             return NULL;
         }
         node_name = node_name_copy;
         reap_crm_member(0, node_name);
     }
 
     /* Return existing cache entry if one exists */
     node = g_hash_table_lookup(crm_remote_peer_cache, node_name);
     if (node) {
         free(node_name_copy);
         return node;
     }
 
     /* Allocate a new entry */
     node = calloc(1, sizeof(crm_node_t));
     if (node == NULL) {
         free(node_name_copy);
         return NULL;
     }
 
     /* Populate the essential information */
     set_peer_flags(node, crm_remote_node);
     node->uuid = strdup(node_name);
     if (node->uuid == NULL) {
         free(node);
         errno = ENOMEM;
         free(node_name_copy);
         return NULL;
     }
 
     /* Add the new entry to the cache */
     g_hash_table_replace(crm_remote_peer_cache, node->uuid, node);
     crm_trace("added %s to remote cache", node_name);
 
     /* Update the entry's uname, ensuring peer status callbacks are called */
     update_peer_uname(node, node_name);
     free(node_name_copy);
     return node;
 }
 
 /*!
  * \brief Remove a node from the Pacemaker Remote node cache
  *
  * \param[in] node_name  Name of node to remove from cache
  *
  * \note The caller must be careful not to use \p node_name after calling this
  *       function if it might be a pointer into the cache entry being removed.
  */
 void
 crm_remote_peer_cache_remove(const char *node_name)
 {
     /* Do a lookup first, because node_name could be a pointer within the entry
      * being removed -- we can't log it *after* removing it.
      */
     if (g_hash_table_lookup(crm_remote_peer_cache, node_name) != NULL) {
         crm_trace("Removing %s from Pacemaker Remote node cache", node_name);
         g_hash_table_remove(crm_remote_peer_cache, node_name);
     }
 }
 
 /*!
  * \internal
  * \brief Return node status based on a CIB status entry
  *
  * \param[in] node_state  XML of node state
  *
  * \return CRM_NODE_LOST if PCMK__XA_IN_CCM is false in node_state,
  *         CRM_NODE_MEMBER otherwise
  * \note Unlike most boolean XML attributes, this one defaults to true, for
  *       backward compatibility with older controllers that don't set it.
  */
 static const char *
 remote_state_from_cib(const xmlNode *node_state)
 {
     bool status = false;
 
     if ((pcmk__xe_get_bool_attr(node_state, PCMK__XA_IN_CCM,
                                 &status) == pcmk_rc_ok) && !status) {
         return CRM_NODE_LOST;
     } else {
         return CRM_NODE_MEMBER;
     }
 }
 
 /* user data for looping through remote node xpath searches */
 struct refresh_data {
     const char *field;  /* XML attribute to check for node name */
     gboolean has_state; /* whether to update node state based on XML */
 };
 
 /*!
  * \internal
  * \brief Process one pacemaker_remote node xpath search result
  *
  * \param[in] result     XML search result
  * \param[in] user_data  what to look for in the XML
  */
 static void
 remote_cache_refresh_helper(xmlNode *result, void *user_data)
 {
     const struct refresh_data *data = user_data;
     const char *remote = crm_element_value(result, data->field);
     const char *state = NULL;
     crm_node_t *node;
 
     CRM_CHECK(remote != NULL, return);
 
     /* Determine node's state, if the result has it */
     if (data->has_state) {
         state = remote_state_from_cib(result);
     }
 
     /* Check whether cache already has entry for node */
     node = g_hash_table_lookup(crm_remote_peer_cache, remote);
 
     if (node == NULL) {
         /* Node is not in cache, so add a new entry for it */
         node = crm_remote_peer_get(remote);
         CRM_ASSERT(node);
         if (state) {
             pcmk__update_peer_state(__func__, node, state, 0);
         }
 
     } else if (pcmk_is_set(node->flags, crm_node_dirty)) {
         /* Node is in cache and hasn't been updated already, so mark it clean */
         clear_peer_flags(node, crm_node_dirty);
         if (state) {
             pcmk__update_peer_state(__func__, node, state, 0);
         }
     }
 }
 
 static void
 mark_dirty(gpointer key, gpointer value, gpointer user_data)
 {
     set_peer_flags((crm_node_t *) value, crm_node_dirty);
 }
 
 static gboolean
 is_dirty(gpointer key, gpointer value, gpointer user_data)
 {
     return pcmk_is_set(((crm_node_t*)value)->flags, crm_node_dirty);
 }
 
 /*!
  * \brief Repopulate the remote peer cache based on CIB XML
  *
  * \param[in] xmlNode  CIB XML to parse
  */
 void
 crm_remote_peer_cache_refresh(xmlNode *cib)
 {
     struct refresh_data data;
 
     crm_peer_init();
 
     /* First, we mark all existing cache entries as dirty,
      * so that later we can remove any that weren't in the CIB.
      * We don't empty the cache, because we need to detect changes in state.
      */
     g_hash_table_foreach(crm_remote_peer_cache, mark_dirty, NULL);
 
     /* Look for guest nodes and remote nodes in the status section */
     data.field = PCMK_XA_ID;
     data.has_state = TRUE;
     crm_foreach_xpath_result(cib, PCMK__XP_REMOTE_NODE_STATUS,
                              remote_cache_refresh_helper, &data);
 
     /* Look for guest nodes and remote nodes in the configuration section,
      * because they may have just been added and not have a status entry yet.
      * In that case, the cached node state will be left NULL, so that the
      * peer status callback isn't called until we're sure the node started
      * successfully.
      */
     data.field = PCMK_XA_VALUE;
     data.has_state = FALSE;
     crm_foreach_xpath_result(cib, PCMK__XP_GUEST_NODE_CONFIG,
                              remote_cache_refresh_helper, &data);
     data.field = PCMK_XA_ID;
     data.has_state = FALSE;
     crm_foreach_xpath_result(cib, PCMK__XP_REMOTE_NODE_CONFIG,
                              remote_cache_refresh_helper, &data);
 
     /* Remove all old cache entries that weren't seen in the CIB */
     g_hash_table_foreach_remove(crm_remote_peer_cache, is_dirty, NULL);
 }
 
 gboolean
 crm_is_peer_active(const crm_node_t * node)
 {
     if(node == NULL) {
         return FALSE;
     }
 
     if (pcmk_is_set(node->flags, crm_remote_node)) {
         /* remote nodes are never considered active members. This
          * guarantees they will never be considered for DC membership.*/
         return FALSE;
     }
 #if SUPPORT_COROSYNC
     if (is_corosync_cluster()) {
         return crm_is_corosync_peer_active(node);
     }
 #endif
     crm_err("Unhandled cluster type: %s", name_for_cluster_type(get_cluster_type()));
     return FALSE;
 }
 
 static gboolean
 crm_reap_dead_member(gpointer key, gpointer value, gpointer user_data)
 {
     crm_node_t *node = value;
     crm_node_t *search = user_data;
 
     if (search == NULL) {
         return FALSE;
 
     } else if (search->id && node->id != search->id) {
         return FALSE;
 
     } else if (search->id == 0 && !pcmk__str_eq(node->uname, search->uname, pcmk__str_casei)) {
         return FALSE;
 
     } else if (crm_is_peer_active(value) == FALSE) {
         crm_info("Removing node with name %s and id %u from membership cache",
                  (node->uname? node->uname : "unknown"), node->id);
         return TRUE;
     }
     return FALSE;
 }
 
 /*!
  * \brief Remove all peer cache entries matching a node ID and/or uname
  *
  * \param[in] id    ID of node to remove (or 0 to ignore)
  * \param[in] name  Uname of node to remove (or NULL to ignore)
  *
  * \return Number of cache entries removed
  *
  * \note The caller must be careful not to use \p name after calling this
  *       function if it might be a pointer into the cache entry being removed.
  */
 guint
 reap_crm_member(uint32_t id, const char *name)
 {
     int matches = 0;
     crm_node_t search = { 0, };
 
     if (crm_peer_cache == NULL) {
         crm_trace("Membership cache not initialized, ignoring purge request");
         return 0;
     }
 
     search.id = id;
     pcmk__str_update(&search.uname, name);
     matches = g_hash_table_foreach_remove(crm_peer_cache, crm_reap_dead_member, &search);
     if(matches) {
         crm_notice("Purged %d peer%s with id=%u%s%s from the membership cache",
                    matches, pcmk__plural_s(matches), search.id,
                    (search.uname? " and/or uname=" : ""),
                    (search.uname? search.uname : ""));
 
     } else {
         crm_info("No peers with id=%u%s%s to purge from the membership cache",
                  search.id, (search.uname? " and/or uname=" : ""),
                  (search.uname? search.uname : ""));
     }
 
     free(search.uname);
     return matches;
 }
 
 static void
 count_peer(gpointer key, gpointer value, gpointer user_data)
 {
     guint *count = user_data;
     crm_node_t *node = value;
 
     if (crm_is_peer_active(node)) {
         *count = *count + 1;
     }
 }
 
 guint
 crm_active_peers(void)
 {
     guint count = 0;
 
     if (crm_peer_cache) {
         g_hash_table_foreach(crm_peer_cache, count_peer, &count);
     }
     return count;
 }
 
 static void
 destroy_crm_node(gpointer data)
 {
     crm_node_t *node = data;
 
     crm_trace("Destroying entry for node %u: %s", node->id, node->uname);
 
     free(node->uname);
     free(node->state);
     free(node->uuid);
     free(node->expected);
     free(node->conn_host);
     free(node);
 }
 
 void
 crm_peer_init(void)
 {
     if (crm_peer_cache == NULL) {
         crm_peer_cache = pcmk__strikey_table(free, destroy_crm_node);
     }
 
     if (crm_remote_peer_cache == NULL) {
         crm_remote_peer_cache = pcmk__strikey_table(NULL, destroy_crm_node);
     }
 
     if (known_node_cache == NULL) {
         known_node_cache = pcmk__strikey_table(free, destroy_crm_node);
     }
 }
 
 void
 crm_peer_destroy(void)
 {
     if (crm_peer_cache != NULL) {
         crm_trace("Destroying peer cache with %d members", g_hash_table_size(crm_peer_cache));
         g_hash_table_destroy(crm_peer_cache);
         crm_peer_cache = NULL;
     }
 
     if (crm_remote_peer_cache != NULL) {
         crm_trace("Destroying remote peer cache with %d members", g_hash_table_size(crm_remote_peer_cache));
         g_hash_table_destroy(crm_remote_peer_cache);
         crm_remote_peer_cache = NULL;
     }
 
     if (known_node_cache != NULL) {
         crm_trace("Destroying known node cache with %d members",
                   g_hash_table_size(known_node_cache));
         g_hash_table_destroy(known_node_cache);
         known_node_cache = NULL;
     }
 
 }
 
 static void (*peer_status_callback)(enum crm_status_type, crm_node_t *,
                                     const void *) = NULL;
 
 /*!
  * \brief Set a client function that will be called after peer status changes
  *
  * \param[in] dispatch  Pointer to function to use as callback
  *
  * \note Previously, client callbacks were responsible for peer cache
  *       management. This is no longer the case, and client callbacks should do
  *       only client-specific handling. Callbacks MUST NOT add or remove entries
  *       in the peer caches.
  */
 void
 crm_set_status_callback(void (*dispatch) (enum crm_status_type, crm_node_t *, const void *))
 {
     peer_status_callback = dispatch;
 }
 
 /*!
  * \brief Tell the library whether to automatically reap lost nodes
  *
  * If TRUE (the default), calling crm_update_peer_proc() will also update the
  * peer state to CRM_NODE_MEMBER or CRM_NODE_LOST, and pcmk__update_peer_state()
  * will reap peers whose state changes to anything other than CRM_NODE_MEMBER.
  * Callers should leave this enabled unless they plan to manage the cache
  * separately on their own.
  *
  * \param[in] autoreap  TRUE to enable automatic reaping, FALSE to disable
  */
 void
 crm_set_autoreap(gboolean autoreap)
 {
     crm_autoreap = autoreap;
 }
 
 static void
 dump_peer_hash(int level, const char *caller)
 {
     GHashTableIter iter;
     const char *id = NULL;
     crm_node_t *node = NULL;
 
     g_hash_table_iter_init(&iter, crm_peer_cache);
     while (g_hash_table_iter_next(&iter, (gpointer *) &id, (gpointer *) &node)) {
         do_crm_log(level, "%s: Node %u/%s = %p - %s", caller, node->id, node->uname, node, id);
     }
 }
 
 static gboolean
 hash_find_by_data(gpointer key, gpointer value, gpointer user_data)
 {
     return value == user_data;
 }
 
 /*!
  * \internal
  * \brief Search caches for a node (cluster or Pacemaker Remote)
  *
  * \param[in] id     If not 0, cluster node ID to search for
  * \param[in] uname  If not NULL, node name to search for
  * \param[in] flags  Group of enum pcmk__node_search_flags
  *
  * \return Node cache entry if found, otherwise NULL
  */
 crm_node_t *
 pcmk__search_node_caches(unsigned int id, const char *uname, uint32_t flags)
 {
     crm_node_t *node = NULL;
 
     CRM_ASSERT(id > 0 || uname != NULL);
 
     crm_peer_init();
 
     if ((uname != NULL) && pcmk_is_set(flags, pcmk__node_search_remote)) {
         node = g_hash_table_lookup(crm_remote_peer_cache, uname);
     }
 
     if ((node == NULL) && pcmk_is_set(flags, pcmk__node_search_cluster)) {
         node = pcmk__search_cluster_node_cache(id, uname, NULL);
     }
 
     if ((node == NULL) && pcmk_is_set(flags, pcmk__node_search_known)) {
         char *id_str = (id == 0)? NULL : crm_strdup_printf("%u", id);
 
         node = find_known_node(id_str, uname);
         free(id_str);
     }
 
     return node;
 }
 
 /*!
  * \internal
  * \brief Purge a node from cache (both cluster and Pacemaker Remote)
  *
  * \param[in] node_name  If not NULL, purge only nodes with this name
  * \param[in] node_id    If not 0, purge cluster nodes only if they have this ID
  *
  * \note If \p node_name is NULL and \p node_id is 0, no nodes will be purged.
  *       If \p node_name is not NULL and \p node_id is not 0, Pacemaker Remote
  *       nodes that match \p node_name will be purged, and cluster nodes that
  *       match both \p node_name and \p node_id will be purged.
  * \note The caller must be careful not to use \p node_name after calling this
  *       function if it might be a pointer into a cache entry being removed.
  */
 void
 pcmk__purge_node_from_cache(const char *node_name, uint32_t node_id)
 {
     char *node_name_copy = NULL;
 
     if ((node_name == NULL) && (node_id == 0U)) {
         return;
     }
 
     // Purge from Pacemaker Remote node cache
     if ((node_name != NULL)
         && (g_hash_table_lookup(crm_remote_peer_cache, node_name) != NULL)) {
         /* node_name could be a pointer into the cache entry being purged,
          * so reassign it to a copy before the original gets freed
          */
         node_name_copy = strdup(node_name);
         CRM_ASSERT(node_name_copy != NULL);
         node_name = node_name_copy;
 
         crm_trace("Purging %s from Pacemaker Remote node cache", node_name);
         g_hash_table_remove(crm_remote_peer_cache, node_name);
     }
 
     reap_crm_member(node_id, node_name);
     free(node_name_copy);
 }
 
 /*!
  * \internal
  * \brief Search cluster node cache
  *
  * \param[in] id     If not 0, cluster node ID to search for
  * \param[in] uname  If not NULL, node name to search for
  * \param[in] uuid   If not NULL while id is 0, node UUID instead of cluster
  *                   node ID to search for
  *
  * \return Cluster node cache entry if found, otherwise NULL
  */
 crm_node_t *
 pcmk__search_cluster_node_cache(unsigned int id, const char *uname,
                                 const char *uuid)
 {
     GHashTableIter iter;
     crm_node_t *node = NULL;
     crm_node_t *by_id = NULL;
     crm_node_t *by_name = NULL;
 
     CRM_ASSERT(id > 0 || uname != NULL);
 
     crm_peer_init();
 
     if (uname != NULL) {
         g_hash_table_iter_init(&iter, crm_peer_cache);
         while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
             if(node->uname && strcasecmp(node->uname, uname) == 0) {
                 crm_trace("Name match: %s = %p", node->uname, node);
                 by_name = node;
                 break;
             }
         }
     }
 
     if (id > 0) {
         g_hash_table_iter_init(&iter, crm_peer_cache);
         while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
             if(node->id == id) {
                 crm_trace("ID match: %u = %p", node->id, node);
                 by_id = node;
                 break;
             }
         }
 
     } else if (uuid != NULL) {
         g_hash_table_iter_init(&iter, crm_peer_cache);
         while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
             if (pcmk__str_eq(node->uuid, uuid, pcmk__str_casei)) {
                 crm_trace("UUID match: %s = %p", node->uuid, node);
                 by_id = node;
                 break;
             }
         }
     }
 
     node = by_id; /* Good default */
     if(by_id == by_name) {
         /* Nothing to do if they match (both NULL counts) */
         crm_trace("Consistent: %p for %u/%s", by_id, id, uname);
 
     } else if(by_id == NULL && by_name) {
         crm_trace("Only one: %p for %u/%s", by_name, id, uname);
 
         if(id && by_name->id) {
             dump_peer_hash(LOG_WARNING, __func__);
             crm_crit("Node %u and %u share the same name '%s'",
                      id, by_name->id, uname);
             node = NULL; /* Create a new one */
 
         } else {
             node = by_name;
         }
 
     } else if(by_name == NULL && by_id) {
         crm_trace("Only one: %p for %u/%s", by_id, id, uname);
 
         if(uname && by_id->uname) {
             dump_peer_hash(LOG_WARNING, __func__);
             crm_crit("Node '%s' and '%s' share the same cluster nodeid %u: assuming '%s' is correct",
                      uname, by_id->uname, id, uname);
         }
 
     } else if(uname && by_id->uname) {
         if(pcmk__str_eq(uname, by_id->uname, pcmk__str_casei)) {
             crm_notice("Node '%s' has changed its ID from %u to %u", by_id->uname, by_name->id, by_id->id);
             g_hash_table_foreach_remove(crm_peer_cache, hash_find_by_data, by_name);
 
         } else {
             crm_warn("Node '%s' and '%s' share the same cluster nodeid: %u %s", by_id->uname, by_name->uname, id, uname);
             dump_peer_hash(LOG_INFO, __func__);
             crm_abort(__FILE__, __func__, __LINE__, "member weirdness", TRUE,
                       TRUE);
         }
 
     } else if(id && by_name->id) {
         crm_warn("Node %u and %u share the same name: '%s'", by_id->id, by_name->id, uname);
 
     } else {
         /* Simple merge */
 
         /* Only corosync-based clusters use node IDs. The functions that call
          * pcmk__update_peer_state() and crm_update_peer_proc() only know
          * nodeid, so 'by_id' is authoritative when merging.
          */
         dump_peer_hash(LOG_DEBUG, __func__);
 
         crm_info("Merging %p into %p", by_name, by_id);
         g_hash_table_foreach_remove(crm_peer_cache, hash_find_by_data, by_name);
     }
 
     return node;
 }
 
 #if SUPPORT_COROSYNC
 static guint
 remove_conflicting_peer(crm_node_t *node)
 {
     int matches = 0;
     GHashTableIter iter;
     crm_node_t *existing_node = NULL;
 
     if (node->id == 0 || node->uname == NULL) {
         return 0;
     }
 
     if (!pcmk__corosync_has_nodelist()) {
         return 0;
     }
 
     g_hash_table_iter_init(&iter, crm_peer_cache);
     while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &existing_node)) {
         if (existing_node->id > 0
             && existing_node->id != node->id
             && existing_node->uname != NULL
             && strcasecmp(existing_node->uname, node->uname) == 0) {
 
             if (crm_is_peer_active(existing_node)) {
                 continue;
             }
 
             crm_warn("Removing cached offline node %u/%s which has conflicting uname with %u",
                      existing_node->id, existing_node->uname, node->id);
 
             g_hash_table_iter_remove(&iter);
             matches++;
         }
     }
 
     return matches;
 }
 #endif
 
 /*!
  * \brief Get a cluster node cache entry
  *
  * \param[in] id     If not 0, cluster node ID to search for
  * \param[in] uname  If not NULL, node name to search for
  * \param[in] uuid   If not NULL while id is 0, node UUID instead of cluster
  *                   node ID to search for
  * \param[in] flags  Group of enum pcmk__node_search_flags
  *
  * \return (Possibly newly created) cluster node cache entry
  */
 /* coverity[-alloc] Memory is referenced in one or both hashtables */
 crm_node_t *
 pcmk__get_node(unsigned int id, const char *uname, const char *uuid,
                uint32_t flags)
 {
     crm_node_t *node = NULL;
     char *uname_lookup = NULL;
 
     CRM_ASSERT(id > 0 || uname != NULL);
 
     crm_peer_init();
 
     // Check the Pacemaker Remote node cache first
     if (pcmk_is_set(flags, pcmk__node_search_remote)) {
         node = g_hash_table_lookup(crm_remote_peer_cache, uname);
         if (node != NULL) {
             return node;
         }
     }
 
     if (!pcmk_is_set(flags, pcmk__node_search_cluster)) {
         return NULL;
     }
 
     node = pcmk__search_cluster_node_cache(id, uname, uuid);
 
     /* if uname wasn't provided, and find_peer did not turn up a uname based on id.
      * we need to do a lookup of the node name using the id in the cluster membership. */
     if ((node == NULL || node->uname == NULL) && (uname == NULL)) { 
         uname_lookup = get_node_name(id);
     }
 
     if (uname_lookup) {
         uname = uname_lookup;
         crm_trace("Inferred a name of '%s' for node %u", uname, id);
 
         /* try to turn up the node one more time now that we know the uname. */
         if (node == NULL) {
             node = pcmk__search_cluster_node_cache(id, uname, uuid);
         }
     }
 
     if (node == NULL) {
         char *uniqueid = crm_generate_uuid();
 
         node = calloc(1, sizeof(crm_node_t));
         CRM_ASSERT(node);
 
         crm_info("Created entry %s/%p for node %s/%u (%d total)",
                  uniqueid, node, uname, id, 1 + g_hash_table_size(crm_peer_cache));
         g_hash_table_replace(crm_peer_cache, uniqueid, node);
     }
 
     if(id > 0 && uname && (node->id == 0 || node->uname == NULL)) {
         crm_info("Node %u is now known as %s", id, uname);
     }
 
     if(id > 0 && node->id == 0) {
         node->id = id;
     }
 
     if (uname && (node->uname == NULL)) {
         update_peer_uname(node, uname);
     }
 
     if(node->uuid == NULL) {
         if (uuid == NULL) {
             uuid = crm_peer_uuid(node);
         }
 
         if (uuid) {
             crm_info("Node %u has uuid %s", id, uuid);
 
         } else {
             crm_info("Cannot obtain a UUID for node %u/%s", id, node->uname);
         }
     }
 
     free(uname_lookup);
 
     return node;
 }
 
 /*!
  * \internal
  * \brief Update a node's uname
  *
  * \param[in,out] node   Node object to update
  * \param[in]     uname  New name to set
  *
  * \note This function should not be called within a peer cache iteration,
  *       because in some cases it can remove conflicting cache entries,
  *       which would invalidate the iterator.
  */
 static void
 update_peer_uname(crm_node_t *node, const char *uname)
 {
     CRM_CHECK(uname != NULL,
               crm_err("Bug: can't update node name without name"); return);
     CRM_CHECK(node != NULL,
               crm_err("Bug: can't update node name to %s without node", uname);
               return);
 
     if (pcmk__str_eq(uname, node->uname, pcmk__str_casei)) {
         crm_debug("Node uname '%s' did not change", uname);
         return;
     }
 
     for (const char *c = uname; *c; ++c) {
         if ((*c >= 'A') && (*c <= 'Z')) {
             crm_warn("Node names with capitals are discouraged, consider changing '%s'",
                      uname);
             break;
         }
     }
 
     pcmk__str_update(&node->uname, uname);
 
     if (peer_status_callback != NULL) {
         peer_status_callback(crm_status_uname, node, NULL);
     }
 
 #if SUPPORT_COROSYNC
     if (is_corosync_cluster() && !pcmk_is_set(node->flags, crm_remote_node)) {
         remove_conflicting_peer(node);
     }
 #endif
 }
 
 /*!
  * \internal
  * \brief Get log-friendly string equivalent of a process flag
  *
  * \param[in] proc  Process flag
  *
  * \return Log-friendly string equivalent of \p proc
  */
 static inline const char *
 proc2text(enum crm_proc_flag proc)
 {
     const char *text = "unknown";
 
     switch (proc) {
         case crm_proc_none:
             text = "none";
             break;
         case crm_proc_based:
             text = "pacemaker-based";
             break;
         case crm_proc_controld:
             text = "pacemaker-controld";
             break;
         case crm_proc_schedulerd:
             text = "pacemaker-schedulerd";
             break;
         case crm_proc_execd:
             text = "pacemaker-execd";
             break;
         case crm_proc_attrd:
             text = "pacemaker-attrd";
             break;
         case crm_proc_fenced:
             text = "pacemaker-fenced";
             break;
         case crm_proc_cpg:
             text = "corosync-cpg";
             break;
     }
     return text;
 }
 
 /*!
  * \internal
  * \brief Update a node's process information (and potentially state)
  *
  * \param[in]     source  Caller's function name (for log messages)
  * \param[in,out] node    Node object to update
  * \param[in]     flag    Bitmask of new process information
  * \param[in]     status  node status (online, offline, etc.)
  *
  * \return NULL if any node was reaped from peer caches, value of node otherwise
  *
  * \note If this function returns NULL, the supplied node object was likely
  *       freed and should not be used again. This function should not be
  *       called within a cache iteration if reaping is possible, otherwise
  *       reaping could invalidate the iterator.
  */
 crm_node_t *
 crm_update_peer_proc(const char *source, crm_node_t * node, uint32_t flag, const char *status)
 {
     uint32_t last = 0;
     gboolean changed = FALSE;
 
     CRM_CHECK(node != NULL, crm_err("%s: Could not set %s to %s for NULL",
                                     source, proc2text(flag), status);
                             return NULL);
 
     /* Pacemaker doesn't spawn processes on remote nodes */
     if (pcmk_is_set(node->flags, crm_remote_node)) {
         return node;
     }
 
     last = node->processes;
     if (status == NULL) {
         node->processes = flag;
         if (node->processes != last) {
             changed = TRUE;
         }
 
     } else if (pcmk__str_eq(status, ONLINESTATUS, pcmk__str_casei)) {
         if ((node->processes & flag) != flag) {
             node->processes = pcmk__set_flags_as(__func__, __LINE__,
                                                  LOG_TRACE, "Peer process",
                                                  node->uname, node->processes,
                                                  flag, "processes");
             changed = TRUE;
         }
 
     } else if (node->processes & flag) {
         node->processes = pcmk__clear_flags_as(__func__, __LINE__,
                                                LOG_TRACE, "Peer process",
                                                node->uname, node->processes,
                                                flag, "processes");
         changed = TRUE;
     }
 
     if (changed) {
         if (status == NULL && flag <= crm_proc_none) {
             crm_info("%s: Node %s[%u] - all processes are now offline", source, node->uname,
                      node->id);
         } else {
             crm_info("%s: Node %s[%u] - %s is now %s", source, node->uname, node->id,
                      proc2text(flag), status);
         }
 
         if (pcmk_is_set(node->processes, crm_get_cluster_proc())) {
             node->when_online = time(NULL);
 
         } else {
             node->when_online = 0;
         }
 
         /* Call the client callback first, then update the peer state,
          * in case the node will be reaped
          */
         if (peer_status_callback != NULL) {
             peer_status_callback(crm_status_processes, node, &last);
         }
 
         /* The client callback shouldn't touch the peer caches,
          * but as a safety net, bail if the peer cache was destroyed.
          */
         if (crm_peer_cache == NULL) {
             return NULL;
         }
 
         if (crm_autoreap) {
             const char *peer_state = NULL;
 
             if (pcmk_is_set(node->processes, crm_get_cluster_proc())) {
                 peer_state = CRM_NODE_MEMBER;
             } else {
                 peer_state = CRM_NODE_LOST;
             }
             node = pcmk__update_peer_state(__func__, node, peer_state, 0);
         }
     } else {
         crm_trace("%s: Node %s[%u] - %s is unchanged (%s)", source, node->uname, node->id,
                   proc2text(flag), status);
     }
     return node;
 }
 
 /*!
  * \internal
  * \brief Update a cluster node cache entry's expected join state
  *
  * \param[in]     source    Caller's function name (for logging)
  * \param[in,out] node      Node to update
  * \param[in]     expected  Node's new join state
  */
 void
 pcmk__update_peer_expected(const char *source, crm_node_t *node,
                            const char *expected)
 {
     char *last = NULL;
     gboolean changed = FALSE;
 
     CRM_CHECK(node != NULL, crm_err("%s: Could not set 'expected' to %s", source, expected);
               return);
 
     /* Remote nodes don't participate in joins */
     if (pcmk_is_set(node->flags, crm_remote_node)) {
         return;
     }
 
     last = node->expected;
     if (expected != NULL && !pcmk__str_eq(node->expected, expected, pcmk__str_casei)) {
         node->expected = strdup(expected);
         changed = TRUE;
     }
 
     if (changed) {
         crm_info("%s: Node %s[%u] - expected state is now %s (was %s)", source, node->uname, node->id,
                  expected, last);
         free(last);
     } else {
         crm_trace("%s: Node %s[%u] - expected state is unchanged (%s)", source, node->uname,
                   node->id, expected);
     }
 }
 
 /*!
  * \internal
  * \brief Update a node's state and membership information
  *
  * \param[in]     source      Caller's function name (for log messages)
  * \param[in,out] node        Node object to update
  * \param[in]     state       Node's new state
  * \param[in]     membership  Node's new membership ID
  * \param[in,out] iter        If not NULL, pointer to node's peer cache iterator
  *
  * \return NULL if any node was reaped, value of node otherwise
  *
  * \note If this function returns NULL, the supplied node object was likely
  *       freed and should not be used again. This function may be called from
  *       within a peer cache iteration if the iterator is supplied.
  */
 static crm_node_t *
 update_peer_state_iter(const char *source, crm_node_t *node, const char *state,
                        uint64_t membership, GHashTableIter *iter)
 {
     gboolean is_member;
 
     CRM_CHECK(node != NULL,
               crm_err("Could not set state for unknown host to %s"
                       CRM_XS " source=%s", state, source);
               return NULL);
 
     is_member = pcmk__str_eq(state, CRM_NODE_MEMBER, pcmk__str_casei);
     if (is_member) {
         node->when_lost = 0;
         if (membership) {
             node->last_seen = membership;
         }
     }
 
     if (state && !pcmk__str_eq(node->state, state, pcmk__str_casei)) {
         char *last = node->state;
 
         if (is_member) {
              node->when_member = time(NULL);
 
         } else {
              node->when_member = 0;
         }
 
         node->state = strdup(state);
         crm_notice("Node %s state is now %s " CRM_XS
                    " nodeid=%u previous=%s source=%s", node->uname, state,
                    node->id, (last? last : "unknown"), source);
         if (peer_status_callback != NULL) {
             peer_status_callback(crm_status_nstate, node, last);
         }
         free(last);
 
         if (crm_autoreap && !is_member
             && !pcmk_is_set(node->flags, crm_remote_node)) {
             /* We only autoreap from the peer cache, not the remote peer cache,
              * because the latter should be managed only by
              * crm_remote_peer_cache_refresh().
              */
             if(iter) {
                 crm_notice("Purged 1 peer with id=%u and/or uname=%s from the membership cache", node->id, node->uname);
                 g_hash_table_iter_remove(iter);
 
             } else {
                 reap_crm_member(node->id, node->uname);
             }
             node = NULL;
         }
 
     } else {
         crm_trace("Node %s state is unchanged (%s) " CRM_XS
                   " nodeid=%u source=%s", node->uname, state, node->id, source);
     }
     return node;
 }
 
 /*!
  * \brief Update a node's state and membership information
  *
  * \param[in]     source      Caller's function name (for log messages)
  * \param[in,out] node        Node object to update
  * \param[in]     state       Node's new state
  * \param[in]     membership  Node's new membership ID
  *
  * \return NULL if any node was reaped, value of node otherwise
  *
  * \note If this function returns NULL, the supplied node object was likely
  *       freed and should not be used again. This function should not be
  *       called within a cache iteration if reaping is possible,
  *       otherwise reaping could invalidate the iterator.
  */
 crm_node_t *
 pcmk__update_peer_state(const char *source, crm_node_t *node,
                         const char *state, uint64_t membership)
 {
     return update_peer_state_iter(source, node, state, membership, NULL);
 }
 
 /*!
  * \internal
  * \brief Reap all nodes from cache whose membership information does not match
  *
  * \param[in] membership  Membership ID of nodes to keep
  */
 void
 pcmk__reap_unseen_nodes(uint64_t membership)
 {
     GHashTableIter iter;
     crm_node_t *node = NULL;
 
     crm_trace("Reaping unseen nodes...");
     g_hash_table_iter_init(&iter, crm_peer_cache);
     while (g_hash_table_iter_next(&iter, NULL, (gpointer *)&node)) {
         if (node->last_seen != membership) {
             if (node->state) {
                 /*
                  * Calling update_peer_state_iter() allows us to
                  * remove the node from crm_peer_cache without
                  * invalidating our iterator
                  */
                 update_peer_state_iter(__func__, node, CRM_NODE_LOST,
                                            membership, &iter);
 
             } else {
                 crm_info("State of node %s[%u] is still unknown",
                          node->uname, node->id);
             }
         }
     }
 }
 
 static crm_node_t *
 find_known_node(const char *id, const char *uname)
 {
     GHashTableIter iter;
     crm_node_t *node = NULL;
     crm_node_t *by_id = NULL;
     crm_node_t *by_name = NULL;
 
     if (uname) {
         g_hash_table_iter_init(&iter, known_node_cache);
         while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
             if (node->uname && strcasecmp(node->uname, uname) == 0) {
                 crm_trace("Name match: %s = %p", node->uname, node);
                 by_name = node;
                 break;
             }
         }
     }
 
     if (id) {
         g_hash_table_iter_init(&iter, known_node_cache);
         while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
             if(strcasecmp(node->uuid, id) == 0) {
                 crm_trace("ID match: %s= %p", id, node);
                 by_id = node;
                 break;
             }
         }
     }
 
     node = by_id; /* Good default */
     if (by_id == by_name) {
         /* Nothing to do if they match (both NULL counts) */
         crm_trace("Consistent: %p for %s/%s", by_id, id, uname);
 
     } else if (by_id == NULL && by_name) {
         crm_trace("Only one: %p for %s/%s", by_name, id, uname);
 
         if (id) {
             node = NULL;
 
         } else {
             node = by_name;
         }
 
     } else if (by_name == NULL && by_id) {
         crm_trace("Only one: %p for %s/%s", by_id, id, uname);
 
         if (uname) {
             node = NULL;
         }
 
     } else if (uname && by_id->uname
                && pcmk__str_eq(uname, by_id->uname, pcmk__str_casei)) {
         /* Multiple nodes have the same uname in the CIB.
          * Return by_id. */
 
     } else if (id && by_name->uuid
                && pcmk__str_eq(id, by_name->uuid, pcmk__str_casei)) {
         /* Multiple nodes have the same id in the CIB.
          * Return by_name. */
         node = by_name;
 
     } else {
         node = NULL;
     }
 
     if (node == NULL) {
         crm_debug("Couldn't find node%s%s%s%s",
                    id? " " : "",
                    id? id : "",
                    uname? " with name " : "",
                    uname? uname : "");
     }
 
     return node;
 }
 
 static void
 known_node_cache_refresh_helper(xmlNode *xml_node, void *user_data)
 {
     const char *id = crm_element_value(xml_node, PCMK_XA_ID);
     const char *uname = crm_element_value(xml_node, PCMK_XA_UNAME);
     crm_node_t * node =  NULL;
 
     CRM_CHECK(id != NULL && uname !=NULL, return);
     node = find_known_node(id, uname);
 
     if (node == NULL) {
         char *uniqueid = crm_generate_uuid();
 
         node = calloc(1, sizeof(crm_node_t));
         CRM_ASSERT(node != NULL);
 
         node->uname = strdup(uname);
         CRM_ASSERT(node->uname != NULL);
 
         node->uuid = strdup(id);
         CRM_ASSERT(node->uuid != NULL);
 
         g_hash_table_replace(known_node_cache, uniqueid, node);
 
     } else if (pcmk_is_set(node->flags, crm_node_dirty)) {
         pcmk__str_update(&node->uname, uname);
 
         /* Node is in cache and hasn't been updated already, so mark it clean */
         clear_peer_flags(node, crm_node_dirty);
     }
 
 }
 
 static void
 refresh_known_node_cache(xmlNode *cib)
 {
     crm_peer_init();
 
     g_hash_table_foreach(known_node_cache, mark_dirty, NULL);
 
     crm_foreach_xpath_result(cib, PCMK__XP_MEMBER_NODE_CONFIG,
                              known_node_cache_refresh_helper, NULL);
 
     /* Remove all old cache entries that weren't seen in the CIB */
     g_hash_table_foreach_remove(known_node_cache, is_dirty, NULL);
 }
 
 void
 pcmk__refresh_node_caches_from_cib(xmlNode *cib)
 {
     crm_remote_peer_cache_refresh(cib);
     refresh_known_node_cache(cib);
 }
 
 // Deprecated functions kept only for backward API compatibility
 // LCOV_EXCL_START
 
 #include <crm/cluster/compat.h>
 
 int
 crm_terminate_member(int nodeid, const char *uname, void *unused)
 {
     return stonith_api_kick(nodeid, uname, 120, TRUE);
 }
 
 int
 crm_terminate_member_no_mainloop(int nodeid, const char *uname, int *connection)
 {
     return stonith_api_kick(nodeid, uname, 120, TRUE);
 }
 
 crm_node_t *
 crm_get_peer(unsigned int id, const char *uname)
 {
     return pcmk__get_node(id, uname, NULL, pcmk__node_search_cluster);
 }
 
 crm_node_t *
 crm_get_peer_full(unsigned int id, const char *uname, int flags)
 {
     return pcmk__get_node(id, uname, NULL, flags);
 }
 
 // LCOV_EXCL_STOP
 // End deprecated API