diff --git a/daemons/attrd/attrd_corosync.c b/daemons/attrd/attrd_corosync.c
index 374a94bf41..ae4b3034a0 100644
--- a/daemons/attrd/attrd_corosync.c
+++ b/daemons/attrd/attrd_corosync.c
@@ -1,608 +1,608 @@
 /*
  * Copyright 2013-2024 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU General Public License version 2
  * or later (GPLv2+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 
 #include <errno.h>
 #include <stdbool.h>
 #include <stdint.h>
 #include <stdlib.h>
 
 #include <crm/cluster.h>
 #include <crm/cluster/internal.h>
 #include <crm/common/logging.h>
 #include <crm/common/results.h>
 #include <crm/common/strings_internal.h>
 #include <crm/common/xml.h>
 
 #include "pacemaker-attrd.h"
 
 static xmlNode *
 attrd_confirmation(int callid)
 {
     xmlNode *node = pcmk__xe_create(NULL, __func__);
 
     crm_xml_add(node, PCMK__XA_T, PCMK__VALUE_ATTRD);
     crm_xml_add(node, PCMK__XA_SRC, get_local_node_name());
     crm_xml_add(node, PCMK_XA_TASK, PCMK__ATTRD_CMD_CONFIRM);
     crm_xml_add_int(node, PCMK__XA_CALL_ID, callid);
 
     return node;
 }
 
 static void
 attrd_peer_message(crm_node_t *peer, xmlNode *xml)
 {
     const char *election_op = crm_element_value(xml, PCMK__XA_CRM_TASK);
 
     if (election_op) {
         attrd_handle_election_op(peer, xml);
         return;
     }
 
     if (attrd_shutting_down(false)) {
         /* If we're shutting down, we want to continue responding to election
          * ops as long as we're a cluster member (because our vote may be
          * needed). Ignore all other messages.
          */
         return;
 
     } else {
         pcmk__request_t request = {
             .ipc_client     = NULL,
             .ipc_id         = 0,
             .ipc_flags      = 0,
             .peer           = peer->uname,
             .xml            = xml,
             .call_options   = 0,
             .result         = PCMK__UNKNOWN_RESULT,
         };
 
         request.op = crm_element_value_copy(request.xml, PCMK_XA_TASK);
         CRM_CHECK(request.op != NULL, return);
 
         attrd_handle_request(&request);
 
         /* Having finished handling the request, check to see if the originating
          * peer requested confirmation.  If so, send that confirmation back now.
          */
         if (pcmk__xe_attr_is_true(xml, PCMK__XA_CONFIRM) &&
             !pcmk__str_eq(request.op, PCMK__ATTRD_CMD_CONFIRM, pcmk__str_none)) {
             int callid = 0;
             xmlNode *reply = NULL;
 
             /* Add the confirmation ID for the message we are confirming to the
              * response so the originating peer knows what they're a confirmation
              * for.
              */
             crm_element_value_int(xml, PCMK__XA_CALL_ID, &callid);
             reply = attrd_confirmation(callid);
 
             /* And then send the confirmation back to the originating peer.  This
              * ends up right back in this same function (attrd_peer_message) on the
              * peer where it will have to do something with a PCMK__XA_CONFIRM type
              * message.
              */
             crm_debug("Sending %s a confirmation", peer->uname);
             attrd_send_message(peer, reply, false);
             free_xml(reply);
         }
 
         pcmk__reset_request(&request);
     }
 }
 
 static void
 attrd_cpg_dispatch(cpg_handle_t handle,
                  const struct cpg_name *groupName,
                  uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
 {
     uint32_t kind = 0;
     xmlNode *xml = NULL;
     const char *from = NULL;
     char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from);
 
     if(data == NULL) {
         return;
     }
 
     if (kind == crm_class_cluster) {
         xml = pcmk__xml_parse(data);
     }
 
     if (xml == NULL) {
         crm_err("Bad message of class %d received from %s[%u]: '%.120s'", kind, from, nodeid, data);
     } else {
         attrd_peer_message(pcmk__get_node(nodeid, from, NULL,
                                           pcmk__node_search_cluster),
                            xml);
     }
 
     free_xml(xml);
     free(data);
 }
 
 static void
 attrd_cpg_destroy(gpointer unused)
 {
     if (attrd_shutting_down(false)) {
         crm_info("Disconnected from Corosync process group");
 
     } else {
         crm_crit("Lost connection to Corosync process group, shutting down");
         attrd_exit_status = CRM_EX_DISCONNECT;
         attrd_shutdown(0);
     }
 }
 
 /*!
  * \internal
  * \brief Broadcast an update for a single attribute value
  *
  * \param[in] a  Attribute to broadcast
  * \param[in] v  Attribute value to broadcast
  */
 void
 attrd_broadcast_value(const attribute_t *a, const attribute_value_t *v)
 {
     xmlNode *op = pcmk__xe_create(NULL, PCMK_XE_OP);
 
     crm_xml_add(op, PCMK_XA_TASK, PCMK__ATTRD_CMD_UPDATE);
     attrd_add_value_xml(op, a, v, false);
     attrd_send_message(NULL, op, false);
     free_xml(op);
 }
 
 #define state_text(state) pcmk__s((state), "in unknown state")
 
 static void
 attrd_peer_change_cb(enum crm_status_type kind, crm_node_t *peer, const void *data)
 {
     bool gone = false;
     bool is_remote = pcmk_is_set(peer->flags, crm_remote_node);
 
     switch (kind) {
         case crm_status_uname:
             crm_debug("%s node %s is now %s",
                       (is_remote? "Remote" : "Cluster"),
                       peer->uname, state_text(peer->state));
             break;
 
         case crm_status_processes:
             if (!pcmk_is_set(peer->processes, crm_get_cluster_proc())) {
                 gone = true;
             }
             crm_debug("Node %s is %s a peer",
                       peer->uname, (gone? "no longer" : "now"));
             break;
 
         case crm_status_nstate:
             crm_debug("%s node %s is now %s (was %s)",
                       (is_remote? "Remote" : "Cluster"),
                       peer->uname, state_text(peer->state), state_text(data));
             if (pcmk__str_eq(peer->state, CRM_NODE_MEMBER, pcmk__str_casei)) {
                 /* If we're the writer, send new peers a list of all attributes
                  * (unless it's a remote node, which doesn't run its own attrd)
                  */
                 if (attrd_election_won()
                     && !pcmk_is_set(peer->flags, crm_remote_node)) {
                     attrd_peer_sync(peer);
                 }
             } else {
                 // Remove all attribute values associated with lost nodes
                 attrd_peer_remove(peer->uname, false, "loss");
                 gone = true;
             }
             break;
     }
 
     // Remove votes from cluster nodes that leave, in case election in progress
     if (gone && !is_remote) {
         attrd_remove_voter(peer);
         attrd_remove_peer_protocol_ver(peer->uname);
         attrd_do_not_expect_from_peer(peer->uname);
     }
 }
 
 static void
 record_peer_nodeid(attribute_value_t *v, const char *host)
 {
     crm_node_t *known_peer = pcmk__get_node(v->nodeid, host, NULL,
                                             pcmk__node_search_cluster);
 
     crm_trace("Learned %s has node id %s", known_peer->uname, known_peer->uuid);
     if (attrd_election_won()) {
         attrd_write_attributes(attrd_write_changed);
     }
 }
 
 #define readable_value(rv_v) pcmk__s((rv_v)->current, "(unset)")
 
 #define readable_peer(p)    \
     (((p) == NULL)? "all peers" : pcmk__s((p)->uname, "unknown peer"))
 
 static void
 update_attr_on_host(attribute_t *a, const crm_node_t *peer, const xmlNode *xml,
                     const char *attr, const char *value, const char *host,
                     bool filter)
 {
     int is_remote = 0;
     bool changed = false;
     attribute_value_t *v = NULL;
 
     // Create entry for value if not already existing
     v = g_hash_table_lookup(a->values, host);
     if (v == NULL) {
         v = pcmk__assert_alloc(1, sizeof(attribute_value_t));
 
         v->nodename = pcmk__str_copy(host);
         g_hash_table_replace(a->values, v->nodename, v);
     }
 
     // If value is for a Pacemaker Remote node, remember that
     crm_element_value_int(xml, PCMK__XA_ATTR_IS_REMOTE, &is_remote);
     if (is_remote) {
         attrd_set_value_flags(v, attrd_value_remote);
-        CRM_ASSERT(crm_remote_peer_get(host) != NULL);
+        CRM_ASSERT(pcmk__cluster_lookup_remote_node(host) != NULL);
     }
 
     // Check whether the value changed
     changed = !pcmk__str_eq(v->current, value, pcmk__str_casei);
 
     if (changed && filter && pcmk__str_eq(host, attrd_cluster->uname,
                                           pcmk__str_casei)) {
         /* Broadcast the local value for an attribute that differs from the
          * value provided in a peer's attribute synchronization response. This
          * ensures a node's values for itself take precedence and all peers are
          * kept in sync.
          */
         v = g_hash_table_lookup(a->values, attrd_cluster->uname);
         crm_notice("%s[%s]: local value '%s' takes priority over '%s' from %s",
                    attr, host, readable_value(v), value, peer->uname);
         attrd_broadcast_value(a, v);
 
     } else if (changed) {
         crm_notice("Setting %s[%s]%s%s: %s -> %s "
                    CRM_XS " from %s with %s write delay",
                    attr, host, a->set_type ? " in " : "",
                    pcmk__s(a->set_type, ""), readable_value(v),
                    pcmk__s(value, "(unset)"), peer->uname,
                    (a->timeout_ms == 0)? "no" : pcmk__readable_interval(a->timeout_ms));
         pcmk__str_update(&v->current, value);
         attrd_set_attr_flags(a, attrd_attr_changed);
 
         if (pcmk__str_eq(host, attrd_cluster->uname, pcmk__str_casei)
             && pcmk__str_eq(attr, PCMK__NODE_ATTR_SHUTDOWN, pcmk__str_none)) {
 
             if (!pcmk__str_eq(value, "0", pcmk__str_null_matches)) {
                 attrd_set_requesting_shutdown();
 
             } else {
                 attrd_clear_requesting_shutdown();
             }
         }
 
         // Write out new value or start dampening timer
         if (a->timeout_ms && a->timer) {
             crm_trace("Delaying write of %s %s for dampening",
                       attr, pcmk__readable_interval(a->timeout_ms));
             mainloop_timer_start(a->timer);
         } else {
             attrd_write_or_elect_attribute(a);
         }
 
     } else {
         int is_force_write = 0;
 
         crm_element_value_int(xml, PCMK__XA_ATTRD_IS_FORCE_WRITE,
                               &is_force_write);
 
         if (is_force_write == 1 && a->timeout_ms && a->timer) {
             /* Save forced writing and set change flag. */
             /* The actual attribute is written by Writer after election. */
             crm_trace("%s[%s] from %s is unchanged (%s), forcing write",
                       attr, host, peer->uname, pcmk__s(value, "unset"));
             attrd_set_attr_flags(a, attrd_attr_force_write);
         } else {
             crm_trace("%s[%s] from %s is unchanged (%s)",
                       attr, host, peer->uname, pcmk__s(value, "unset"));
         }
     }
 
     // This allows us to later detect local values that peer doesn't know about
     attrd_set_value_flags(v, attrd_value_from_peer);
 
     /* If this is a cluster node whose node ID we are learning, remember it */
     if ((v->nodeid == 0) && !pcmk_is_set(v->flags, attrd_value_remote)
         && (crm_element_value_int(xml, PCMK__XA_ATTR_HOST_ID,
                                   (int*)&v->nodeid) == 0) && (v->nodeid > 0)) {
         record_peer_nodeid(v, host);
     }
 }
 
 static void
 attrd_peer_update_one(const crm_node_t *peer, xmlNode *xml, bool filter)
 {
     attribute_t *a = NULL;
     const char *attr = crm_element_value(xml, PCMK__XA_ATTR_NAME);
     const char *value = crm_element_value(xml, PCMK__XA_ATTR_VALUE);
     const char *host = crm_element_value(xml, PCMK__XA_ATTR_HOST);
 
     if (attr == NULL) {
         crm_warn("Could not update attribute: peer did not specify name");
         return;
     }
 
     a = attrd_populate_attribute(xml, attr);
     if (a == NULL) {
         return;
     }
 
     if (host == NULL) {
         // If no host was specified, update all hosts
         GHashTableIter vIter;
 
         crm_debug("Setting %s for all hosts to %s", attr, value);
         pcmk__xe_remove_attr(xml, PCMK__XA_ATTR_HOST_ID);
         g_hash_table_iter_init(&vIter, a->values);
 
         while (g_hash_table_iter_next(&vIter, (gpointer *) & host, NULL)) {
             update_attr_on_host(a, peer, xml, attr, value, host, filter);
         }
 
     } else {
         // Update attribute value for the given host
         update_attr_on_host(a, peer, xml, attr, value, host, filter);
     }
 
     /* If this is a message from some attrd instance broadcasting its protocol
      * version, check to see if it's a new minimum version.
      */
     if (pcmk__str_eq(attr, CRM_ATTR_PROTOCOL, pcmk__str_none)) {
         attrd_update_minimum_protocol_ver(peer->uname, value);
     }
 }
 
 static void
 broadcast_unseen_local_values(void)
 {
     GHashTableIter aIter;
     GHashTableIter vIter;
     attribute_t *a = NULL;
     attribute_value_t *v = NULL;
     xmlNode *sync = NULL;
 
     g_hash_table_iter_init(&aIter, attributes);
     while (g_hash_table_iter_next(&aIter, NULL, (gpointer *) & a)) {
 
         g_hash_table_iter_init(&vIter, a->values);
         while (g_hash_table_iter_next(&vIter, NULL, (gpointer *) & v)) {
 
             if (!pcmk_is_set(v->flags, attrd_value_from_peer)
                 && pcmk__str_eq(v->nodename, attrd_cluster->uname,
                                 pcmk__str_casei)) {
                 crm_trace("* %s[%s]='%s' is local-only",
                           a->id, v->nodename, readable_value(v));
                 if (sync == NULL) {
                     sync = pcmk__xe_create(NULL, __func__);
                     crm_xml_add(sync, PCMK_XA_TASK, PCMK__ATTRD_CMD_SYNC_RESPONSE);
                 }
                 attrd_add_value_xml(sync, a, v, a->timeout_ms && a->timer);
             }
         }
     }
 
     if (sync != NULL) {
         crm_debug("Broadcasting local-only values");
         attrd_send_message(NULL, sync, false);
         free_xml(sync);
     }
 }
 
 int
 attrd_cluster_connect(void)
 {
     int rc = pcmk_rc_ok;
 
     attrd_cluster = pcmk_cluster_new();
 
     attrd_cluster->destroy = attrd_cpg_destroy;
     attrd_cluster->cpg.cpg_deliver_fn = attrd_cpg_dispatch;
     attrd_cluster->cpg.cpg_confchg_fn = pcmk_cpg_membership;
 
     crm_set_status_callback(&attrd_peer_change_cb);
 
     rc = pcmk_cluster_connect(attrd_cluster);
     rc = pcmk_rc2legacy(rc);
     if (rc != pcmk_ok) {
         crm_err("Cluster connection failed");
         return rc;
     }
     return pcmk_ok;
 }
 
 void
 attrd_peer_clear_failure(pcmk__request_t *request)
 {
     xmlNode *xml = request->xml;
     const char *rsc = crm_element_value(xml, PCMK__XA_ATTR_RESOURCE);
     const char *host = crm_element_value(xml, PCMK__XA_ATTR_HOST);
     const char *op = crm_element_value(xml, PCMK__XA_ATTR_CLEAR_OPERATION);
     const char *interval_spec = crm_element_value(xml,
                                                   PCMK__XA_ATTR_CLEAR_INTERVAL);
     guint interval_ms = 0U;
     char *attr = NULL;
     GHashTableIter iter;
     regex_t regex;
 
     crm_node_t *peer = pcmk__get_node(0, request->peer, NULL,
                                       pcmk__node_search_cluster);
 
     pcmk_parse_interval_spec(interval_spec, &interval_ms);
 
     if (attrd_failure_regex(&regex, rsc, op, interval_ms) != pcmk_ok) {
         crm_info("Ignoring invalid request to clear failures for %s",
                  pcmk__s(rsc, "all resources"));
         return;
     }
 
     crm_xml_add(xml, PCMK_XA_TASK, PCMK__ATTRD_CMD_UPDATE);
 
     /* Make sure value is not set, so we delete */
     pcmk__xe_remove_attr(xml, PCMK__XA_ATTR_VALUE);
 
     g_hash_table_iter_init(&iter, attributes);
     while (g_hash_table_iter_next(&iter, (gpointer *) &attr, NULL)) {
         if (regexec(&regex, attr, 0, NULL, 0) == 0) {
             crm_trace("Matched %s when clearing %s",
                       attr, pcmk__s(rsc, "all resources"));
             crm_xml_add(xml, PCMK__XA_ATTR_NAME, attr);
             attrd_peer_update(peer, xml, host, false);
         }
     }
     regfree(&regex);
 }
 
 /*!
  * \internal
  * \brief Load attributes from a peer sync response
  *
  * \param[in]     peer      Peer that sent sync response
  * \param[in]     peer_won  Whether peer is the attribute writer
  * \param[in,out] xml       Request XML
  */
 void
 attrd_peer_sync_response(const crm_node_t *peer, bool peer_won, xmlNode *xml)
 {
     crm_info("Processing " PCMK__ATTRD_CMD_SYNC_RESPONSE " from %s",
              peer->uname);
 
     if (peer_won) {
         /* Initialize the "seen" flag for all attributes to cleared, so we can
          * detect attributes that local node has but the writer doesn't.
          */
         attrd_clear_value_seen();
     }
 
     // Process each attribute update in the sync response
     for (xmlNode *child = pcmk__xe_first_child(xml, NULL, NULL, NULL);
          child != NULL; child = pcmk__xe_next(child)) {
 
         attrd_peer_update(peer, child,
                           crm_element_value(child, PCMK__XA_ATTR_HOST), true);
     }
 
     if (peer_won) {
         /* If any attributes are still not marked as seen, the writer doesn't
          * know about them, so send all peers an update with them.
          */
         broadcast_unseen_local_values();
     }
 }
 
 /*!
  * \internal
  * \brief Remove all attributes and optionally peer cache entries for a node
  *
  * \param[in] host     Name of node to purge
  * \param[in] uncache  If true, remove node from peer caches
  * \param[in] source   Who requested removal (only used for logging)
  */
 void
 attrd_peer_remove(const char *host, bool uncache, const char *source)
 {
     attribute_t *a = NULL;
     GHashTableIter aIter;
 
     CRM_CHECK(host != NULL, return);
     crm_notice("Removing all %s attributes for node %s "
                CRM_XS " %s reaping node from cache",
                host, source, (uncache? "and" : "without"));
 
     g_hash_table_iter_init(&aIter, attributes);
     while (g_hash_table_iter_next(&aIter, NULL, (gpointer *) & a)) {
         if(g_hash_table_remove(a->values, host)) {
             crm_debug("Removed %s[%s] for peer %s", a->id, host, source);
         }
     }
 
     if (uncache) {
         pcmk__purge_node_from_cache(host, 0);
     }
 }
 
 /*!
  * \internal
  * \brief Send all known attributes and values to a peer
  *
  * \param[in] peer  Peer to send sync to (if NULL, broadcast to all peers)
  */
 void
 attrd_peer_sync(crm_node_t *peer)
 {
     GHashTableIter aIter;
     GHashTableIter vIter;
 
     attribute_t *a = NULL;
     attribute_value_t *v = NULL;
     xmlNode *sync = pcmk__xe_create(NULL, __func__);
 
     crm_xml_add(sync, PCMK_XA_TASK, PCMK__ATTRD_CMD_SYNC_RESPONSE);
 
     g_hash_table_iter_init(&aIter, attributes);
     while (g_hash_table_iter_next(&aIter, NULL, (gpointer *) & a)) {
         g_hash_table_iter_init(&vIter, a->values);
         while (g_hash_table_iter_next(&vIter, NULL, (gpointer *) & v)) {
             crm_debug("Syncing %s[%s]='%s' to %s",
                       a->id, v->nodename, readable_value(v),
                       readable_peer(peer));
             attrd_add_value_xml(sync, a, v, false);
         }
     }
 
     crm_debug("Syncing values to %s", readable_peer(peer));
     attrd_send_message(peer, sync, false);
     free_xml(sync);
 }
 
 void
 attrd_peer_update(const crm_node_t *peer, xmlNode *xml, const char *host,
                   bool filter)
 {
     bool handle_sync_point = false;
 
     CRM_CHECK((peer != NULL) && (xml != NULL), return);
     if (xml->children != NULL) {
         for (xmlNode *child = pcmk__xe_first_child(xml, PCMK_XE_OP, NULL, NULL);
              child != NULL; child = pcmk__xe_next_same(child)) {
 
             attrd_copy_xml_attributes(xml, child);
             attrd_peer_update_one(peer, child, filter);
 
             if (attrd_request_has_sync_point(child)) {
                 handle_sync_point = true;
             }
         }
 
     } else {
         attrd_peer_update_one(peer, xml, filter);
 
         if (attrd_request_has_sync_point(xml)) {
             handle_sync_point = true;
         }
     }
 
     /* If the update XML specified that the client wanted to wait for a sync
      * point, process that now.
      */
     if (handle_sync_point) {
         crm_trace("Hit local sync point for attribute update");
         attrd_ack_waitlist_clients(attrd_sync_point_local, xml);
     }
 }
diff --git a/daemons/controld/controld_join_client.c b/daemons/controld/controld_join_client.c
index 6e35fbae82..47a920704d 100644
--- a/daemons/controld/controld_join_client.c
+++ b/daemons/controld/controld_join_client.c
@@ -1,366 +1,366 @@
 /*
  * Copyright 2004-2024 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU General Public License version 2
  * or later (GPLv2+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 
 #include <crm/crm.h>
 #include <crm/cib.h>
 #include <crm/common/xml.h>
 
 #include <pacemaker-controld.h>
 
 void join_query_callback(xmlNode * msg, int call_id, int rc, xmlNode * output, void *user_data);
 
 extern ha_msg_input_t *copy_ha_msg_input(ha_msg_input_t * orig);
 
 /*!
  * \internal
  * \brief Remember if DC is shutting down as we join
  *
  * If we're joining while the current DC is shutting down, update its expected
  * state, so we don't fence it if we become the new DC. (We weren't a peer
  * when it broadcast its shutdown request.)
  *
  * \param[in] msg  A join message from the DC
  */
 static void
 update_dc_expected(const xmlNode *msg)
 {
     if ((controld_globals.dc_name != NULL)
         && pcmk__xe_attr_is_true(msg, PCMK__XA_DC_LEAVING)) {
         crm_node_t *dc_node = pcmk__get_node(0, controld_globals.dc_name, NULL,
                                              pcmk__node_search_cluster);
 
         pcmk__update_peer_expected(__func__, dc_node, CRMD_JOINSTATE_DOWN);
     }
 }
 
 /*	A_CL_JOIN_QUERY		*/
 /* is there a DC out there? */
 void
 do_cl_join_query(long long action,
                  enum crmd_fsa_cause cause,
                  enum crmd_fsa_state cur_state,
                  enum crmd_fsa_input current_input, fsa_data_t * msg_data)
 {
     xmlNode *req = create_request(CRM_OP_JOIN_ANNOUNCE, NULL, NULL,
                                   CRM_SYSTEM_DC, CRM_SYSTEM_CRMD, NULL);
 
     sleep(1);                   // Give the cluster layer time to propagate to the DC
     update_dc(NULL);            /* Unset any existing value so that the result is not discarded */
     crm_debug("Querying for a DC");
     send_cluster_message(NULL, crm_msg_crmd, req, FALSE);
     free_xml(req);
 }
 
 /*	 A_CL_JOIN_ANNOUNCE	*/
 
 /* this is kind of a workaround for the fact that we may not be around or
  * are otherwise unable to reply when the DC sends out A_DC_JOIN_OFFER_ALL
  */
 void
 do_cl_join_announce(long long action,
                     enum crmd_fsa_cause cause,
                     enum crmd_fsa_state cur_state,
                     enum crmd_fsa_input current_input, fsa_data_t * msg_data)
 {
     /* don't announce if we're in one of these states */
     if (cur_state != S_PENDING) {
         crm_warn("Not announcing cluster join because in state %s",
                  fsa_state2string(cur_state));
         return;
     }
 
     if (!pcmk_is_set(controld_globals.fsa_input_register, R_STARTING)) {
         /* send as a broadcast */
         xmlNode *req = create_request(CRM_OP_JOIN_ANNOUNCE, NULL, NULL,
                                       CRM_SYSTEM_DC, CRM_SYSTEM_CRMD, NULL);
 
         crm_debug("Announcing availability");
         update_dc(NULL);
         send_cluster_message(NULL, crm_msg_crmd, req, FALSE);
         free_xml(req);
 
     } else {
         /* Delay announce until we have finished local startup */
         crm_warn("Delaying announce of cluster join until local startup is complete");
         return;
     }
 }
 
 static int query_call_id = 0;
 
 /*	 A_CL_JOIN_REQUEST	*/
 /* aka. accept the welcome offer */
 void
 do_cl_join_offer_respond(long long action,
                          enum crmd_fsa_cause cause,
                          enum crmd_fsa_state cur_state,
                          enum crmd_fsa_input current_input, fsa_data_t * msg_data)
 {
     cib_t *cib_conn = controld_globals.cib_conn;
 
     ha_msg_input_t *input = fsa_typed_data(fsa_dt_ha_msg);
     const char *welcome_from;
     const char *join_id;
 
     CRM_CHECK(input != NULL, return);
 
     welcome_from = crm_element_value(input->msg, PCMK__XA_SRC);
     join_id = crm_element_value(input->msg, PCMK__XA_JOIN_ID);
     crm_trace("Accepting cluster join offer from node %s "CRM_XS" join-%s",
               welcome_from, crm_element_value(input->msg, PCMK__XA_JOIN_ID));
 
     /* we only ever want the last one */
     if (query_call_id > 0) {
         crm_trace("Cancelling previous join query: %d", query_call_id);
         remove_cib_op_callback(query_call_id, FALSE);
         query_call_id = 0;
     }
 
     if (update_dc(input->msg) == FALSE) {
         crm_warn("Discarding cluster join offer from node %s (expected %s)",
                  welcome_from, controld_globals.dc_name);
         return;
     }
 
     update_dc_expected(input->msg);
 
     query_call_id = cib_conn->cmds->query(cib_conn, NULL, NULL,
                                           cib_scope_local|cib_no_children);
     fsa_register_cib_callback(query_call_id, pcmk__str_copy(join_id),
                               join_query_callback);
     crm_trace("Registered join query callback: %d", query_call_id);
 
     controld_set_fsa_action_flags(A_DC_TIMER_STOP);
     controld_trigger_fsa();
 }
 
 void
 join_query_callback(xmlNode * msg, int call_id, int rc, xmlNode * output, void *user_data)
 {
     char *join_id = user_data;
     xmlNode *generation = pcmk__xe_create(NULL, PCMK__XE_GENERATION_TUPLE);
 
     CRM_LOG_ASSERT(join_id != NULL);
 
     if (query_call_id != call_id) {
         crm_trace("Query %d superseded", call_id);
         goto done;
     }
 
     query_call_id = 0;
     if(rc != pcmk_ok || output == NULL) {
         crm_err("Could not retrieve version details for join-%s: %s (%d)",
                 join_id, pcmk_strerror(rc), rc);
         register_fsa_error_adv(C_FSA_INTERNAL, I_ERROR, NULL, NULL, __func__);
 
     } else if (controld_globals.dc_name == NULL) {
         crm_debug("Membership is in flux, not continuing join-%s", join_id);
 
     } else {
         xmlNode *reply = NULL;
 
         crm_debug("Respond to join offer join-%s from %s",
                   join_id, controld_globals.dc_name);
         copy_in_properties(generation, output);
 
         reply = create_request(CRM_OP_JOIN_REQUEST, generation,
                                controld_globals.dc_name, CRM_SYSTEM_DC,
                                CRM_SYSTEM_CRMD, NULL);
 
         crm_xml_add(reply, PCMK__XA_JOIN_ID, join_id);
         crm_xml_add(reply, PCMK_XA_CRM_FEATURE_SET, CRM_FEATURE_SET);
         send_cluster_message(pcmk__get_node(0, controld_globals.dc_name, NULL,
                                             pcmk__node_search_cluster),
                              crm_msg_crmd, reply, TRUE);
         free_xml(reply);
     }
 
   done:
     free_xml(generation);
 }
 
 void
 set_join_state(const char *start_state, const char *node_name, const char *node_uuid,
                bool remote)
 {
     if (pcmk__str_eq(start_state, PCMK_VALUE_STANDBY, pcmk__str_casei)) {
         crm_notice("Forcing node %s to join in %s state per configured "
                    "environment", node_name, start_state);
         cib__update_node_attr(controld_globals.logger_out,
                               controld_globals.cib_conn, cib_sync_call,
                               PCMK_XE_NODES, node_uuid,
                               NULL, NULL, NULL, PCMK_NODE_ATTR_STANDBY,
                               PCMK_VALUE_TRUE, NULL,
                               (remote? PCMK_VALUE_REMOTE : NULL));
 
     } else if (pcmk__str_eq(start_state, PCMK_VALUE_ONLINE, pcmk__str_casei)) {
         crm_notice("Forcing node %s to join in %s state per configured "
                    "environment", node_name, start_state);
         cib__update_node_attr(controld_globals.logger_out,
                               controld_globals.cib_conn, cib_sync_call,
                               PCMK_XE_NODES, node_uuid,
                               NULL, NULL, NULL, PCMK_NODE_ATTR_STANDBY,
                               PCMK_VALUE_FALSE, NULL,
                               (remote? PCMK_VALUE_REMOTE : NULL));
 
     } else if (pcmk__str_eq(start_state, PCMK_VALUE_DEFAULT, pcmk__str_casei)) {
         crm_debug("Not forcing a starting state on node %s", node_name);
 
     } else {
         crm_warn("Unrecognized start state '%s', using "
                  "'" PCMK_VALUE_DEFAULT "' (%s)",
                  start_state, node_name);
     }
 }
 
 static int
 update_conn_host_cache(xmlNode *node, void *userdata)
 {
     const char *remote = crm_element_value(node, PCMK_XA_ID);
     const char *conn_host = crm_element_value(node, PCMK__XA_CONNECTION_HOST);
     const char *state = crm_element_value(node, PCMK__XA_NODE_STATE);
 
-    crm_node_t *remote_peer = crm_remote_peer_get(remote);
+    crm_node_t *remote_peer = pcmk__cluster_lookup_remote_node(remote);
 
     if (remote_peer == NULL) {
         return pcmk_rc_ok;
     }
 
     if (conn_host != NULL) {
         pcmk__str_update(&remote_peer->conn_host, conn_host);
     }
 
     if (state != NULL) {
         pcmk__update_peer_state(__func__, remote_peer, state, 0);
     }
 
     return pcmk_rc_ok;
 }
 
 /*	A_CL_JOIN_RESULT	*/
 /* aka. this is notification that we have (or have not) been accepted */
 void
 do_cl_join_finalize_respond(long long action,
                             enum crmd_fsa_cause cause,
                             enum crmd_fsa_state cur_state,
                             enum crmd_fsa_input current_input, fsa_data_t * msg_data)
 {
     xmlNode *tmp1 = NULL;
     gboolean was_nack = TRUE;
     static gboolean first_join = TRUE;
     ha_msg_input_t *input = fsa_typed_data(fsa_dt_ha_msg);
     const char *start_state = pcmk__env_option(PCMK__ENV_NODE_START_STATE);
 
     int join_id = -1;
     const char *op = crm_element_value(input->msg, PCMK__XA_CRM_TASK);
     const char *welcome_from = crm_element_value(input->msg, PCMK__XA_SRC);
 
     if (!pcmk__str_eq(op, CRM_OP_JOIN_ACKNAK, pcmk__str_casei)) {
         crm_trace("Ignoring op=%s message", op);
         return;
     }
 
     /* calculate if it was an ack or a nack */
     if (pcmk__xe_attr_is_true(input->msg, CRM_OP_JOIN_ACKNAK)) {
         was_nack = FALSE;
     }
 
     crm_element_value_int(input->msg, PCMK__XA_JOIN_ID, &join_id);
 
     if (was_nack) {
         crm_err("Shutting down because cluster join with leader %s failed "
                 CRM_XS" join-%d NACK'd", welcome_from, join_id);
         register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL);
         controld_set_fsa_input_flags(R_STAYDOWN);
         return;
     }
 
     if (!AM_I_DC
         && pcmk__str_eq(welcome_from, controld_globals.our_nodename,
                         pcmk__str_casei)) {
         crm_warn("Discarding our own welcome - we're no longer the DC");
         return;
     }
 
     if (update_dc(input->msg) == FALSE) {
         crm_warn("Discarding %s from node %s (expected from %s)",
                  op, welcome_from, controld_globals.dc_name);
         return;
     }
 
     update_dc_expected(input->msg);
 
     /* record the node's feature set as a transient attribute */
     update_attrd(controld_globals.our_nodename, CRM_ATTR_FEATURE_SET,
                  CRM_FEATURE_SET, NULL, FALSE);
 
     /* send our status section to the DC */
     tmp1 = controld_query_executor_state();
     if (tmp1 != NULL) {
         xmlNode *remotes = NULL;
         xmlNode *reply = create_request(CRM_OP_JOIN_CONFIRM, tmp1,
                                         controld_globals.dc_name, CRM_SYSTEM_DC,
                                         CRM_SYSTEM_CRMD, NULL);
 
         crm_xml_add_int(reply, PCMK__XA_JOIN_ID, join_id);
 
         crm_debug("Confirming join-%d: sending local operation history to %s",
                   join_id, controld_globals.dc_name);
 
         /*
          * If this is the node's first join since the controller started on it,
          * set its initial state (standby or member) according to the user's
          * preference.
          *
          * We do not clear the LRM history here. Even if the DC failed to do it
          * when we last left, removing them here creates a race condition if the
          * controller is being recovered. Instead of a list of active resources
          * from the executor, we may end up with a blank status section. If we
          * are _NOT_ lucky, we will probe for the "wrong" instance of anonymous
          * clones and end up with multiple active instances on the machine.
          */
         if (first_join
             && !pcmk_is_set(controld_globals.fsa_input_register, R_SHUTDOWN)) {
 
             first_join = FALSE;
             if (start_state) {
                 set_join_state(start_state, controld_globals.our_nodename,
                                controld_globals.our_uuid, false);
             }
         }
 
         send_cluster_message(pcmk__get_node(0, controld_globals.dc_name, NULL,
                                             pcmk__node_search_cluster),
                              crm_msg_crmd, reply, TRUE);
         free_xml(reply);
 
         if (AM_I_DC == FALSE) {
             register_fsa_input_adv(cause, I_NOT_DC, NULL, A_NOTHING, TRUE,
                                    __func__);
         }
 
         free_xml(tmp1);
 
         /* Update the remote node cache with information about which node
          * is hosting the connection.
          */
         remotes = pcmk__xe_first_child(input->msg, PCMK_XE_NODES, NULL, NULL);
         if (remotes != NULL) {
             pcmk__xe_foreach_child(remotes, PCMK_XE_NODE,
                                    update_conn_host_cache, NULL);
         }
 
     } else {
         crm_err("Could not confirm join-%d with %s: Local operation history "
                 "failed", join_id, controld_globals.dc_name);
         register_fsa_error(C_FSA_INTERNAL, I_FAIL, NULL);
     }
 }
diff --git a/daemons/controld/controld_messages.c b/daemons/controld/controld_messages.c
index 1d71109fae..875d1e9617 100644
--- a/daemons/controld/controld_messages.c
+++ b/daemons/controld/controld_messages.c
@@ -1,1358 +1,1358 @@
 /*
  * Copyright 2004-2024 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU General Public License version 2
  * or later (GPLv2+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 
 #include <sys/param.h>
 #include <string.h>
 #include <time.h>
 
 #include <crm/crm.h>
 #include <crm/common/xml.h>
 #include <crm/cluster/internal.h>
 #include <crm/cib.h>
 #include <crm/common/ipc_internal.h>
 
 #include <pacemaker-controld.h>
 
 static enum crmd_fsa_input handle_message(xmlNode *msg,
                                           enum crmd_fsa_cause cause);
 static void handle_response(xmlNode *stored_msg);
 static enum crmd_fsa_input handle_request(xmlNode *stored_msg,
                                           enum crmd_fsa_cause cause);
 static enum crmd_fsa_input handle_shutdown_request(xmlNode *stored_msg);
 static void send_msg_via_ipc(xmlNode * msg, const char *sys);
 
 /* debug only, can wrap all it likes */
 static int last_data_id = 0;
 
 void
 register_fsa_error_adv(enum crmd_fsa_cause cause, enum crmd_fsa_input input,
                        fsa_data_t * cur_data, void *new_data, const char *raised_from)
 {
     /* save the current actions if any */
     if (controld_globals.fsa_actions != A_NOTHING) {
         register_fsa_input_adv(cur_data ? cur_data->fsa_cause : C_FSA_INTERNAL,
                                I_NULL, cur_data ? cur_data->data : NULL,
                                controld_globals.fsa_actions, TRUE, __func__);
     }
 
     /* reset the action list */
     crm_info("Resetting the current action list");
     fsa_dump_actions(controld_globals.fsa_actions, "Drop");
     controld_globals.fsa_actions = A_NOTHING;
 
     /* register the error */
     register_fsa_input_adv(cause, input, new_data, A_NOTHING, TRUE, raised_from);
 }
 
 void
 register_fsa_input_adv(enum crmd_fsa_cause cause, enum crmd_fsa_input input,
                        void *data, uint64_t with_actions,
                        gboolean prepend, const char *raised_from)
 {
     unsigned old_len = g_list_length(controld_globals.fsa_message_queue);
     fsa_data_t *fsa_data = NULL;
 
     if (raised_from == NULL) {
         raised_from = "<unknown>";
     }
 
     if (input == I_NULL && with_actions == A_NOTHING /* && data == NULL */ ) {
         /* no point doing anything */
         crm_err("Cannot add entry to queue: no input and no action");
         return;
     }
 
     if (input == I_WAIT_FOR_EVENT) {
         controld_set_global_flags(controld_fsa_is_stalled);
         crm_debug("Stalling the FSA pending further input: source=%s cause=%s data=%p queue=%d",
                   raised_from, fsa_cause2string(cause), data, old_len);
 
         if (old_len > 0) {
             fsa_dump_queue(LOG_TRACE);
             prepend = FALSE;
         }
 
         if (data == NULL) {
             controld_set_fsa_action_flags(with_actions);
             fsa_dump_actions(with_actions, "Restored");
             return;
         }
 
         /* Store everything in the new event and reset
          * controld_globals.fsa_actions
          */
         with_actions |= controld_globals.fsa_actions;
         controld_globals.fsa_actions = A_NOTHING;
     }
 
     last_data_id++;
     crm_trace("%s %s FSA input %d (%s) due to %s, %s data",
               raised_from, (prepend? "prepended" : "appended"), last_data_id,
               fsa_input2string(input), fsa_cause2string(cause),
               (data? "with" : "without"));
 
     fsa_data = pcmk__assert_alloc(1, sizeof(fsa_data_t));
     fsa_data->id = last_data_id;
     fsa_data->fsa_input = input;
     fsa_data->fsa_cause = cause;
     fsa_data->origin = raised_from;
     fsa_data->data = NULL;
     fsa_data->data_type = fsa_dt_none;
     fsa_data->actions = with_actions;
 
     if (with_actions != A_NOTHING) {
         crm_trace("Adding actions %.16llx to input",
                   (unsigned long long) with_actions);
     }
 
     if (data != NULL) {
         switch (cause) {
             case C_FSA_INTERNAL:
             case C_CRMD_STATUS_CALLBACK:
             case C_IPC_MESSAGE:
             case C_HA_MESSAGE:
                 CRM_CHECK(((ha_msg_input_t *) data)->msg != NULL,
                           crm_err("Bogus data from %s", raised_from));
                 crm_trace("Copying %s data from %s as cluster message data",
                           fsa_cause2string(cause), raised_from);
                 fsa_data->data = copy_ha_msg_input(data);
                 fsa_data->data_type = fsa_dt_ha_msg;
                 break;
 
             case C_LRM_OP_CALLBACK:
                 crm_trace("Copying %s data from %s as lrmd_event_data_t",
                           fsa_cause2string(cause), raised_from);
                 fsa_data->data = lrmd_copy_event((lrmd_event_data_t *) data);
                 fsa_data->data_type = fsa_dt_lrm;
                 break;
 
             case C_TIMER_POPPED:
             case C_SHUTDOWN:
             case C_UNKNOWN:
             case C_STARTUP:
                 crm_crit("Copying %s data (from %s) is not yet implemented",
                          fsa_cause2string(cause), raised_from);
                 crmd_exit(CRM_EX_SOFTWARE);
                 break;
         }
     }
 
     /* make sure to free it properly later */
     if (prepend) {
         controld_globals.fsa_message_queue
             = g_list_prepend(controld_globals.fsa_message_queue, fsa_data);
     } else {
         controld_globals.fsa_message_queue
             = g_list_append(controld_globals.fsa_message_queue, fsa_data);
     }
 
     crm_trace("FSA message queue length is %d",
               g_list_length(controld_globals.fsa_message_queue));
 
     /* fsa_dump_queue(LOG_TRACE); */
 
     if (old_len == g_list_length(controld_globals.fsa_message_queue)) {
         crm_err("Couldn't add message to the queue");
     }
 
     if (input != I_WAIT_FOR_EVENT) {
         controld_trigger_fsa();
     }
 }
 
 void
 fsa_dump_queue(int log_level)
 {
     int offset = 0;
 
     for (GList *iter = controld_globals.fsa_message_queue; iter != NULL;
          iter = iter->next) {
         fsa_data_t *data = (fsa_data_t *) iter->data;
 
         do_crm_log_unlikely(log_level,
                             "queue[%d.%d]: input %s raised by %s(%p.%d)\t(cause=%s)",
                             offset++, data->id, fsa_input2string(data->fsa_input),
                             data->origin, data->data, data->data_type,
                             fsa_cause2string(data->fsa_cause));
     }
 }
 
 ha_msg_input_t *
 copy_ha_msg_input(ha_msg_input_t * orig)
 {
     xmlNode *wrapper = NULL;
 
     ha_msg_input_t *copy = pcmk__assert_alloc(1, sizeof(ha_msg_input_t));
 
     copy->msg = (orig != NULL)? pcmk__xml_copy(NULL, orig->msg) : NULL;
 
     wrapper = pcmk__xe_first_child(copy->msg, PCMK__XE_CRM_XML, NULL, NULL);
     copy->xml = pcmk__xe_first_child(wrapper, NULL, NULL, NULL);
     return copy;
 }
 
 void
 delete_fsa_input(fsa_data_t * fsa_data)
 {
     lrmd_event_data_t *op = NULL;
     xmlNode *foo = NULL;
 
     if (fsa_data == NULL) {
         return;
     }
     crm_trace("About to free %s data", fsa_cause2string(fsa_data->fsa_cause));
 
     if (fsa_data->data != NULL) {
         switch (fsa_data->data_type) {
             case fsa_dt_ha_msg:
                 delete_ha_msg_input(fsa_data->data);
                 break;
 
             case fsa_dt_xml:
                 foo = fsa_data->data;
                 free_xml(foo);
                 break;
 
             case fsa_dt_lrm:
                 op = (lrmd_event_data_t *) fsa_data->data;
                 lrmd_free_event(op);
                 break;
 
             case fsa_dt_none:
                 if (fsa_data->data != NULL) {
                     crm_err("Don't know how to free %s data from %s",
                             fsa_cause2string(fsa_data->fsa_cause), fsa_data->origin);
                     crmd_exit(CRM_EX_SOFTWARE);
                 }
                 break;
         }
         crm_trace("%s data freed", fsa_cause2string(fsa_data->fsa_cause));
     }
 
     free(fsa_data);
 }
 
 /* returns the next message */
 fsa_data_t *
 get_message(void)
 {
     fsa_data_t *message
         = (fsa_data_t *) controld_globals.fsa_message_queue->data;
 
     controld_globals.fsa_message_queue
         = g_list_remove(controld_globals.fsa_message_queue, message);
     crm_trace("Processing input %d", message->id);
     return message;
 }
 
 void *
 fsa_typed_data_adv(fsa_data_t * fsa_data, enum fsa_data_type a_type, const char *caller)
 {
     void *ret_val = NULL;
 
     if (fsa_data == NULL) {
         crm_err("%s: No FSA data available", caller);
 
     } else if (fsa_data->data == NULL) {
         crm_err("%s: No message data available. Origin: %s", caller, fsa_data->origin);
 
     } else if (fsa_data->data_type != a_type) {
         crm_crit("%s: Message data was the wrong type! %d vs. requested=%d.  Origin: %s",
                  caller, fsa_data->data_type, a_type, fsa_data->origin);
         CRM_ASSERT(fsa_data->data_type == a_type);
     } else {
         ret_val = fsa_data->data;
     }
 
     return ret_val;
 }
 
 /*	A_MSG_ROUTE	*/
 void
 do_msg_route(long long action,
              enum crmd_fsa_cause cause,
              enum crmd_fsa_state cur_state,
              enum crmd_fsa_input current_input, fsa_data_t * msg_data)
 {
     ha_msg_input_t *input = fsa_typed_data(fsa_dt_ha_msg);
 
     route_message(msg_data->fsa_cause, input->msg);
 }
 
 void
 route_message(enum crmd_fsa_cause cause, xmlNode * input)
 {
     ha_msg_input_t fsa_input;
     enum crmd_fsa_input result = I_NULL;
 
     fsa_input.msg = input;
     CRM_CHECK(cause == C_IPC_MESSAGE || cause == C_HA_MESSAGE, return);
 
     /* try passing the buck first */
     if (relay_message(input, cause == C_IPC_MESSAGE)) {
         return;
     }
 
     /* handle locally */
     result = handle_message(input, cause);
 
     /* done or process later? */
     switch (result) {
         case I_NULL:
         case I_CIB_OP:
         case I_ROUTER:
         case I_NODE_JOIN:
         case I_JOIN_REQUEST:
         case I_JOIN_RESULT:
             break;
         default:
             /* Defering local processing of message */
             register_fsa_input_later(cause, result, &fsa_input);
             return;
     }
 
     if (result != I_NULL) {
         /* add to the front of the queue */
         register_fsa_input(cause, result, &fsa_input);
     }
 }
 
 gboolean
 relay_message(xmlNode * msg, gboolean originated_locally)
 {
     enum crm_ais_msg_types dest = crm_msg_ais;
     bool is_for_dc = false;
     bool is_for_dcib = false;
     bool is_for_te = false;
     bool is_for_crm = false;
     bool is_for_cib = false;
     bool is_local = false;
     bool broadcast = false;
     const char *host_to = NULL;
     const char *sys_to = NULL;
     const char *sys_from = NULL;
     const char *type = NULL;
     const char *task = NULL;
     const char *ref = NULL;
     crm_node_t *node_to = NULL;
 
     CRM_CHECK(msg != NULL, return TRUE);
 
     host_to = crm_element_value(msg, PCMK__XA_CRM_HOST_TO);
     sys_to = crm_element_value(msg, PCMK__XA_CRM_SYS_TO);
     sys_from = crm_element_value(msg, PCMK__XA_CRM_SYS_FROM);
     type = crm_element_value(msg, PCMK__XA_T);
     task = crm_element_value(msg, PCMK__XA_CRM_TASK);
     ref = crm_element_value(msg, PCMK_XA_REFERENCE);
 
     broadcast = pcmk__str_empty(host_to);
 
     if (ref == NULL) {
         ref = "without reference ID";
     }
 
     if (pcmk__str_eq(task, CRM_OP_HELLO, pcmk__str_casei)) {
         crm_trace("Received hello %s from %s (no processing needed)",
                   ref, pcmk__s(sys_from, "unidentified source"));
         crm_log_xml_trace(msg, "hello");
         return TRUE;
     }
 
     // Require message type (set by create_request())
     if (!pcmk__str_eq(type, PCMK__VALUE_CRMD, pcmk__str_none)) {
         crm_warn("Ignoring invalid message %s with type '%s' "
                  "(not '" PCMK__VALUE_CRMD "')",
                  ref, pcmk__s(type, ""));
         crm_log_xml_trace(msg, "ignored");
         return TRUE;
     }
 
     // Require a destination subsystem (also set by create_request())
     if (sys_to == NULL) {
         crm_warn("Ignoring invalid message %s with no " PCMK__XA_CRM_SYS_TO,
                  ref);
         crm_log_xml_trace(msg, "ignored");
         return TRUE;
     }
 
     // Get the message type appropriate to the destination subsystem
     if (is_corosync_cluster()) {
         dest = text2msg_type(sys_to);
         if ((dest < crm_msg_ais) || (dest > crm_msg_stonith_ng)) {
             /* Unrecognized value, use a sane default
              *
              * @TODO Maybe we should bail instead
              */
             dest = crm_msg_crmd;
         }
     }
 
     is_for_dc = (strcasecmp(CRM_SYSTEM_DC, sys_to) == 0);
     is_for_dcib = (strcasecmp(CRM_SYSTEM_DCIB, sys_to) == 0);
     is_for_te = (strcasecmp(CRM_SYSTEM_TENGINE, sys_to) == 0);
     is_for_cib = (strcasecmp(CRM_SYSTEM_CIB, sys_to) == 0);
     is_for_crm = (strcasecmp(CRM_SYSTEM_CRMD, sys_to) == 0);
 
     // Check whether message should be processed locally
     is_local = false;
     if (broadcast) {
         if (is_for_dc || is_for_te) {
             is_local = false;
 
         } else if (is_for_crm) {
             if (pcmk__strcase_any_of(task, CRM_OP_NODE_INFO,
                                      PCMK__CONTROLD_CMD_NODES, NULL)) {
                 /* Node info requests do not specify a host, which is normally
                  * treated as "all hosts", because the whole point is that the
                  * client may not know the local node name. Always handle these
                  * requests locally.
                  */
                 is_local = true;
             } else {
                 is_local = !originated_locally;
             }
 
         } else {
             is_local = true;
         }
 
     } else if (pcmk__str_eq(controld_globals.our_nodename, host_to,
                             pcmk__str_casei)) {
         is_local = true;
 
     } else if (is_for_crm && pcmk__str_eq(task, CRM_OP_LRM_DELETE, pcmk__str_casei)) {
         xmlNode *wrapper = pcmk__xe_first_child(msg, PCMK__XE_CRM_XML, NULL,
                                                 NULL);
         xmlNode *msg_data = pcmk__xe_first_child(wrapper, NULL, NULL, NULL);
         const char *mode = crm_element_value(msg_data, PCMK__XA_MODE);
 
         if (pcmk__str_eq(mode, PCMK__VALUE_CIB, pcmk__str_none)) {
             // Local delete of an offline node's resource history
             is_local = true;
         }
     }
 
     // Check whether message should be relayed
 
     if (is_for_dc || is_for_dcib || is_for_te) {
         if (AM_I_DC) {
             if (is_for_te) {
                 crm_trace("Route message %s locally as transition request",
                           ref);
                 crm_log_xml_trace(msg, sys_to);
                 send_msg_via_ipc(msg, sys_to);
                 return TRUE; // No further processing of message is needed
             }
             crm_trace("Route message %s locally as DC request", ref);
             return FALSE; // More to be done by caller
         }
 
         if (originated_locally
             && !pcmk__strcase_any_of(sys_from, CRM_SYSTEM_PENGINE,
                                      CRM_SYSTEM_TENGINE, NULL)) {
             crm_trace("Relay message %s to DC (via %s)",
                       ref, pcmk__s(host_to, "broadcast"));
             crm_log_xml_trace(msg, "relayed");
             if (!broadcast) {
                 node_to = pcmk__get_node(0, host_to, NULL,
                                          pcmk__node_search_cluster);
             }
             send_cluster_message(node_to, dest, msg, TRUE);
             return TRUE;
         }
 
         /* Transition engine and scheduler messages are sent only to the DC on
          * the same node. If we are no longer the DC, discard this message.
          */
         crm_trace("Ignoring message %s because we are no longer DC", ref);
         crm_log_xml_trace(msg, "ignored");
         return TRUE; // No further processing of message is needed
     }
 
     if (is_local) {
         if (is_for_crm || is_for_cib) {
             crm_trace("Route message %s locally as controller request", ref);
             return FALSE; // More to be done by caller
         }
         crm_trace("Relay message %s locally to %s", ref, sys_to);
         crm_log_xml_trace(msg, "IPC-relay");
         send_msg_via_ipc(msg, sys_to);
         return TRUE;
     }
 
     if (!broadcast) {
         node_to = pcmk__search_node_caches(0, host_to,
                                            pcmk__node_search_cluster);
         if (node_to == NULL) {
             crm_warn("Ignoring message %s because node %s is unknown",
                      ref, host_to);
             crm_log_xml_trace(msg, "ignored");
             return TRUE;
         }
     }
 
     crm_trace("Relay message %s to %s",
               ref, pcmk__s(host_to, "all peers"));
     crm_log_xml_trace(msg, "relayed");
     send_cluster_message(node_to, dest, msg, TRUE);
     return TRUE;
 }
 
 // Return true if field contains a positive integer
 static bool
 authorize_version(xmlNode *message_data, const char *field,
                   const char *client_name, const char *ref, const char *uuid)
 {
     const char *version = crm_element_value(message_data, field);
     long long version_num;
 
     if ((pcmk__scan_ll(version, &version_num, -1LL) != pcmk_rc_ok)
         || (version_num < 0LL)) {
 
         crm_warn("Rejected IPC hello from %s: '%s' is not a valid protocol %s "
                  CRM_XS " ref=%s uuid=%s",
                  client_name, ((version == NULL)? "" : version),
                  field, (ref? ref : "none"), uuid);
         return false;
     }
     return true;
 }
 
 /*!
  * \internal
  * \brief Check whether a client IPC message is acceptable
  *
  * If a given client IPC message is a hello, "authorize" it by ensuring it has
  * valid information such as a protocol version, and return false indicating
  * that nothing further needs to be done with the message. If the message is not
  * a hello, just return true to indicate it needs further processing.
  *
  * \param[in]     client_msg     XML of IPC message
  * \param[in,out] curr_client    If IPC is not proxied, client that sent message
  * \param[in]     proxy_session  If IPC is proxied, the session ID
  *
  * \return true if message needs further processing, false if it doesn't
  */
 bool
 controld_authorize_ipc_message(const xmlNode *client_msg, pcmk__client_t *curr_client,
                                const char *proxy_session)
 {
     xmlNode *wrapper = NULL;
     xmlNode *message_data = NULL;
     const char *client_name = NULL;
     const char *op = crm_element_value(client_msg, PCMK__XA_CRM_TASK);
     const char *ref = crm_element_value(client_msg, PCMK_XA_REFERENCE);
     const char *uuid = (curr_client? curr_client->id : proxy_session);
 
     if (uuid == NULL) {
         crm_warn("IPC message from client rejected: No client identifier "
                  CRM_XS " ref=%s", (ref? ref : "none"));
         goto rejected;
     }
 
     if (!pcmk__str_eq(CRM_OP_HELLO, op, pcmk__str_casei)) {
         // Only hello messages need to be authorized
         return true;
     }
 
     wrapper = pcmk__xe_first_child(client_msg, PCMK__XE_CRM_XML, NULL, NULL);
     message_data = pcmk__xe_first_child(wrapper, NULL, NULL, NULL);
 
     client_name = crm_element_value(message_data, PCMK__XA_CLIENT_NAME);
     if (pcmk__str_empty(client_name)) {
         crm_warn("IPC hello from client rejected: No client name",
                  CRM_XS " ref=%s uuid=%s", (ref? ref : "none"), uuid);
         goto rejected;
     }
     if (!authorize_version(message_data, PCMK__XA_MAJOR_VERSION, client_name,
                            ref, uuid)) {
         goto rejected;
     }
     if (!authorize_version(message_data, PCMK__XA_MINOR_VERSION, client_name,
                            ref, uuid)) {
         goto rejected;
     }
 
     crm_trace("Validated IPC hello from client %s", client_name);
     crm_log_xml_trace(client_msg, "hello");
     if (curr_client) {
         curr_client->userdata = pcmk__str_copy(client_name);
     }
     controld_trigger_fsa();
     return false;
 
 rejected:
     crm_log_xml_trace(client_msg, "rejected");
     if (curr_client) {
         qb_ipcs_disconnect(curr_client->ipcs);
     }
     return false;
 }
 
 static enum crmd_fsa_input
 handle_message(xmlNode *msg, enum crmd_fsa_cause cause)
 {
     const char *type = NULL;
 
     CRM_CHECK(msg != NULL, return I_NULL);
 
     type = crm_element_value(msg, PCMK__XA_SUBT);
     if (pcmk__str_eq(type, PCMK__VALUE_REQUEST, pcmk__str_none)) {
         return handle_request(msg, cause);
     }
 
     if (pcmk__str_eq(type, PCMK__VALUE_RESPONSE, pcmk__str_none)) {
         handle_response(msg);
         return I_NULL;
     }
 
     crm_warn("Ignoring message with unknown " PCMK__XA_SUBT" '%s'",
              pcmk__s(type, ""));
     crm_log_xml_trace(msg, "bad");
     return I_NULL;
 }
 
 static enum crmd_fsa_input
 handle_failcount_op(xmlNode * stored_msg)
 {
     const char *rsc = NULL;
     const char *uname = NULL;
     const char *op = NULL;
     char *interval_spec = NULL;
     guint interval_ms = 0;
     gboolean is_remote_node = FALSE;
 
     xmlNode *wrapper = pcmk__xe_first_child(stored_msg, PCMK__XE_CRM_XML, NULL,
                                             NULL);
     xmlNode *xml_op = pcmk__xe_first_child(wrapper, NULL, NULL, NULL);
 
     if (xml_op) {
         xmlNode *xml_rsc = pcmk__xe_first_child(xml_op, PCMK_XE_PRIMITIVE, NULL,
                                                 NULL);
         xmlNode *xml_attrs = pcmk__xe_first_child(xml_op, PCMK__XE_ATTRIBUTES,
                                                   NULL, NULL);
 
         if (xml_rsc) {
             rsc = pcmk__xe_id(xml_rsc);
         }
         if (xml_attrs) {
             op = crm_element_value(xml_attrs,
                                    CRM_META "_" PCMK__META_CLEAR_FAILURE_OP);
             crm_element_value_ms(xml_attrs,
                                  CRM_META "_" PCMK__META_CLEAR_FAILURE_INTERVAL,
                                  &interval_ms);
         }
     }
     uname = crm_element_value(xml_op, PCMK__META_ON_NODE);
 
     if ((rsc == NULL) || (uname == NULL)) {
         crm_log_xml_warn(stored_msg, "invalid failcount op");
         return I_NULL;
     }
 
     if (crm_element_value(xml_op, PCMK__XA_ROUTER_NODE)) {
         is_remote_node = TRUE;
     }
 
     crm_debug("Clearing failures for %s-interval %s on %s "
               "from attribute manager, CIB, and executor state",
               pcmk__readable_interval(interval_ms), rsc, uname);
 
     if (interval_ms) {
         interval_spec = crm_strdup_printf("%ums", interval_ms);
     }
     update_attrd_clear_failures(uname, rsc, op, interval_spec, is_remote_node);
     free(interval_spec);
 
     controld_cib_delete_last_failure(rsc, uname, op, interval_ms);
 
     lrm_clear_last_failure(rsc, uname, op, interval_ms);
 
     return I_NULL;
 }
 
 static enum crmd_fsa_input
 handle_lrm_delete(xmlNode *stored_msg)
 {
     const char *mode = NULL;
     xmlNode *wrapper = pcmk__xe_first_child(stored_msg, PCMK__XE_CRM_XML, NULL,
                                             NULL);
     xmlNode *msg_data = pcmk__xe_first_child(wrapper, NULL, NULL, NULL);
 
     CRM_CHECK(msg_data != NULL, return I_NULL);
 
     /* CRM_OP_LRM_DELETE has two distinct modes. The default behavior is to
      * relay the operation to the affected node, which will unregister the
      * resource from the local executor, clear the resource's history from the
      * CIB, and do some bookkeeping in the controller.
      *
      * However, if the affected node is offline, the client will specify
      * mode=PCMK__VALUE_CIB which means the controller receiving the operation
      * should clear the resource's history from the CIB and nothing else. This
      * is used to clear shutdown locks.
      */
     mode = crm_element_value(msg_data, PCMK__XA_MODE);
     if (!pcmk__str_eq(mode, PCMK__VALUE_CIB, pcmk__str_none)) {
         // Relay to affected node
         crm_xml_add(stored_msg, PCMK__XA_CRM_SYS_TO, CRM_SYSTEM_LRMD);
         return I_ROUTER;
 
     } else {
         // Delete CIB history locally (compare with do_lrm_delete())
         const char *from_sys = NULL;
         const char *user_name = NULL;
         const char *rsc_id = NULL;
         const char *node = NULL;
         xmlNode *rsc_xml = NULL;
         int rc = pcmk_rc_ok;
 
         rsc_xml = pcmk__xe_first_child(msg_data, PCMK_XE_PRIMITIVE, NULL, NULL);
         CRM_CHECK(rsc_xml != NULL, return I_NULL);
 
         rsc_id = pcmk__xe_id(rsc_xml);
         from_sys = crm_element_value(stored_msg, PCMK__XA_CRM_SYS_FROM);
         node = crm_element_value(msg_data, PCMK__META_ON_NODE);
         user_name = pcmk__update_acl_user(stored_msg, PCMK__XA_CRM_USER, NULL);
         crm_debug("Handling " CRM_OP_LRM_DELETE " for %s on %s locally%s%s "
                   "(clearing CIB resource history only)", rsc_id, node,
                   (user_name? " for user " : ""), (user_name? user_name : ""));
         rc = controld_delete_resource_history(rsc_id, node, user_name,
                                               cib_dryrun|cib_sync_call);
         if (rc == pcmk_rc_ok) {
             rc = controld_delete_resource_history(rsc_id, node, user_name,
                                                   crmd_cib_smart_opt());
         }
 
         /* Notify client. Also notify tengine if mode=PCMK__VALUE_CIB and
          * op=CRM_OP_LRM_DELETE.
          */
         if (from_sys) {
             lrmd_event_data_t *op = NULL;
             const char *from_host = crm_element_value(stored_msg, PCMK__XA_SRC);
             const char *transition;
 
             if (strcmp(from_sys, CRM_SYSTEM_TENGINE)) {
                 transition = crm_element_value(msg_data,
                                                PCMK__XA_TRANSITION_KEY);
             } else {
                 transition = crm_element_value(stored_msg,
                                                PCMK__XA_TRANSITION_KEY);
             }
 
             crm_info("Notifying %s on %s that %s was%s deleted",
                      from_sys, (from_host? from_host : "local node"), rsc_id,
                      ((rc == pcmk_rc_ok)? "" : " not"));
             op = lrmd_new_event(rsc_id, PCMK_ACTION_DELETE, 0);
             op->type = lrmd_event_exec_complete;
             op->user_data = pcmk__str_copy(pcmk__s(transition, FAKE_TE_ID));
             op->params = pcmk__strkey_table(free, free);
             pcmk__insert_dup(op->params, PCMK_XA_CRM_FEATURE_SET,
                              CRM_FEATURE_SET);
             controld_rc2event(op, rc);
             controld_ack_event_directly(from_host, from_sys, NULL, op, rsc_id);
             lrmd_free_event(op);
             controld_trigger_delete_refresh(from_sys, rsc_id);
         }
         return I_NULL;
     }
 }
 
 /*!
  * \brief Handle a CRM_OP_REMOTE_STATE message by updating remote peer cache
  *
  * \param[in] msg  Message XML
  *
  * \return Next FSA input
  */
 static enum crmd_fsa_input
 handle_remote_state(const xmlNode *msg)
 {
     const char *conn_host = NULL;
     const char *remote_uname = pcmk__xe_id(msg);
     crm_node_t *remote_peer;
     bool remote_is_up = false;
     int rc = pcmk_rc_ok;
 
     rc = pcmk__xe_get_bool_attr(msg, PCMK__XA_IN_CCM, &remote_is_up);
 
     CRM_CHECK(remote_uname && rc == pcmk_rc_ok, return I_NULL);
 
-    remote_peer = crm_remote_peer_get(remote_uname);
+    remote_peer = pcmk__cluster_lookup_remote_node(remote_uname);
     CRM_CHECK(remote_peer, return I_NULL);
 
     pcmk__update_peer_state(__func__, remote_peer,
                             remote_is_up ? CRM_NODE_MEMBER : CRM_NODE_LOST,
                             0);
 
     conn_host = crm_element_value(msg, PCMK__XA_CONNECTION_HOST);
     if (conn_host) {
         pcmk__str_update(&remote_peer->conn_host, conn_host);
     } else if (remote_peer->conn_host) {
         free(remote_peer->conn_host);
         remote_peer->conn_host = NULL;
     }
 
     return I_NULL;
 }
 
 /*!
  * \brief Handle a CRM_OP_PING message
  *
  * \param[in] msg  Message XML
  *
  * \return Next FSA input
  */
 static enum crmd_fsa_input
 handle_ping(const xmlNode *msg)
 {
     const char *value = NULL;
     xmlNode *ping = NULL;
     xmlNode *reply = NULL;
 
     // Build reply
 
     ping = pcmk__xe_create(NULL, PCMK__XE_PING_RESPONSE);
     value = crm_element_value(msg, PCMK__XA_CRM_SYS_TO);
     crm_xml_add(ping, PCMK__XA_CRM_SUBSYSTEM, value);
 
     // Add controller state
     value = fsa_state2string(controld_globals.fsa_state);
     crm_xml_add(ping, PCMK__XA_CRMD_STATE, value);
     crm_notice("Current ping state: %s", value); // CTS needs this
 
     // Add controller health
     // @TODO maybe do some checks to determine meaningful status
     crm_xml_add(ping, PCMK_XA_RESULT, "ok");
 
     // Send reply
     reply = create_reply(msg, ping);
     free_xml(ping);
     if (reply != NULL) {
         (void) relay_message(reply, TRUE);
         free_xml(reply);
     }
 
     // Nothing further to do
     return I_NULL;
 }
 
 /*!
  * \brief Handle a PCMK__CONTROLD_CMD_NODES message
  *
  * \param[in] request  Message XML
  *
  * \return Next FSA input
  */
 static enum crmd_fsa_input
 handle_node_list(const xmlNode *request)
 {
     GHashTableIter iter;
     crm_node_t *node = NULL;
     xmlNode *reply = NULL;
     xmlNode *reply_data = NULL;
 
     // Create message data for reply
     reply_data = pcmk__xe_create(NULL, PCMK_XE_NODES);
     g_hash_table_iter_init(&iter, crm_peer_cache);
     while (g_hash_table_iter_next(&iter, NULL, (gpointer *) & node)) {
         xmlNode *xml = pcmk__xe_create(reply_data, PCMK_XE_NODE);
 
         crm_xml_add_ll(xml, PCMK_XA_ID, (long long) node->id); // uint32_t
         crm_xml_add(xml, PCMK_XA_UNAME, node->uname);
         crm_xml_add(xml, PCMK__XA_IN_CCM, node->state);
     }
 
     // Create and send reply
     reply = create_reply(request, reply_data);
     free_xml(reply_data);
     if (reply) {
         (void) relay_message(reply, TRUE);
         free_xml(reply);
     }
 
     // Nothing further to do
     return I_NULL;
 }
 
 /*!
  * \brief Handle a CRM_OP_NODE_INFO request
  *
  * \param[in] msg  Message XML
  *
  * \return Next FSA input
  */
 static enum crmd_fsa_input
 handle_node_info_request(const xmlNode *msg)
 {
     const char *value = NULL;
     crm_node_t *node = NULL;
     int node_id = 0;
     xmlNode *reply = NULL;
     xmlNode *reply_data = NULL;
 
     // Build reply
 
     reply_data = pcmk__xe_create(NULL, PCMK_XE_NODE);
     crm_xml_add(reply_data, PCMK__XA_CRM_SUBSYSTEM, CRM_SYSTEM_CRMD);
 
     // Add whether current partition has quorum
     pcmk__xe_set_bool_attr(reply_data, PCMK_XA_HAVE_QUORUM,
                            pcmk_is_set(controld_globals.flags,
                                        controld_has_quorum));
 
     // Check whether client requested node info by ID and/or name
     crm_element_value_int(msg, PCMK_XA_ID, &node_id);
     if (node_id < 0) {
         node_id = 0;
     }
     value = crm_element_value(msg, PCMK_XA_UNAME);
 
     // Default to local node if none given
     if ((node_id == 0) && (value == NULL)) {
         value = controld_globals.our_nodename;
     }
 
     node = pcmk__search_node_caches(node_id, value, pcmk__node_search_any);
     if (node) {
         crm_xml_add(reply_data, PCMK_XA_ID, node->uuid);
         crm_xml_add(reply_data, PCMK_XA_UNAME, node->uname);
         crm_xml_add(reply_data, PCMK_XA_CRMD, node->state);
         pcmk__xe_set_bool_attr(reply_data, PCMK_XA_REMOTE_NODE,
                                pcmk_is_set(node->flags, crm_remote_node));
     }
 
     // Send reply
     reply = create_reply(msg, reply_data);
     free_xml(reply_data);
     if (reply != NULL) {
         (void) relay_message(reply, TRUE);
         free_xml(reply);
     }
 
     // Nothing further to do
     return I_NULL;
 }
 
 static void
 verify_feature_set(xmlNode *msg)
 {
     const char *dc_version = crm_element_value(msg, PCMK_XA_CRM_FEATURE_SET);
 
     if (dc_version == NULL) {
         /* All we really know is that the DC feature set is older than 3.1.0,
          * but that's also all that really matters.
          */
         dc_version = "3.0.14";
     }
 
     if (feature_set_compatible(dc_version, CRM_FEATURE_SET)) {
         crm_trace("Local feature set (%s) is compatible with DC's (%s)",
                   CRM_FEATURE_SET, dc_version);
     } else {
         crm_err("Local feature set (%s) is incompatible with DC's (%s)",
                 CRM_FEATURE_SET, dc_version);
 
         // Nothing is likely to improve without administrator involvement
         controld_set_fsa_input_flags(R_STAYDOWN);
         crmd_exit(CRM_EX_FATAL);
     }
 }
 
 // DC gets own shutdown all-clear
 static enum crmd_fsa_input
 handle_shutdown_self_ack(xmlNode *stored_msg)
 {
     const char *host_from = crm_element_value(stored_msg, PCMK__XA_SRC);
 
     if (pcmk_is_set(controld_globals.fsa_input_register, R_SHUTDOWN)) {
         // The expected case -- we initiated own shutdown sequence
         crm_info("Shutting down controller");
         return I_STOP;
     }
 
     if (pcmk__str_eq(host_from, controld_globals.dc_name, pcmk__str_casei)) {
         // Must be logic error -- DC confirming its own unrequested shutdown
         crm_err("Shutting down controller immediately due to "
                 "unexpected shutdown confirmation");
         return I_TERMINATE;
     }
 
     if (controld_globals.fsa_state != S_STOPPING) {
         // Shouldn't happen -- non-DC confirming unrequested shutdown
         crm_err("Starting new DC election because %s is "
                 "confirming shutdown we did not request",
                 (host_from? host_from : "another node"));
         return I_ELECTION;
     }
 
     // Shouldn't happen, but we are already stopping anyway
     crm_debug("Ignoring unexpected shutdown confirmation from %s",
               (host_from? host_from : "another node"));
     return I_NULL;
 }
 
 // Non-DC gets shutdown all-clear from DC
 static enum crmd_fsa_input
 handle_shutdown_ack(xmlNode *stored_msg)
 {
     const char *host_from = crm_element_value(stored_msg, PCMK__XA_SRC);
 
     if (host_from == NULL) {
         crm_warn("Ignoring shutdown request without origin specified");
         return I_NULL;
     }
 
     if (pcmk__str_eq(host_from, controld_globals.dc_name,
                      pcmk__str_null_matches|pcmk__str_casei)) {
 
         if (pcmk_is_set(controld_globals.fsa_input_register, R_SHUTDOWN)) {
             crm_info("Shutting down controller after confirmation from %s",
                      host_from);
         } else {
             crm_err("Shutting down controller after unexpected "
                     "shutdown request from %s", host_from);
             controld_set_fsa_input_flags(R_STAYDOWN);
         }
         return I_STOP;
     }
 
     crm_warn("Ignoring shutdown request from %s because DC is %s",
              host_from, controld_globals.dc_name);
     return I_NULL;
 }
 
 static enum crmd_fsa_input
 handle_request(xmlNode *stored_msg, enum crmd_fsa_cause cause)
 {
     xmlNode *msg = NULL;
     const char *op = crm_element_value(stored_msg, PCMK__XA_CRM_TASK);
 
     /* Optimize this for the DC - it has the most to do */
 
     crm_log_xml_trace(stored_msg, "request");
     if (op == NULL) {
         crm_warn("Ignoring request without " PCMK__XA_CRM_TASK);
         return I_NULL;
     }
 
     if (strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0) {
         const char *from = crm_element_value(stored_msg, PCMK__XA_SRC);
         crm_node_t *node = pcmk__search_node_caches(0, from,
                                                     pcmk__node_search_cluster);
 
         pcmk__update_peer_expected(__func__, node, CRMD_JOINSTATE_DOWN);
         if(AM_I_DC == FALSE) {
             return I_NULL; /* Done */
         }
     }
 
     /*========== DC-Only Actions ==========*/
     if (AM_I_DC) {
         if (strcmp(op, CRM_OP_JOIN_ANNOUNCE) == 0) {
             return I_NODE_JOIN;
 
         } else if (strcmp(op, CRM_OP_JOIN_REQUEST) == 0) {
             return I_JOIN_REQUEST;
 
         } else if (strcmp(op, CRM_OP_JOIN_CONFIRM) == 0) {
             return I_JOIN_RESULT;
 
         } else if (strcmp(op, CRM_OP_SHUTDOWN) == 0) {
             return handle_shutdown_self_ack(stored_msg);
 
         } else if (strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0) {
             // Another controller wants to shut down its node
             return handle_shutdown_request(stored_msg);
         }
     }
 
     /*========== common actions ==========*/
     if (strcmp(op, CRM_OP_NOVOTE) == 0) {
         ha_msg_input_t fsa_input;
 
         fsa_input.msg = stored_msg;
         register_fsa_input_adv(C_HA_MESSAGE, I_NULL, &fsa_input,
                                A_ELECTION_COUNT | A_ELECTION_CHECK, FALSE,
                                __func__);
 
     } else if (strcmp(op, CRM_OP_REMOTE_STATE) == 0) {
         /* a remote connection host is letting us know the node state */
         return handle_remote_state(stored_msg);
 
     } else if (strcmp(op, CRM_OP_THROTTLE) == 0) {
         throttle_update(stored_msg);
         if (AM_I_DC && (controld_globals.transition_graph != NULL)
             && !controld_globals.transition_graph->complete) {
 
             crm_debug("The throttle changed. Trigger a graph.");
             trigger_graph();
         }
         return I_NULL;
 
     } else if (strcmp(op, CRM_OP_CLEAR_FAILCOUNT) == 0) {
         return handle_failcount_op(stored_msg);
 
     } else if (strcmp(op, CRM_OP_VOTE) == 0) {
         /* count the vote and decide what to do after that */
         ha_msg_input_t fsa_input;
 
         fsa_input.msg = stored_msg;
         register_fsa_input_adv(C_HA_MESSAGE, I_NULL, &fsa_input,
                                A_ELECTION_COUNT | A_ELECTION_CHECK, FALSE,
                                __func__);
 
         /* Sometimes we _must_ go into S_ELECTION */
         if (controld_globals.fsa_state == S_HALT) {
             crm_debug("Forcing an election from S_HALT");
             return I_ELECTION;
         }
 
     } else if (strcmp(op, CRM_OP_JOIN_OFFER) == 0) {
         verify_feature_set(stored_msg);
         crm_debug("Raising I_JOIN_OFFER: join-%s",
                   crm_element_value(stored_msg, PCMK__XA_JOIN_ID));
         return I_JOIN_OFFER;
 
     } else if (strcmp(op, CRM_OP_JOIN_ACKNAK) == 0) {
         crm_debug("Raising I_JOIN_RESULT: join-%s",
                   crm_element_value(stored_msg, PCMK__XA_JOIN_ID));
         return I_JOIN_RESULT;
 
     } else if (strcmp(op, CRM_OP_LRM_DELETE) == 0) {
         return handle_lrm_delete(stored_msg);
 
     } else if ((strcmp(op, CRM_OP_LRM_FAIL) == 0)
                || (strcmp(op, CRM_OP_LRM_REFRESH) == 0) // @COMPAT
                || (strcmp(op, CRM_OP_REPROBE) == 0)) {
 
         crm_xml_add(stored_msg, PCMK__XA_CRM_SYS_TO, CRM_SYSTEM_LRMD);
         return I_ROUTER;
 
     } else if (strcmp(op, CRM_OP_NOOP) == 0) {
         return I_NULL;
 
     } else if (strcmp(op, CRM_OP_PING) == 0) {
         return handle_ping(stored_msg);
 
     } else if (strcmp(op, CRM_OP_NODE_INFO) == 0) {
         return handle_node_info_request(stored_msg);
 
     } else if (strcmp(op, CRM_OP_RM_NODE_CACHE) == 0) {
         int id = 0;
         const char *name = NULL;
 
         crm_element_value_int(stored_msg, PCMK_XA_ID, &id);
         name = crm_element_value(stored_msg, PCMK_XA_UNAME);
 
         if(cause == C_IPC_MESSAGE) {
             msg = create_request(CRM_OP_RM_NODE_CACHE, NULL, NULL, CRM_SYSTEM_CRMD, CRM_SYSTEM_CRMD, NULL);
             if (send_cluster_message(NULL, crm_msg_crmd, msg, TRUE) == FALSE) {
                 crm_err("Could not instruct peers to remove references to node %s/%u", name, id);
             } else {
                 crm_notice("Instructing peers to remove references to node %s/%u", name, id);
             }
             free_xml(msg);
 
         } else {
             reap_crm_member(id, name);
 
             /* If we're forgetting this node, also forget any failures to fence
              * it, so we don't carry that over to any node added later with the
              * same name.
              */
             st_fail_count_reset(name);
         }
 
     } else if (strcmp(op, CRM_OP_MAINTENANCE_NODES) == 0) {
         xmlNode *wrapper = pcmk__xe_first_child(stored_msg, PCMK__XE_CRM_XML,
                                                 NULL, NULL);
         xmlNode *xml = pcmk__xe_first_child(wrapper, NULL, NULL, NULL);
 
         remote_ra_process_maintenance_nodes(xml);
 
     } else if (strcmp(op, PCMK__CONTROLD_CMD_NODES) == 0) {
         return handle_node_list(stored_msg);
 
         /*========== (NOT_DC)-Only Actions ==========*/
     } else if (!AM_I_DC) {
 
         if (strcmp(op, CRM_OP_SHUTDOWN) == 0) {
             return handle_shutdown_ack(stored_msg);
         }
 
     } else {
         crm_err("Unexpected request (%s) sent to %s", op, AM_I_DC ? "the DC" : "non-DC node");
         crm_log_xml_err(stored_msg, "Unexpected");
     }
 
     return I_NULL;
 }
 
 static void
 handle_response(xmlNode *stored_msg)
 {
     const char *op = crm_element_value(stored_msg, PCMK__XA_CRM_TASK);
 
     crm_log_xml_trace(stored_msg, "reply");
     if (op == NULL) {
         crm_warn("Ignoring reply without " PCMK__XA_CRM_TASK);
 
     } else if (AM_I_DC && strcmp(op, CRM_OP_PECALC) == 0) {
         // Check whether scheduler answer been superseded by subsequent request
         const char *msg_ref = crm_element_value(stored_msg, PCMK_XA_REFERENCE);
 
         if (msg_ref == NULL) {
             crm_err("%s - Ignoring calculation with no reference", op);
 
         } else if (pcmk__str_eq(msg_ref, controld_globals.fsa_pe_ref,
                                 pcmk__str_none)) {
             ha_msg_input_t fsa_input;
 
             controld_stop_sched_timer();
             fsa_input.msg = stored_msg;
             register_fsa_input_later(C_IPC_MESSAGE, I_PE_SUCCESS, &fsa_input);
 
         } else {
             crm_info("%s calculation %s is obsolete", op, msg_ref);
         }
 
     } else if (strcmp(op, CRM_OP_VOTE) == 0
                || strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0 || strcmp(op, CRM_OP_SHUTDOWN) == 0) {
 
     } else {
         const char *host_from = crm_element_value(stored_msg, PCMK__XA_SRC);
 
         crm_err("Unexpected response (op=%s, src=%s) sent to the %s",
                 op, host_from, AM_I_DC ? "DC" : "controller");
     }
 }
 
 static enum crmd_fsa_input
 handle_shutdown_request(xmlNode * stored_msg)
 {
     /* handle here to avoid potential version issues
      *   where the shutdown message/procedure may have
      *   been changed in later versions.
      *
      * This way the DC is always in control of the shutdown
      */
 
     char *now_s = NULL;
     const char *host_from = crm_element_value(stored_msg, PCMK__XA_SRC);
 
     if (host_from == NULL) {
         /* we're shutting down and the DC */
         host_from = controld_globals.our_nodename;
     }
 
     crm_info("Creating shutdown request for %s (state=%s)", host_from,
              fsa_state2string(controld_globals.fsa_state));
     crm_log_xml_trace(stored_msg, "message");
 
     now_s = pcmk__ttoa(time(NULL));
     update_attrd(host_from, PCMK__NODE_ATTR_SHUTDOWN, now_s, NULL, FALSE);
     free(now_s);
 
     /* will be picked up by the TE as long as its running */
     return I_NULL;
 }
 
 static void
 send_msg_via_ipc(xmlNode * msg, const char *sys)
 {
     pcmk__client_t *client_channel = NULL;
 
     CRM_CHECK(sys != NULL, return);
 
     client_channel = pcmk__find_client_by_id(sys);
 
     if (crm_element_value(msg, PCMK__XA_SRC) == NULL) {
         crm_xml_add(msg, PCMK__XA_SRC, controld_globals.our_nodename);
     }
 
     if (client_channel != NULL) {
         /* Transient clients such as crmadmin */
         pcmk__ipc_send_xml(client_channel, 0, msg, crm_ipc_server_event);
 
     } else if (pcmk__str_eq(sys, CRM_SYSTEM_TENGINE, pcmk__str_none)) {
         xmlNode *wrapper = pcmk__xe_first_child(msg, PCMK__XE_CRM_XML, NULL,
                                                 NULL);
         xmlNode *data = pcmk__xe_first_child(wrapper, NULL, NULL, NULL);
 
         process_te_message(msg, data);
 
     } else if (pcmk__str_eq(sys, CRM_SYSTEM_LRMD, pcmk__str_none)) {
         fsa_data_t fsa_data;
         ha_msg_input_t fsa_input;
         xmlNode *wrapper = NULL;
 
         fsa_input.msg = msg;
 
         wrapper = pcmk__xe_first_child(msg, PCMK__XE_CRM_XML, NULL, NULL);
         fsa_input.xml = pcmk__xe_first_child(wrapper, NULL, NULL, NULL);
 
         fsa_data.id = 0;
         fsa_data.actions = 0;
         fsa_data.data = &fsa_input;
         fsa_data.fsa_input = I_MESSAGE;
         fsa_data.fsa_cause = C_IPC_MESSAGE;
         fsa_data.origin = __func__;
         fsa_data.data_type = fsa_dt_ha_msg;
 
         do_lrm_invoke(A_LRM_INVOKE, C_IPC_MESSAGE, controld_globals.fsa_state,
                       I_MESSAGE, &fsa_data);
 
     } else if (crmd_is_proxy_session(sys)) {
         crmd_proxy_send(sys, msg);
 
     } else {
         crm_info("Received invalid request: unknown subsystem '%s'", sys);
     }
 }
 
 void
 delete_ha_msg_input(ha_msg_input_t * orig)
 {
     if (orig == NULL) {
         return;
     }
     free_xml(orig->msg);
     free(orig);
 }
 
 /*!
  * \internal
  * \brief Notify the cluster of a remote node state change
  *
  * \param[in] node_name  Node's name
  * \param[in] node_up    true if node is up, false if down
  */
 void
 broadcast_remote_state_message(const char *node_name, bool node_up)
 {
     xmlNode *msg = create_request(CRM_OP_REMOTE_STATE, NULL, NULL,
                                   CRM_SYSTEM_CRMD, CRM_SYSTEM_CRMD, NULL);
 
     crm_info("Notifying cluster of Pacemaker Remote node %s %s",
              node_name, node_up? "coming up" : "going down");
 
     crm_xml_add(msg, PCMK_XA_ID, node_name);
     pcmk__xe_set_bool_attr(msg, PCMK__XA_IN_CCM, node_up);
 
     if (node_up) {
         crm_xml_add(msg, PCMK__XA_CONNECTION_HOST,
                     controld_globals.our_nodename);
     }
 
     send_cluster_message(NULL, crm_msg_crmd, msg, TRUE);
     free_xml(msg);
 }
 
diff --git a/daemons/controld/controld_remote_ra.c b/daemons/controld/controld_remote_ra.c
index 6e25103a9d..2769e06f5a 100644
--- a/daemons/controld/controld_remote_ra.c
+++ b/daemons/controld/controld_remote_ra.c
@@ -1,1471 +1,1471 @@
 /*
  * Copyright 2013-2024 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU General Public License version 2
  * or later (GPLv2+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 
 #include <crm/crm.h>
 #include <crm/common/xml.h>
 #include <crm/common/xml_internal.h>
 #include <crm/lrmd.h>
 #include <crm/lrmd_internal.h>
 #include <crm/services.h>
 
 #include <pacemaker-controld.h>
 
 #define REMOTE_LRMD_RA "remote"
 
 /* The max start timeout before cmd retry */
 #define MAX_START_TIMEOUT_MS 10000
 
 #define cmd_set_flags(cmd, flags_to_set) do { \
     (cmd)->status = pcmk__set_flags_as(__func__, __LINE__, LOG_TRACE, \
                                        "Remote command", (cmd)->rsc_id, (cmd)->status, \
                                        (flags_to_set), #flags_to_set); \
         } while (0)
 
 #define cmd_clear_flags(cmd, flags_to_clear) do { \
     (cmd)->status = pcmk__clear_flags_as(__func__, __LINE__, LOG_TRACE, \
                                          "Remote command", (cmd)->rsc_id, (cmd)->status, \
                                          (flags_to_clear), #flags_to_clear); \
         } while (0)
 
 enum remote_cmd_status {
     cmd_reported_success    = (1 << 0),
     cmd_cancel              = (1 << 1),
 };
 
 typedef struct remote_ra_cmd_s {
     /*! the local node the cmd is issued from */
     char *owner;
     /*! the remote node the cmd is executed on */
     char *rsc_id;
     /*! the action to execute */
     char *action;
     /*! some string the client wants us to give it back */
     char *userdata;
     /*! start delay in ms */
     int start_delay;
     /*! timer id used for start delay. */
     int delay_id;
     /*! timeout in ms for cmd */
     int timeout;
     int remaining_timeout;
     /*! recurring interval in ms */
     guint interval_ms;
     /*! interval timer id */
     int interval_id;
     int monitor_timeout_id;
     int takeover_timeout_id;
     /*! action parameters */
     lrmd_key_value_t *params;
     pcmk__action_result_t result;
     int call_id;
     time_t start_time;
     uint32_t status;
 } remote_ra_cmd_t;
 
 #define lrm_remote_set_flags(lrm_state, flags_to_set) do { \
     lrm_state_t *lrm = (lrm_state); \
     remote_ra_data_t *ra = lrm->remote_ra_data; \
     ra->status = pcmk__set_flags_as(__func__, __LINE__, LOG_TRACE, "Remote", \
                                     lrm->node_name, ra->status, \
                                     (flags_to_set), #flags_to_set); \
         } while (0)
 
 #define lrm_remote_clear_flags(lrm_state, flags_to_clear) do { \
     lrm_state_t *lrm = (lrm_state); \
     remote_ra_data_t *ra = lrm->remote_ra_data; \
     ra->status = pcmk__clear_flags_as(__func__, __LINE__, LOG_TRACE, "Remote", \
                                       lrm->node_name, ra->status, \
                                       (flags_to_clear), #flags_to_clear); \
         } while (0)
 
 enum remote_status {
     expect_takeover     = (1 << 0),
     takeover_complete   = (1 << 1),
     remote_active       = (1 << 2),
     /* Maintenance mode is difficult to determine from the controller's context,
      * so we have it signalled back with the transition from the scheduler.
      */
     remote_in_maint     = (1 << 3),
     /* Similar for whether we are controlling a guest node or remote node.
      * Fortunately there is a meta-attribute in the transition already and
      * as the situation doesn't change over time we can use the
      * resource start for noting down the information for later use when
      * the attributes aren't at hand.
      */
     controlling_guest   = (1 << 4),
 };
 
 typedef struct remote_ra_data_s {
     crm_trigger_t *work;
     remote_ra_cmd_t *cur_cmd;
     GList *cmds;
     GList *recurring_cmds;
     uint32_t status;
 } remote_ra_data_t;
 
 static int handle_remote_ra_start(lrm_state_t * lrm_state, remote_ra_cmd_t * cmd, int timeout_ms);
 static void handle_remote_ra_stop(lrm_state_t * lrm_state, remote_ra_cmd_t * cmd);
 static GList *fail_all_monitor_cmds(GList * list);
 
 static void
 free_cmd(gpointer user_data)
 {
     remote_ra_cmd_t *cmd = user_data;
 
     if (!cmd) {
         return;
     }
     if (cmd->delay_id) {
         g_source_remove(cmd->delay_id);
     }
     if (cmd->interval_id) {
         g_source_remove(cmd->interval_id);
     }
     if (cmd->monitor_timeout_id) {
         g_source_remove(cmd->monitor_timeout_id);
     }
     if (cmd->takeover_timeout_id) {
         g_source_remove(cmd->takeover_timeout_id);
     }
     free(cmd->owner);
     free(cmd->rsc_id);
     free(cmd->action);
     free(cmd->userdata);
     pcmk__reset_result(&(cmd->result));
     lrmd_key_value_freeall(cmd->params);
     free(cmd);
 }
 
 static int
 generate_callid(void)
 {
     static int remote_ra_callid = 0;
 
     remote_ra_callid++;
     if (remote_ra_callid <= 0) {
         remote_ra_callid = 1;
     }
 
     return remote_ra_callid;
 }
 
 static gboolean
 recurring_helper(gpointer data)
 {
     remote_ra_cmd_t *cmd = data;
     lrm_state_t *connection_rsc = NULL;
 
     cmd->interval_id = 0;
     connection_rsc = lrm_state_find(cmd->rsc_id);
     if (connection_rsc && connection_rsc->remote_ra_data) {
         remote_ra_data_t *ra_data = connection_rsc->remote_ra_data;
 
         ra_data->recurring_cmds = g_list_remove(ra_data->recurring_cmds, cmd);
 
         ra_data->cmds = g_list_append(ra_data->cmds, cmd);
         mainloop_set_trigger(ra_data->work);
     }
     return FALSE;
 }
 
 static gboolean
 start_delay_helper(gpointer data)
 {
     remote_ra_cmd_t *cmd = data;
     lrm_state_t *connection_rsc = NULL;
 
     cmd->delay_id = 0;
     connection_rsc = lrm_state_find(cmd->rsc_id);
     if (connection_rsc && connection_rsc->remote_ra_data) {
         remote_ra_data_t *ra_data = connection_rsc->remote_ra_data;
 
         mainloop_set_trigger(ra_data->work);
     }
     return FALSE;
 }
 
 static bool
 should_purge_attributes(crm_node_t *node)
 {
     bool purge = true;
     crm_node_t *conn_node = NULL;
     lrm_state_t *connection_rsc = NULL;
 
     if (!node->conn_host) {
         return purge;
     }
 
     /* Get the node that was hosting the remote connection resource from the
      * peer cache.  That's the one we really care about here.
      */
     conn_node = pcmk__get_node(0, node->conn_host, NULL,
                                pcmk__node_search_cluster);
     if (conn_node == NULL) {
         return purge;
     }
 
     /* Check the uptime of connection_rsc.  If it hasn't been running long
      * enough, set purge=true.  "Long enough" means it started running earlier
      * than the timestamp when we noticed it went away in the first place.
      */
     connection_rsc = lrm_state_find(node->uname);
 
     if (connection_rsc != NULL) {
         lrmd_t *lrm = connection_rsc->conn;
         time_t uptime = lrmd__uptime(lrm);
         time_t now = time(NULL);
 
         /* Add 20s of fuzziness to give corosync a while to notice the remote
          * host is gone.  On various error conditions (failure to get uptime,
          * peer_lost isn't set) we default to purging.
          */
         if (uptime > 0 &&
             conn_node->peer_lost > 0 &&
             uptime + 20 >= now - conn_node->peer_lost) {
             purge = false;
         }
     }
 
     return purge;
 }
 
 static enum controld_section_e
 section_to_delete(bool purge)
 {
     if (pcmk_is_set(controld_globals.flags, controld_shutdown_lock_enabled)) {
         if (purge) {
             return controld_section_all_unlocked;
         } else {
             return controld_section_lrm_unlocked;
         }
     } else {
         if (purge) {
             return controld_section_all;
         } else {
             return controld_section_lrm;
         }
     }
 }
 
 static void
 purge_remote_node_attrs(int call_opt, crm_node_t *node)
 {
     bool purge = should_purge_attributes(node);
     enum controld_section_e section = section_to_delete(purge);
 
     /* Purge node from attrd's memory */
     if (purge) {
         update_attrd_remote_node_removed(node->uname, NULL);
     }
 
     controld_delete_node_state(node->uname, section, call_opt);
 }
 
 /*!
  * \internal
  * \brief Handle cluster communication related to pacemaker_remote node joining
  *
  * \param[in] node_name  Name of newly integrated pacemaker_remote node
  */
 static void
 remote_node_up(const char *node_name)
 {
     int call_opt;
     xmlNode *update, *state;
     crm_node_t *node;
     lrm_state_t *connection_rsc = NULL;
 
     CRM_CHECK(node_name != NULL, return);
     crm_info("Announcing Pacemaker Remote node %s", node_name);
 
     call_opt = crmd_cib_smart_opt();
 
     /* Delete node's probe_complete attribute. This serves two purposes:
      *
      * - @COMPAT DCs < 1.1.14 in a rolling upgrade might use it
      * - deleting it (or any attribute for that matter) here ensures the
      *   attribute manager learns the node is remote
      */
     update_attrd(node_name, CRM_OP_PROBED, NULL, NULL, TRUE);
 
     /* Ensure node is in the remote peer cache with member status */
-    node = crm_remote_peer_get(node_name);
+    node = pcmk__cluster_lookup_remote_node(node_name);
     CRM_CHECK(node != NULL, return);
 
     purge_remote_node_attrs(call_opt, node);
     pcmk__update_peer_state(__func__, node, CRM_NODE_MEMBER, 0);
 
     /* Apply any start state that we were given from the environment on the
      * remote node.
      */
     connection_rsc = lrm_state_find(node->uname);
 
     if (connection_rsc != NULL) {
         lrmd_t *lrm = connection_rsc->conn;
         const char *start_state = lrmd__node_start_state(lrm);
 
         if (start_state) {
             set_join_state(start_state, node->uname, node->uuid, true);
         }
     }
 
     /* pacemaker_remote nodes don't participate in the membership layer,
      * so cluster nodes don't automatically get notified when they come and go.
      * We send a cluster message to the DC, and update the CIB node state entry,
      * so the DC will get it sooner (via message) or later (via CIB refresh),
      * and any other interested parties can query the CIB.
      */
     broadcast_remote_state_message(node_name, true);
 
     update = pcmk__xe_create(NULL, PCMK_XE_STATUS);
     state = create_node_state_update(node, node_update_cluster, update,
                                      __func__);
 
     /* Clear the PCMK__XA_NODE_FENCED flag in the node state. If the node ever
      * needs to be fenced, this flag will allow various actions to determine
      * whether the fencing has happened yet.
      */
     crm_xml_add(state, PCMK__XA_NODE_FENCED, "0");
 
     /* TODO: If the remote connection drops, and this (async) CIB update either
      * failed or has not yet completed, later actions could mistakenly think the
      * node has already been fenced (if the PCMK__XA_NODE_FENCED attribute was
      * previously set, because it won't have been cleared). This could prevent
      * actual fencing or allow recurring monitor failures to be cleared too
      * soon. Ideally, we wouldn't rely on the CIB for the fenced status.
      */
     controld_update_cib(PCMK_XE_STATUS, update, call_opt, NULL);
     free_xml(update);
 }
 
 enum down_opts {
     DOWN_KEEP_LRM,
     DOWN_ERASE_LRM
 };
 
 /*!
  * \internal
  * \brief Handle cluster communication related to pacemaker_remote node leaving
  *
  * \param[in] node_name  Name of lost node
  * \param[in] opts       Whether to keep or erase LRM history
  */
 static void
 remote_node_down(const char *node_name, const enum down_opts opts)
 {
     xmlNode *update;
     int call_opt = crmd_cib_smart_opt();
     crm_node_t *node;
 
     /* Purge node from attrd's memory */
     update_attrd_remote_node_removed(node_name, NULL);
 
     /* Normally, only node attributes should be erased, and the resource history
      * should be kept until the node comes back up. However, after a successful
      * fence, we want to clear the history as well, so we don't think resources
      * are still running on the node.
      */
     if (opts == DOWN_ERASE_LRM) {
         controld_delete_node_state(node_name, controld_section_all, call_opt);
     } else {
         controld_delete_node_state(node_name, controld_section_attrs, call_opt);
     }
 
     /* Ensure node is in the remote peer cache with lost state */
-    node = crm_remote_peer_get(node_name);
+    node = pcmk__cluster_lookup_remote_node(node_name);
     CRM_CHECK(node != NULL, return);
     pcmk__update_peer_state(__func__, node, CRM_NODE_LOST, 0);
 
     /* Notify DC */
     broadcast_remote_state_message(node_name, false);
 
     /* Update CIB node state */
     update = pcmk__xe_create(NULL, PCMK_XE_STATUS);
     create_node_state_update(node, node_update_cluster, update, __func__);
     controld_update_cib(PCMK_XE_STATUS, update, call_opt, NULL);
     free_xml(update);
 }
 
 /*!
  * \internal
  * \brief Handle effects of a remote RA command on node state
  *
  * \param[in] cmd  Completed remote RA command
  */
 static void
 check_remote_node_state(const remote_ra_cmd_t *cmd)
 {
     /* Only successful actions can change node state */
     if (!pcmk__result_ok(&(cmd->result))) {
         return;
     }
 
     if (pcmk__str_eq(cmd->action, PCMK_ACTION_START, pcmk__str_casei)) {
         remote_node_up(cmd->rsc_id);
 
     } else if (pcmk__str_eq(cmd->action, PCMK_ACTION_MIGRATE_FROM,
                             pcmk__str_casei)) {
         /* After a successful migration, we don't need to do remote_node_up()
          * because the DC already knows the node is up, and we don't want to
          * clear LRM history etc. We do need to add the remote node to this
          * host's remote peer cache, because (unless it happens to be DC)
          * it hasn't been tracking the remote node, and other code relies on
          * the cache to distinguish remote nodes from unseen cluster nodes.
          */
-        crm_node_t *node = crm_remote_peer_get(cmd->rsc_id);
+        crm_node_t *node = pcmk__cluster_lookup_remote_node(cmd->rsc_id);
 
         CRM_CHECK(node != NULL, return);
         pcmk__update_peer_state(__func__, node, CRM_NODE_MEMBER, 0);
 
     } else if (pcmk__str_eq(cmd->action, PCMK_ACTION_STOP, pcmk__str_casei)) {
         lrm_state_t *lrm_state = lrm_state_find(cmd->rsc_id);
         remote_ra_data_t *ra_data = lrm_state? lrm_state->remote_ra_data : NULL;
 
         if (ra_data) {
             if (!pcmk_is_set(ra_data->status, takeover_complete)) {
                 /* Stop means down if we didn't successfully migrate elsewhere */
                 remote_node_down(cmd->rsc_id, DOWN_KEEP_LRM);
             } else if (AM_I_DC == FALSE) {
                 /* Only the connection host and DC track node state,
                  * so if the connection migrated elsewhere and we aren't DC,
                  * un-cache the node, so we don't have stale info
                  */
                 crm_remote_peer_cache_remove(cmd->rsc_id);
             }
         }
     }
 
     /* We don't do anything for successful monitors, which is correct for
      * routine recurring monitors, and for monitors on nodes where the
      * connection isn't supposed to be (the cluster will stop the connection in
      * that case). However, if the initial probe finds the connection already
      * active on the node where we want it, we probably should do
      * remote_node_up(). Unfortunately, we can't distinguish that case here.
      * Given that connections have to be initiated by the cluster, the chance of
      * that should be close to zero.
      */
 }
 
 static void
 report_remote_ra_result(remote_ra_cmd_t * cmd)
 {
     lrmd_event_data_t op = { 0, };
 
     check_remote_node_state(cmd);
 
     op.type = lrmd_event_exec_complete;
     op.rsc_id = cmd->rsc_id;
     op.op_type = cmd->action;
     op.user_data = cmd->userdata;
     op.timeout = cmd->timeout;
     op.interval_ms = cmd->interval_ms;
     op.t_run = (unsigned int) cmd->start_time;
     op.t_rcchange = (unsigned int) cmd->start_time;
 
     lrmd__set_result(&op, cmd->result.exit_status, cmd->result.execution_status,
                      cmd->result.exit_reason);
 
     if (pcmk_is_set(cmd->status, cmd_reported_success) && !pcmk__result_ok(&(cmd->result))) {
         op.t_rcchange = (unsigned int) time(NULL);
         /* This edge case will likely never ever occur, but if it does the
          * result is that a failure will not be processed correctly. This is only
          * remotely possible because we are able to detect a connection resource's tcp
          * connection has failed at any moment after start has completed. The actual
          * recurring operation is just a connectivity ping.
          *
          * basically, we are not guaranteed that the first successful monitor op and
          * a subsequent failed monitor op will not occur in the same timestamp. We have to
          * make it look like the operations occurred at separate times though. */
         if (op.t_rcchange == op.t_run) {
             op.t_rcchange++;
         }
     }
 
     if (cmd->params) {
         lrmd_key_value_t *tmp;
 
         op.params = pcmk__strkey_table(free, free);
         for (tmp = cmd->params; tmp; tmp = tmp->next) {
             pcmk__insert_dup(op.params, tmp->key, tmp->value);
         }
 
     }
     op.call_id = cmd->call_id;
     op.remote_nodename = cmd->owner;
 
     lrm_op_callback(&op);
 
     if (op.params) {
         g_hash_table_destroy(op.params);
     }
     lrmd__reset_result(&op);
 }
 
 static void
 update_remaining_timeout(remote_ra_cmd_t * cmd)
 {
     cmd->remaining_timeout = ((cmd->timeout / 1000) - (time(NULL) - cmd->start_time)) * 1000;
 }
 
 static gboolean
 retry_start_cmd_cb(gpointer data)
 {
     lrm_state_t *lrm_state = data;
     remote_ra_data_t *ra_data = lrm_state->remote_ra_data;
     remote_ra_cmd_t *cmd = NULL;
     int rc = ETIME;
 
     if (!ra_data || !ra_data->cur_cmd) {
         return FALSE;
     }
     cmd = ra_data->cur_cmd;
     if (!pcmk__strcase_any_of(cmd->action, PCMK_ACTION_START,
                               PCMK_ACTION_MIGRATE_FROM, NULL)) {
         return FALSE;
     }
     update_remaining_timeout(cmd);
 
     if (cmd->remaining_timeout > 0) {
         rc = handle_remote_ra_start(lrm_state, cmd, cmd->remaining_timeout);
     } else {
         pcmk__set_result(&(cmd->result), PCMK_OCF_UNKNOWN_ERROR,
                          PCMK_EXEC_TIMEOUT,
                          "Not enough time remains to retry remote connection");
     }
 
     if (rc != pcmk_rc_ok) {
         report_remote_ra_result(cmd);
 
         if (ra_data->cmds) {
             mainloop_set_trigger(ra_data->work);
         }
         ra_data->cur_cmd = NULL;
         free_cmd(cmd);
     } else {
         /* wait for connection event */
     }
 
     return FALSE;
 }
 
 
 static gboolean
 connection_takeover_timeout_cb(gpointer data)
 {
     lrm_state_t *lrm_state = NULL;
     remote_ra_cmd_t *cmd = data;
 
     crm_info("takeover event timed out for node %s", cmd->rsc_id);
     cmd->takeover_timeout_id = 0;
 
     lrm_state = lrm_state_find(cmd->rsc_id);
 
     handle_remote_ra_stop(lrm_state, cmd);
     free_cmd(cmd);
 
     return FALSE;
 }
 
 static gboolean
 monitor_timeout_cb(gpointer data)
 {
     lrm_state_t *lrm_state = NULL;
     remote_ra_cmd_t *cmd = data;
 
     lrm_state = lrm_state_find(cmd->rsc_id);
 
     crm_info("Timed out waiting for remote poke response from %s%s",
              cmd->rsc_id, (lrm_state? "" : " (no LRM state)"));
     cmd->monitor_timeout_id = 0;
     pcmk__set_result(&(cmd->result), PCMK_OCF_UNKNOWN_ERROR, PCMK_EXEC_TIMEOUT,
                      "Remote executor did not respond");
 
     if (lrm_state && lrm_state->remote_ra_data) {
         remote_ra_data_t *ra_data = lrm_state->remote_ra_data;
 
         if (ra_data->cur_cmd == cmd) {
             ra_data->cur_cmd = NULL;
         }
         if (ra_data->cmds) {
             mainloop_set_trigger(ra_data->work);
         }
     }
 
     report_remote_ra_result(cmd);
     free_cmd(cmd);
 
     if(lrm_state) {
         lrm_state_disconnect(lrm_state);
     }
     return FALSE;
 }
 
 static void
 synthesize_lrmd_success(lrm_state_t *lrm_state, const char *rsc_id, const char *op_type)
 {
     lrmd_event_data_t op = { 0, };
 
     if (lrm_state == NULL) {
         /* if lrm_state not given assume local */
         lrm_state = lrm_state_find(controld_globals.our_nodename);
     }
     CRM_ASSERT(lrm_state != NULL);
 
     op.type = lrmd_event_exec_complete;
     op.rsc_id = rsc_id;
     op.op_type = op_type;
     op.t_run = (unsigned int) time(NULL);
     op.t_rcchange = op.t_run;
     op.call_id = generate_callid();
     lrmd__set_result(&op, PCMK_OCF_OK, PCMK_EXEC_DONE, NULL);
     process_lrm_event(lrm_state, &op, NULL, NULL);
 }
 
 void
 remote_lrm_op_callback(lrmd_event_data_t * op)
 {
     gboolean cmd_handled = FALSE;
     lrm_state_t *lrm_state = NULL;
     remote_ra_data_t *ra_data = NULL;
     remote_ra_cmd_t *cmd = NULL;
 
     crm_debug("Processing '%s%s%s' event on remote connection to %s: %s "
               "(%d) status=%s (%d)",
               (op->op_type? op->op_type : ""), (op->op_type? " " : ""),
               lrmd_event_type2str(op->type), op->remote_nodename,
               services_ocf_exitcode_str(op->rc), op->rc,
               pcmk_exec_status_str(op->op_status), op->op_status);
 
     lrm_state = lrm_state_find(op->remote_nodename);
     if (!lrm_state || !lrm_state->remote_ra_data) {
         crm_debug("No state information found for remote connection event");
         return;
     }
     ra_data = lrm_state->remote_ra_data;
 
     if (op->type == lrmd_event_new_client) {
         // Another client has connected to the remote daemon
 
         if (pcmk_is_set(ra_data->status, expect_takeover)) {
             // Great, we knew this was coming
             lrm_remote_clear_flags(lrm_state, expect_takeover);
             lrm_remote_set_flags(lrm_state, takeover_complete);
 
         } else {
             crm_err("Disconnecting from Pacemaker Remote node %s due to "
                     "unexpected client takeover", op->remote_nodename);
             /* In this case, lrmd_tls_connection_destroy() will be called under the control of mainloop. */
             /* Do not free lrm_state->conn yet. */
             /* It'll be freed in the following stop action. */
             lrm_state_disconnect_only(lrm_state);
         }
         return;
     }
 
     /* filter all EXEC events up */
     if (op->type == lrmd_event_exec_complete) {
         if (pcmk_is_set(ra_data->status, takeover_complete)) {
             crm_debug("ignoring event, this connection is taken over by another node");
         } else {
             lrm_op_callback(op);
         }
         return;
     }
 
     if ((op->type == lrmd_event_disconnect) && (ra_data->cur_cmd == NULL)) {
 
         if (!pcmk_is_set(ra_data->status, remote_active)) {
             crm_debug("Disconnection from Pacemaker Remote node %s complete",
                       lrm_state->node_name);
 
         } else if (!remote_ra_is_in_maintenance(lrm_state)) {
             crm_err("Lost connection to Pacemaker Remote node %s",
                     lrm_state->node_name);
             ra_data->recurring_cmds = fail_all_monitor_cmds(ra_data->recurring_cmds);
             ra_data->cmds = fail_all_monitor_cmds(ra_data->cmds);
 
         } else {
             crm_notice("Unmanaged Pacemaker Remote node %s disconnected",
                        lrm_state->node_name);
             /* Do roughly what a 'stop' on the remote-resource would do */
             handle_remote_ra_stop(lrm_state, NULL);
             remote_node_down(lrm_state->node_name, DOWN_KEEP_LRM);
             /* now fake the reply of a successful 'stop' */
             synthesize_lrmd_success(NULL, lrm_state->node_name,
                                     PCMK_ACTION_STOP);
         }
         return;
     }
 
     if (!ra_data->cur_cmd) {
         crm_debug("no event to match");
         return;
     }
 
     cmd = ra_data->cur_cmd;
 
     /* Start actions and migrate from actions complete after connection
      * comes back to us. */
     if ((op->type == lrmd_event_connect)
         && pcmk__strcase_any_of(cmd->action, PCMK_ACTION_START,
                                 PCMK_ACTION_MIGRATE_FROM, NULL)) {
         if (op->connection_rc < 0) {
             update_remaining_timeout(cmd);
 
             if ((op->connection_rc == -ENOKEY)
                 || (op->connection_rc == -EKEYREJECTED)) {
                 // Hard error, don't retry
                 pcmk__set_result(&(cmd->result), PCMK_OCF_INVALID_PARAM,
                                  PCMK_EXEC_ERROR,
                                  pcmk_strerror(op->connection_rc));
 
             } else if (cmd->remaining_timeout > 3000) {
                 crm_trace("rescheduling start, remaining timeout %d", cmd->remaining_timeout);
                 g_timeout_add(1000, retry_start_cmd_cb, lrm_state);
                 return;
 
             } else {
                 crm_trace("can't reschedule start, remaining timeout too small %d",
                           cmd->remaining_timeout);
                 pcmk__format_result(&(cmd->result), PCMK_OCF_UNKNOWN_ERROR,
                                     PCMK_EXEC_TIMEOUT,
                                     "%s without enough time to retry",
                                     pcmk_strerror(op->connection_rc));
             }
 
         } else {
             lrm_state_reset_tables(lrm_state, TRUE);
             pcmk__set_result(&(cmd->result), PCMK_OCF_OK, PCMK_EXEC_DONE, NULL);
             lrm_remote_set_flags(lrm_state, remote_active);
         }
 
         crm_debug("Remote connection event matched %s action", cmd->action);
         report_remote_ra_result(cmd);
         cmd_handled = TRUE;
 
     } else if ((op->type == lrmd_event_poke)
                && pcmk__str_eq(cmd->action, PCMK_ACTION_MONITOR,
                                pcmk__str_casei)) {
 
         if (cmd->monitor_timeout_id) {
             g_source_remove(cmd->monitor_timeout_id);
             cmd->monitor_timeout_id = 0;
         }
 
         /* Only report success the first time, after that only worry about failures.
          * For this function, if we get the poke pack, it is always a success. Pokes
          * only fail if the send fails, or the response times out. */
         if (!pcmk_is_set(cmd->status, cmd_reported_success)) {
             pcmk__set_result(&(cmd->result), PCMK_OCF_OK, PCMK_EXEC_DONE, NULL);
             report_remote_ra_result(cmd);
             cmd_set_flags(cmd, cmd_reported_success);
         }
 
         crm_debug("Remote poke event matched %s action", cmd->action);
 
         /* success, keep rescheduling if interval is present. */
         if (cmd->interval_ms && !pcmk_is_set(cmd->status, cmd_cancel)) {
             ra_data->recurring_cmds = g_list_append(ra_data->recurring_cmds, cmd);
             cmd->interval_id = g_timeout_add(cmd->interval_ms,
                                              recurring_helper, cmd);
             cmd = NULL;         /* prevent free */
         }
         cmd_handled = TRUE;
 
     } else if ((op->type == lrmd_event_disconnect)
                && pcmk__str_eq(cmd->action, PCMK_ACTION_MONITOR,
                                pcmk__str_casei)) {
         if (pcmk_is_set(ra_data->status, remote_active) &&
             !pcmk_is_set(cmd->status, cmd_cancel)) {
             pcmk__set_result(&(cmd->result), PCMK_OCF_UNKNOWN_ERROR,
                              PCMK_EXEC_ERROR,
                              "Remote connection unexpectedly dropped "
                              "during monitor");
             report_remote_ra_result(cmd);
             crm_err("Remote connection to %s unexpectedly dropped during monitor",
                     lrm_state->node_name);
         }
         cmd_handled = TRUE;
 
     } else if ((op->type == lrmd_event_new_client)
                && pcmk__str_eq(cmd->action, PCMK_ACTION_STOP,
                                pcmk__str_casei)) {
 
         handle_remote_ra_stop(lrm_state, cmd);
         cmd_handled = TRUE;
 
     } else {
         crm_debug("Event did not match %s action", ra_data->cur_cmd->action);
     }
 
     if (cmd_handled) {
         ra_data->cur_cmd = NULL;
         if (ra_data->cmds) {
             mainloop_set_trigger(ra_data->work);
         }
         free_cmd(cmd);
     }
 }
 
 static void
 handle_remote_ra_stop(lrm_state_t * lrm_state, remote_ra_cmd_t * cmd)
 {
     remote_ra_data_t *ra_data = NULL;
 
     CRM_ASSERT(lrm_state);
     ra_data = lrm_state->remote_ra_data;
 
     if (!pcmk_is_set(ra_data->status, takeover_complete)) {
         /* delete pending ops when ever the remote connection is intentionally stopped */
         g_hash_table_remove_all(lrm_state->active_ops);
     } else {
         /* we no longer hold the history if this connection has been migrated,
          * however, we keep metadata cache for future use */
         lrm_state_reset_tables(lrm_state, FALSE);
     }
 
     lrm_remote_clear_flags(lrm_state, remote_active);
     lrm_state_disconnect(lrm_state);
 
     if (ra_data->cmds) {
         g_list_free_full(ra_data->cmds, free_cmd);
     }
     if (ra_data->recurring_cmds) {
         g_list_free_full(ra_data->recurring_cmds, free_cmd);
     }
     ra_data->cmds = NULL;
     ra_data->recurring_cmds = NULL;
     ra_data->cur_cmd = NULL;
 
     if (cmd) {
         pcmk__set_result(&(cmd->result), PCMK_OCF_OK, PCMK_EXEC_DONE, NULL);
         report_remote_ra_result(cmd);
     }
 }
 
 // \return Standard Pacemaker return code
 static int
 handle_remote_ra_start(lrm_state_t * lrm_state, remote_ra_cmd_t * cmd, int timeout_ms)
 {
     const char *server = NULL;
     lrmd_key_value_t *tmp = NULL;
     int port = 0;
     int timeout_used = timeout_ms > MAX_START_TIMEOUT_MS ? MAX_START_TIMEOUT_MS : timeout_ms;
     int rc = pcmk_rc_ok;
 
     for (tmp = cmd->params; tmp; tmp = tmp->next) {
         if (pcmk__strcase_any_of(tmp->key,
                                  PCMK_REMOTE_RA_ADDR, PCMK_REMOTE_RA_SERVER,
                                  NULL)) {
             server = tmp->value;
 
         } else if (pcmk__str_eq(tmp->key, PCMK_REMOTE_RA_PORT,
                                 pcmk__str_none)) {
             port = atoi(tmp->value);
 
         } else if (pcmk__str_eq(tmp->key, CRM_META "_" PCMK__META_CONTAINER,
                                 pcmk__str_none)) {
             lrm_remote_set_flags(lrm_state, controlling_guest);
         }
     }
 
     rc = controld_connect_remote_executor(lrm_state, server, port,
                                           timeout_used);
     if (rc != pcmk_rc_ok) {
         pcmk__format_result(&(cmd->result), PCMK_OCF_UNKNOWN_ERROR,
                             PCMK_EXEC_ERROR,
                             "Could not connect to Pacemaker Remote node %s: %s",
                             lrm_state->node_name, pcmk_rc_str(rc));
     }
     return rc;
 }
 
 static gboolean
 handle_remote_ra_exec(gpointer user_data)
 {
     int rc = 0;
     lrm_state_t *lrm_state = user_data;
     remote_ra_data_t *ra_data = lrm_state->remote_ra_data;
     remote_ra_cmd_t *cmd;
     GList *first = NULL;
 
     if (ra_data->cur_cmd) {
         /* still waiting on previous cmd */
         return TRUE;
     }
 
     while (ra_data->cmds) {
         first = ra_data->cmds;
         cmd = first->data;
         if (cmd->delay_id) {
             /* still waiting for start delay timer to trip */
             return TRUE;
         }
 
         ra_data->cmds = g_list_remove_link(ra_data->cmds, first);
         g_list_free_1(first);
 
         if (pcmk__str_any_of(cmd->action, PCMK_ACTION_START,
                              PCMK_ACTION_MIGRATE_FROM, NULL)) {
             lrm_remote_clear_flags(lrm_state, expect_takeover | takeover_complete);
             if (handle_remote_ra_start(lrm_state, cmd,
                                        cmd->timeout) == pcmk_rc_ok) {
                 /* take care of this later when we get async connection result */
                 crm_debug("Initiated async remote connection, %s action will complete after connect event",
                           cmd->action);
                 ra_data->cur_cmd = cmd;
                 return TRUE;
             }
             report_remote_ra_result(cmd);
 
         } else if (!strcmp(cmd->action, PCMK_ACTION_MONITOR)) {
 
             if (lrm_state_is_connected(lrm_state) == TRUE) {
                 rc = lrm_state_poke_connection(lrm_state);
                 if (rc < 0) {
                     pcmk__set_result(&(cmd->result), PCMK_OCF_UNKNOWN_ERROR,
                                      PCMK_EXEC_ERROR, pcmk_strerror(rc));
                 }
             } else {
                 rc = -1;
                 pcmk__set_result(&(cmd->result), PCMK_OCF_NOT_RUNNING,
                                  PCMK_EXEC_DONE, "Remote connection inactive");
             }
 
             if (rc == 0) {
                 crm_debug("Poked Pacemaker Remote at node %s, waiting for async response",
                           cmd->rsc_id);
                 ra_data->cur_cmd = cmd;
                 cmd->monitor_timeout_id = g_timeout_add(cmd->timeout, monitor_timeout_cb, cmd);
                 return TRUE;
             }
             report_remote_ra_result(cmd);
 
         } else if (!strcmp(cmd->action, PCMK_ACTION_STOP)) {
 
             if (pcmk_is_set(ra_data->status, expect_takeover)) {
                 /* briefly wait on stop for the takeover event to occur. If the
                  * takeover event does not occur during the wait period, that's fine.
                  * It just means that the remote-node's lrm_status section is going to get
                  * cleared which will require all the resources running in the remote-node
                  * to be explicitly re-detected via probe actions.  If the takeover does occur
                  * successfully, then we can leave the status section intact. */
                 cmd->takeover_timeout_id = g_timeout_add((cmd->timeout/2), connection_takeover_timeout_cb, cmd);
                 ra_data->cur_cmd = cmd;
                 return TRUE;
             }
 
             handle_remote_ra_stop(lrm_state, cmd);
 
         } else if (strcmp(cmd->action, PCMK_ACTION_MIGRATE_TO) == 0) {
             lrm_remote_clear_flags(lrm_state, takeover_complete);
             lrm_remote_set_flags(lrm_state, expect_takeover);
             pcmk__set_result(&(cmd->result), PCMK_OCF_OK, PCMK_EXEC_DONE, NULL);
             report_remote_ra_result(cmd);
 
         } else if (pcmk__str_any_of(cmd->action, PCMK_ACTION_RELOAD,
                                     PCMK_ACTION_RELOAD_AGENT, NULL))  {
             /* Currently the only reloadable parameter is
              * PCMK_REMOTE_RA_RECONNECT_INTERVAL, which is only used by the
              * scheduler via the CIB, so reloads are a no-op.
              *
              * @COMPAT DC <2.1.0: We only need to check for "reload" in case
              * we're in a rolling upgrade with a DC scheduling "reload" instead
              * of "reload-agent". An OCF 1.1 "reload" would be a no-op anyway,
              * so this would work for that purpose as well.
              */
             pcmk__set_result(&(cmd->result), PCMK_OCF_OK, PCMK_EXEC_DONE, NULL);
             report_remote_ra_result(cmd);
         }
 
         free_cmd(cmd);
     }
 
     return TRUE;
 }
 
 static void
 remote_ra_data_init(lrm_state_t * lrm_state)
 {
     remote_ra_data_t *ra_data = NULL;
 
     if (lrm_state->remote_ra_data) {
         return;
     }
 
     ra_data = pcmk__assert_alloc(1, sizeof(remote_ra_data_t));
     ra_data->work = mainloop_add_trigger(G_PRIORITY_HIGH, handle_remote_ra_exec, lrm_state);
     lrm_state->remote_ra_data = ra_data;
 }
 
 void
 remote_ra_cleanup(lrm_state_t * lrm_state)
 {
     remote_ra_data_t *ra_data = lrm_state->remote_ra_data;
 
     if (!ra_data) {
         return;
     }
 
     if (ra_data->cmds) {
         g_list_free_full(ra_data->cmds, free_cmd);
     }
 
     if (ra_data->recurring_cmds) {
         g_list_free_full(ra_data->recurring_cmds, free_cmd);
     }
     mainloop_destroy_trigger(ra_data->work);
     free(ra_data);
     lrm_state->remote_ra_data = NULL;
 }
 
 gboolean
 is_remote_lrmd_ra(const char *agent, const char *provider, const char *id)
 {
     if (agent && provider && !strcmp(agent, REMOTE_LRMD_RA) && !strcmp(provider, "pacemaker")) {
         return TRUE;
     }
     if ((id != NULL) && (lrm_state_find(id) != NULL)
         && !pcmk__str_eq(id, controld_globals.our_nodename, pcmk__str_casei)) {
         return TRUE;
     }
 
     return FALSE;
 }
 
 lrmd_rsc_info_t *
 remote_ra_get_rsc_info(lrm_state_t * lrm_state, const char *rsc_id)
 {
     lrmd_rsc_info_t *info = NULL;
 
     if ((lrm_state_find(rsc_id))) {
         info = pcmk__assert_alloc(1, sizeof(lrmd_rsc_info_t));
 
         info->id = pcmk__str_copy(rsc_id);
         info->type = pcmk__str_copy(REMOTE_LRMD_RA);
         info->standard = pcmk__str_copy(PCMK_RESOURCE_CLASS_OCF);
         info->provider = pcmk__str_copy("pacemaker");
     }
 
     return info;
 }
 
 static gboolean
 is_remote_ra_supported_action(const char *action)
 {
     return pcmk__str_any_of(action,
                             PCMK_ACTION_START,
                             PCMK_ACTION_STOP,
                             PCMK_ACTION_MONITOR,
                             PCMK_ACTION_MIGRATE_TO,
                             PCMK_ACTION_MIGRATE_FROM,
                             PCMK_ACTION_RELOAD_AGENT,
                             PCMK_ACTION_RELOAD,
                             NULL);
 }
 
 static GList *
 fail_all_monitor_cmds(GList * list)
 {
     GList *rm_list = NULL;
     remote_ra_cmd_t *cmd = NULL;
     GList *gIter = NULL;
 
     for (gIter = list; gIter != NULL; gIter = gIter->next) {
         cmd = gIter->data;
         if ((cmd->interval_ms > 0)
             && pcmk__str_eq(cmd->action, PCMK_ACTION_MONITOR,
                             pcmk__str_casei)) {
             rm_list = g_list_append(rm_list, cmd);
         }
     }
 
     for (gIter = rm_list; gIter != NULL; gIter = gIter->next) {
         cmd = gIter->data;
 
         pcmk__set_result(&(cmd->result), PCMK_OCF_UNKNOWN_ERROR,
                          PCMK_EXEC_ERROR, "Lost connection to remote executor");
         crm_trace("Pre-emptively failing %s %s (interval=%u, %s)",
                   cmd->action, cmd->rsc_id, cmd->interval_ms, cmd->userdata);
         report_remote_ra_result(cmd);
 
         list = g_list_remove(list, cmd);
         free_cmd(cmd);
     }
 
     /* frees only the list data, not the cmds */
     g_list_free(rm_list);
     return list;
 }
 
 static GList *
 remove_cmd(GList * list, const char *action, guint interval_ms)
 {
     remote_ra_cmd_t *cmd = NULL;
     GList *gIter = NULL;
 
     for (gIter = list; gIter != NULL; gIter = gIter->next) {
         cmd = gIter->data;
         if ((cmd->interval_ms == interval_ms)
             && pcmk__str_eq(cmd->action, action, pcmk__str_casei)) {
             break;
         }
         cmd = NULL;
     }
     if (cmd) {
         list = g_list_remove(list, cmd);
         free_cmd(cmd);
     }
     return list;
 }
 
 int
 remote_ra_cancel(lrm_state_t *lrm_state, const char *rsc_id,
                  const char *action, guint interval_ms)
 {
     lrm_state_t *connection_rsc = NULL;
     remote_ra_data_t *ra_data = NULL;
 
     connection_rsc = lrm_state_find(rsc_id);
     if (!connection_rsc || !connection_rsc->remote_ra_data) {
         return -EINVAL;
     }
 
     ra_data = connection_rsc->remote_ra_data;
     ra_data->cmds = remove_cmd(ra_data->cmds, action, interval_ms);
     ra_data->recurring_cmds = remove_cmd(ra_data->recurring_cmds, action,
                                          interval_ms);
     if (ra_data->cur_cmd &&
         (ra_data->cur_cmd->interval_ms == interval_ms) &&
         (pcmk__str_eq(ra_data->cur_cmd->action, action, pcmk__str_casei))) {
 
         cmd_set_flags(ra_data->cur_cmd, cmd_cancel);
     }
 
     return 0;
 }
 
 static remote_ra_cmd_t *
 handle_dup_monitor(remote_ra_data_t *ra_data, guint interval_ms,
                    const char *userdata)
 {
     GList *gIter = NULL;
     remote_ra_cmd_t *cmd = NULL;
 
     /* there are 3 places a potential duplicate monitor operation
      * could exist.
      * 1. recurring_cmds list. where the op is waiting for its next interval
      * 2. cmds list, where the op is queued to get executed immediately
      * 3. cur_cmd, which means the monitor op is in flight right now.
      */
     if (interval_ms == 0) {
         return NULL;
     }
 
     if (ra_data->cur_cmd &&
         !pcmk_is_set(ra_data->cur_cmd->status, cmd_cancel) &&
         (ra_data->cur_cmd->interval_ms == interval_ms)
         && pcmk__str_eq(ra_data->cur_cmd->action, PCMK_ACTION_MONITOR,
                         pcmk__str_casei)) {
 
         cmd = ra_data->cur_cmd;
         goto handle_dup;
     }
 
     for (gIter = ra_data->recurring_cmds; gIter != NULL; gIter = gIter->next) {
         cmd = gIter->data;
         if ((cmd->interval_ms == interval_ms)
             && pcmk__str_eq(cmd->action, PCMK_ACTION_MONITOR,
                             pcmk__str_casei)) {
             goto handle_dup;
         }
     }
 
     for (gIter = ra_data->cmds; gIter != NULL; gIter = gIter->next) {
         cmd = gIter->data;
         if ((cmd->interval_ms == interval_ms)
             && pcmk__str_eq(cmd->action, PCMK_ACTION_MONITOR,
                             pcmk__str_casei)) {
             goto handle_dup;
         }
     }
 
     return NULL;
 
 handle_dup:
 
     crm_trace("merging duplicate monitor cmd " PCMK__OP_FMT,
               cmd->rsc_id, PCMK_ACTION_MONITOR, interval_ms);
 
     /* update the userdata */
     if (userdata) {
        free(cmd->userdata);
        cmd->userdata = pcmk__str_copy(userdata);
     }
 
     /* if we've already reported success, generate a new call id */
     if (pcmk_is_set(cmd->status, cmd_reported_success)) {
         cmd->start_time = time(NULL);
         cmd->call_id = generate_callid();
         cmd_clear_flags(cmd, cmd_reported_success);
     }
 
     /* if we have an interval_id set, that means we are in the process of
      * waiting for this cmd's next interval. instead of waiting, cancel
      * the timer and execute the action immediately */
     if (cmd->interval_id) {
         g_source_remove(cmd->interval_id);
         cmd->interval_id = 0;
         recurring_helper(cmd);
     }
 
     return cmd;
 }
 
 /*!
  * \internal
  * \brief Execute an action using the (internal) ocf:pacemaker:remote agent
  *
  * \param[in]     lrm_state      Executor state object for remote connection
  * \param[in]     rsc_id         Connection resource ID
  * \param[in]     action         Action to execute
  * \param[in]     userdata       String to copy and pass to execution callback
  * \param[in]     interval_ms    Action interval (in milliseconds)
  * \param[in]     timeout_ms     Action timeout (in milliseconds)
  * \param[in]     start_delay_ms Delay (in milliseconds) before executing action
  * \param[in,out] params         Connection resource parameters
  * \param[out]    call_id        Where to store call ID on success
  *
  * \return Standard Pacemaker return code
  * \note This takes ownership of \p params, which should not be used or freed
  *       after calling this function.
  */
 int
 controld_execute_remote_agent(const lrm_state_t *lrm_state, const char *rsc_id,
                               const char *action, const char *userdata,
                               guint interval_ms, int timeout_ms,
                               int start_delay_ms, lrmd_key_value_t *params,
                               int *call_id)
 {
     lrm_state_t *connection_rsc = NULL;
     remote_ra_cmd_t *cmd = NULL;
     remote_ra_data_t *ra_data = NULL;
 
     *call_id = 0;
 
     CRM_CHECK((lrm_state != NULL) && (rsc_id != NULL) && (action != NULL)
               && (userdata != NULL) && (call_id != NULL),
               lrmd_key_value_freeall(params); return EINVAL);
 
     if (!is_remote_ra_supported_action(action)) {
         lrmd_key_value_freeall(params);
         return EOPNOTSUPP;
     }
 
     connection_rsc = lrm_state_find(rsc_id);
     if (connection_rsc == NULL) {
         lrmd_key_value_freeall(params);
         return ENOTCONN;
     }
 
     remote_ra_data_init(connection_rsc);
     ra_data = connection_rsc->remote_ra_data;
 
     cmd = handle_dup_monitor(ra_data, interval_ms, userdata);
     if (cmd) {
         *call_id = cmd->call_id;
         lrmd_key_value_freeall(params);
         return pcmk_rc_ok;
     }
 
     cmd = pcmk__assert_alloc(1, sizeof(remote_ra_cmd_t));
 
     cmd->owner = pcmk__str_copy(lrm_state->node_name);
     cmd->rsc_id = pcmk__str_copy(rsc_id);
     cmd->action = pcmk__str_copy(action);
     cmd->userdata = pcmk__str_copy(userdata);
     cmd->interval_ms = interval_ms;
     cmd->timeout = timeout_ms;
     cmd->start_delay = start_delay_ms;
     cmd->params = params;
     cmd->start_time = time(NULL);
 
     cmd->call_id = generate_callid();
 
     if (cmd->start_delay) {
         cmd->delay_id = g_timeout_add(cmd->start_delay, start_delay_helper, cmd);
     }
 
     ra_data->cmds = g_list_append(ra_data->cmds, cmd);
     mainloop_set_trigger(ra_data->work);
 
     *call_id = cmd->call_id;
     return pcmk_rc_ok;
 }
 
 /*!
  * \internal
  * \brief Immediately fail all monitors of a remote node, if proxied here
  *
  * \param[in] node_name  Name of pacemaker_remote node
  */
 void
 remote_ra_fail(const char *node_name)
 {
     lrm_state_t *lrm_state = lrm_state_find(node_name);
 
     if (lrm_state && lrm_state_is_connected(lrm_state)) {
         remote_ra_data_t *ra_data = lrm_state->remote_ra_data;
 
         crm_info("Failing monitors on Pacemaker Remote node %s", node_name);
         ra_data->recurring_cmds = fail_all_monitor_cmds(ra_data->recurring_cmds);
         ra_data->cmds = fail_all_monitor_cmds(ra_data->cmds);
     }
 }
 
 /* A guest node fencing implied by host fencing looks like:
  *
  *  <pseudo_event id="103" operation="stonith" operation_key="stonith-lxc1-off"
  *                on_node="lxc1" on_node_uuid="lxc1">
  *     <attributes CRM_meta_on_node="lxc1" CRM_meta_on_node_uuid="lxc1"
  *                 CRM_meta_stonith_action="off" crm_feature_set="3.0.12"/>
  *     <downed>
  *       <node id="lxc1"/>
  *     </downed>
  *  </pseudo_event>
  */
 #define XPATH_PSEUDO_FENCE "/" PCMK__XE_PSEUDO_EVENT \
     "[@" PCMK_XA_OPERATION "='stonith']/" PCMK__XE_DOWNED "/" PCMK_XE_NODE
 
 /*!
  * \internal
  * \brief Check a pseudo-action for Pacemaker Remote node side effects
  *
  * \param[in,out] xml  XML of pseudo-action to check
  */
 void
 remote_ra_process_pseudo(xmlNode *xml)
 {
     xmlXPathObjectPtr search = xpath_search(xml, XPATH_PSEUDO_FENCE);
 
     if (numXpathResults(search) == 1) {
         xmlNode *result = getXpathResult(search, 0);
 
         /* Normally, we handle the necessary side effects of a guest node stop
          * action when reporting the remote agent's result. However, if the stop
          * is implied due to fencing, it will be a fencing pseudo-event, and
          * there won't be a result to report. Handle that case here.
          *
          * This will result in a duplicate call to remote_node_down() if the
          * guest stop was real instead of implied, but that shouldn't hurt.
          *
          * There is still one corner case that isn't handled: if a guest node
          * isn't running any resources when its host is fenced, it will appear
          * to be cleanly stopped, so there will be no pseudo-fence, and our
          * peer cache state will be incorrect unless and until the guest is
          * recovered.
          */
         if (result) {
             const char *remote = pcmk__xe_id(result);
 
             if (remote) {
                 remote_node_down(remote, DOWN_ERASE_LRM);
             }
         }
     }
     freeXpathObject(search);
 }
 
 static void
 remote_ra_maintenance(lrm_state_t * lrm_state, gboolean maintenance)
 {
     xmlNode *update, *state;
     int call_opt;
     crm_node_t *node;
 
     call_opt = crmd_cib_smart_opt();
-    node = crm_remote_peer_get(lrm_state->node_name);
+    node = pcmk__cluster_lookup_remote_node(lrm_state->node_name);
     CRM_CHECK(node != NULL, return);
     update = pcmk__xe_create(NULL, PCMK_XE_STATUS);
     state = create_node_state_update(node, node_update_none, update,
                                      __func__);
     crm_xml_add(state, PCMK__XA_NODE_IN_MAINTENANCE, (maintenance? "1" : "0"));
     if (controld_update_cib(PCMK_XE_STATUS, update, call_opt,
                             NULL) == pcmk_rc_ok) {
         /* TODO: still not 100% sure that async update will succeed ... */
         if (maintenance) {
             lrm_remote_set_flags(lrm_state, remote_in_maint);
         } else {
             lrm_remote_clear_flags(lrm_state, remote_in_maint);
         }
     }
     free_xml(update);
 }
 
 #define XPATH_PSEUDO_MAINTENANCE "//" PCMK__XE_PSEUDO_EVENT         \
     "[@" PCMK_XA_OPERATION "='" PCMK_ACTION_MAINTENANCE_NODES "']/" \
     PCMK__XE_MAINTENANCE
 
 /*!
  * \internal
  * \brief Check a pseudo-action holding updates for maintenance state
  *
  * \param[in,out] xml  XML of pseudo-action to check
  */
 void
 remote_ra_process_maintenance_nodes(xmlNode *xml)
 {
     xmlXPathObjectPtr search = xpath_search(xml, XPATH_PSEUDO_MAINTENANCE);
 
     if (numXpathResults(search) == 1) {
         xmlNode *node;
         int cnt = 0, cnt_remote = 0;
 
         for (node = pcmk__xe_first_child(getXpathResult(search, 0),
                                          PCMK_XE_NODE, NULL, NULL);
              node != NULL; node = pcmk__xe_next_same(node)) {
 
             lrm_state_t *lrm_state = lrm_state_find(pcmk__xe_id(node));
 
             cnt++;
             if (lrm_state && lrm_state->remote_ra_data &&
                 pcmk_is_set(((remote_ra_data_t *) lrm_state->remote_ra_data)->status, remote_active)) {
 
                 const char *in_maint_s = NULL;
                 int in_maint;
 
                 cnt_remote++;
                 in_maint_s = crm_element_value(node,
                                                PCMK__XA_NODE_IN_MAINTENANCE);
                 pcmk__scan_min_int(in_maint_s, &in_maint, 0);
                 remote_ra_maintenance(lrm_state, in_maint);
             }
         }
         crm_trace("Action holds %d nodes (%d remotes found) adjusting "
                   PCMK_OPT_MAINTENANCE_MODE,
                   cnt, cnt_remote);
     }
     freeXpathObject(search);
 }
 
 gboolean
 remote_ra_is_in_maintenance(lrm_state_t * lrm_state)
 {
     remote_ra_data_t *ra_data = lrm_state->remote_ra_data;
     return pcmk_is_set(ra_data->status, remote_in_maint);
 }
 
 gboolean
 remote_ra_controlling_guest(lrm_state_t * lrm_state)
 {
     remote_ra_data_t *ra_data = lrm_state->remote_ra_data;
     return pcmk_is_set(ra_data->status, controlling_guest);
 }
diff --git a/include/crm/cluster/internal.h b/include/crm/cluster/internal.h
index a96a063943..4761e65426 100644
--- a/include/crm/cluster/internal.h
+++ b/include/crm/cluster/internal.h
@@ -1,157 +1,158 @@
 /*
  * Copyright 2004-2024 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU Lesser General Public License
  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
  */
 
 #ifndef PCMK__CRM_CLUSTER_INTERNAL__H
 #  define PCMK__CRM_CLUSTER_INTERNAL__H
 
 #  include <stdint.h>       // uint32_t, uint64_t
 
 #  include <glib.h>         // gboolean
 
 #  include <crm/cluster.h>
 
 /* *INDENT-OFF* */
 enum crm_proc_flag {
     crm_proc_none       = 0x00000001,
 
     // Cluster layers
     crm_proc_cpg        = 0x04000000,
 
     // Daemons
     crm_proc_execd      = 0x00000010,
     crm_proc_based      = 0x00000100,
     crm_proc_controld   = 0x00000200,
     crm_proc_attrd      = 0x00001000,
     crm_proc_schedulerd = 0x00010000,
     crm_proc_fenced     = 0x00100000,
 };
 /* *INDENT-ON* */
 
 // Used with node cache search functions
 enum pcmk__node_search_flags {
     pcmk__node_search_none      = 0,
     pcmk__node_search_cluster   = (1 << 0), // Search for cluster nodes
     pcmk__node_search_remote    = (1 << 1), // Search for remote nodes
     pcmk__node_search_any       = pcmk__node_search_cluster
                                   |pcmk__node_search_remote,
 
     /* @COMPAT The values before this must stay the same until we can drop
      * support for enum crm_get_peer_flags
      */
 
     pcmk__node_search_known     = (1 << 2), // Search previously known nodes
 };
 
 /*!
  * \internal
  * \brief Return the process bit corresponding to the current cluster stack
  *
  * \return Process flag if detectable, otherwise 0
  */
 static inline uint32_t
 crm_get_cluster_proc(void)
 {
     switch (get_cluster_type()) {
         case pcmk_cluster_corosync:
             return crm_proc_cpg;
 
         default:
             break;
     }
     return crm_proc_none;
 }
 
 /*!
  * \internal
  * \brief Get log-friendly string description of a Corosync return code
  *
  * \param[in] error  Corosync return code
  *
  * \return Log-friendly string description corresponding to \p error
  */
 static inline const char *
 pcmk__cs_err_str(int error)
 {
 #  if SUPPORT_COROSYNC
     switch (error) {
         case CS_OK:                         return "OK";
         case CS_ERR_LIBRARY:                return "Library error";
         case CS_ERR_VERSION:                return "Version error";
         case CS_ERR_INIT:                   return "Initialization error";
         case CS_ERR_TIMEOUT:                return "Timeout";
         case CS_ERR_TRY_AGAIN:              return "Try again";
         case CS_ERR_INVALID_PARAM:          return "Invalid parameter";
         case CS_ERR_NO_MEMORY:              return "No memory";
         case CS_ERR_BAD_HANDLE:             return "Bad handle";
         case CS_ERR_BUSY:                   return "Busy";
         case CS_ERR_ACCESS:                 return "Access error";
         case CS_ERR_NOT_EXIST:              return "Doesn't exist";
         case CS_ERR_NAME_TOO_LONG:          return "Name too long";
         case CS_ERR_EXIST:                  return "Exists";
         case CS_ERR_NO_SPACE:               return "No space";
         case CS_ERR_INTERRUPT:              return "Interrupt";
         case CS_ERR_NAME_NOT_FOUND:         return "Name not found";
         case CS_ERR_NO_RESOURCES:           return "No resources";
         case CS_ERR_NOT_SUPPORTED:          return "Not supported";
         case CS_ERR_BAD_OPERATION:          return "Bad operation";
         case CS_ERR_FAILED_OPERATION:       return "Failed operation";
         case CS_ERR_MESSAGE_ERROR:          return "Message error";
         case CS_ERR_QUEUE_FULL:             return "Queue full";
         case CS_ERR_QUEUE_NOT_AVAILABLE:    return "Queue not available";
         case CS_ERR_BAD_FLAGS:              return "Bad flags";
         case CS_ERR_TOO_BIG:                return "Too big";
         case CS_ERR_NO_SECTIONS:            return "No sections";
     }
 #  endif
     return "Corosync error";
 }
 
 #  if SUPPORT_COROSYNC
 
 #if 0
 /* This is the new way to do it, but we still support all Corosync 2 versions,
  * and this isn't always available. A better alternative here would be to check
  * for support in the configure script and enable this conditionally.
  */
 #define pcmk__init_cmap(handle) cmap_initialize_map((handle), CMAP_MAP_ICMAP)
 #else
 #define pcmk__init_cmap(handle) cmap_initialize(handle)
 #endif
 
 char *pcmk__corosync_cluster_name(void);
 bool pcmk__corosync_add_nodes(xmlNode *xml_parent);
 #  endif
 
 crm_node_t *crm_update_peer_proc(const char *source, crm_node_t * peer,
                                  uint32_t flag, const char *status);
 crm_node_t *pcmk__update_peer_state(const char *source, crm_node_t *node,
                                     const char *state, uint64_t membership);
 
 void pcmk__update_peer_expected(const char *source, crm_node_t *node,
                                 const char *expected);
 void pcmk__reap_unseen_nodes(uint64_t ring_id);
 
 void pcmk__corosync_quorum_connect(gboolean (*dispatch)(unsigned long long,
                                                         gboolean),
                                    void (*destroy) (gpointer));
 
 unsigned int pcmk__cluster_num_remote_nodes(void);
 
+crm_node_t *pcmk__cluster_lookup_remote_node(const char *node_name);
 crm_node_t *pcmk__search_node_caches(unsigned int id, const char *uname,
                                      uint32_t flags);
 crm_node_t *pcmk__search_cluster_node_cache(unsigned int id, const char *uname,
                                             const char *uuid);
 void pcmk__purge_node_from_cache(const char *node_name, uint32_t node_id);
 
 void pcmk__refresh_node_caches_from_cib(xmlNode *cib);
 
 crm_node_t *pcmk__get_node(unsigned int id, const char *uname,
                            const char *uuid, uint32_t flags);
 
 #endif // PCMK__CRM_CLUSTER_INTERNAL__H
diff --git a/lib/cluster/membership.c b/lib/cluster/membership.c
index acfd04081e..43f4350eb3 100644
--- a/lib/cluster/membership.c
+++ b/lib/cluster/membership.c
@@ -1,1406 +1,1427 @@
 /*
  * Copyright 2004-2024 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU Lesser General Public License
  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 
 #ifndef _GNU_SOURCE
 #  define _GNU_SOURCE
 #endif
 
 #include <sys/param.h>
 #include <sys/types.h>
 #include <stdio.h>
 #include <unistd.h>
 #include <string.h>
 #include <glib.h>
 #include <crm/common/ipc.h>
 #include <crm/common/xml_internal.h>
 #include <crm/cluster/internal.h>
 #include <crm/common/xml.h>
 #include <crm/stonith-ng.h>
 #include "crmcluster_private.h"
 
 /* The peer cache remembers cluster nodes that have been seen.
  * This is managed mostly automatically by libcluster, based on
  * cluster membership events.
  *
  * Because cluster nodes can have conflicting names or UUIDs,
  * the hash table key is a uniquely generated ID.
  */
 GHashTable *crm_peer_cache = NULL;
 
 /*
  * The remote peer cache tracks pacemaker_remote nodes. While the
  * value has the same type as the peer cache's, it is tracked separately for
  * three reasons: pacemaker_remote nodes can't have conflicting names or UUIDs,
  * so the name (which is also the UUID) is used as the hash table key; there
  * is no equivalent of membership events, so management is not automatic; and
  * most users of the peer cache need to exclude pacemaker_remote nodes.
  *
  * That said, using a single cache would be more logical and less error-prone,
  * so it would be a good idea to merge them one day.
  *
  * libcluster provides two avenues for populating the cache:
- * crm_remote_peer_get() and crm_remote_peer_cache_remove() directly manage it,
- * while refresh_remote_nodes() populates it via the CIB.
+ * pcmk__cluster_lookup_remote_node() and crm_remote_peer_cache_remove()
+ * directly manage it, while refresh_remote_nodes() populates it via the CIB.
  */
 GHashTable *crm_remote_peer_cache = NULL;
 
 /*
  * The known node cache tracks cluster and remote nodes that have been seen in
  * the CIB. It is useful mainly when a caller needs to know about a node that
  * may no longer be in the membership, but doesn't want to add the node to the
  * main peer cache tables.
  */
 static GHashTable *known_node_cache = NULL;
 
 unsigned long long crm_peer_seq = 0;
 gboolean crm_have_quorum = FALSE;
 static gboolean crm_autoreap  = TRUE;
 
 // Flag setting and clearing for crm_node_t:flags
 
 #define set_peer_flags(peer, flags_to_set) do {                               \
         (peer)->flags = pcmk__set_flags_as(__func__, __LINE__, LOG_TRACE,     \
                                            "Peer", (peer)->uname,             \
                                            (peer)->flags, (flags_to_set),     \
                                            #flags_to_set);                    \
     } while (0)
 
 #define clear_peer_flags(peer, flags_to_clear) do {                           \
         (peer)->flags = pcmk__clear_flags_as(__func__, __LINE__,              \
                                              LOG_TRACE,                       \
                                              "Peer", (peer)->uname,           \
                                              (peer)->flags, (flags_to_clear), \
                                              #flags_to_clear);                \
     } while (0)
 
 static void update_peer_uname(crm_node_t *node, const char *uname);
 static crm_node_t *find_known_node(const char *id, const char *uname);
 
 /*!
  * \internal
  * \brief Get the number of Pacemaker Remote nodes that have been seen
  *
  * \return Number of cached Pacemaker Remote nodes
  */
 unsigned int
 pcmk__cluster_num_remote_nodes(void)
 {
     if (crm_remote_peer_cache == NULL) {
         return 0U;
     }
     return g_hash_table_size(crm_remote_peer_cache);
 }
 
 /*!
- * \brief Get a remote node peer cache entry, creating it if necessary
+ * \internal
+ * \brief Get a remote node cache entry, creating it if necessary
  *
  * \param[in] node_name  Name of remote node
  *
- * \return Cache entry for node on success, NULL (and set errno) otherwise
+ * \return Cache entry for node on success, or \c NULL (and set \c errno)
+ *         otherwise
  *
- * \note When creating a new entry, this will leave the node state undetermined,
- *       so the caller should also call pcmk__update_peer_state() if the state
+ * \note When creating a new entry, this will leave the node state undetermined.
+ *       The caller should also call \c pcmk__update_peer_state() if the state
  *       is known.
  * \note Because this can add and remove cache entries, callers should not
  *       assume any previously obtained cache entry pointers remain valid.
  */
 crm_node_t *
-crm_remote_peer_get(const char *node_name)
+pcmk__cluster_lookup_remote_node(const char *node_name)
 {
     crm_node_t *node;
     char *node_name_copy = NULL;
 
     if (node_name == NULL) {
         errno = EINVAL;
         return NULL;
     }
 
     /* It's theoretically possible that the node was added to the cluster peer
      * cache before it was known to be a Pacemaker Remote node. Remove that
      * entry unless it has a node ID, which means the name actually is
      * associated with a cluster node. (@TODO return an error in that case?)
      */
     node = pcmk__search_node_caches(0, node_name, pcmk__node_search_cluster);
     if ((node != NULL) && (node->uuid == NULL)) {
         /* node_name could be a pointer into the cache entry being removed, so
          * reassign it to a copy before the original gets freed
          */
         node_name_copy = strdup(node_name);
         if (node_name_copy == NULL) {
             errno = ENOMEM;
             return NULL;
         }
         node_name = node_name_copy;
         reap_crm_member(0, node_name);
     }
 
     /* Return existing cache entry if one exists */
     node = g_hash_table_lookup(crm_remote_peer_cache, node_name);
     if (node) {
         free(node_name_copy);
         return node;
     }
 
     /* Allocate a new entry */
     node = calloc(1, sizeof(crm_node_t));
     if (node == NULL) {
         free(node_name_copy);
         return NULL;
     }
 
     /* Populate the essential information */
     set_peer_flags(node, crm_remote_node);
     node->uuid = strdup(node_name);
     if (node->uuid == NULL) {
         free(node);
         errno = ENOMEM;
         free(node_name_copy);
         return NULL;
     }
 
     /* Add the new entry to the cache */
     g_hash_table_replace(crm_remote_peer_cache, node->uuid, node);
     crm_trace("added %s to remote cache", node_name);
 
     /* Update the entry's uname, ensuring peer status callbacks are called */
     update_peer_uname(node, node_name);
     free(node_name_copy);
     return node;
 }
 
+/*!
+ * \brief Get a remote node peer cache entry, creating it if necessary
+ *
+ * \param[in] node_name  Name of remote node
+ *
+ * \return Cache entry for node on success, NULL (and set errno) otherwise
+ *
+ * \note When creating a new entry, this will leave the node state undetermined,
+ *       so the caller should also call pcmk__update_peer_state() if the state
+ *       is known.
+ * \note Because this can add and remove cache entries, callers should not
+ *       assume any previously obtained cache entry pointers remain valid.
+ */
+crm_node_t *
+crm_remote_peer_get(const char *node_name)
+{
+    return pcmk__cluster_lookup_remote_node(node_name);
+}
+
 /*!
  * \brief Remove a node from the Pacemaker Remote node cache
  *
  * \param[in] node_name  Name of node to remove from cache
  *
  * \note The caller must be careful not to use \p node_name after calling this
  *       function if it might be a pointer into the cache entry being removed.
  */
 void
 crm_remote_peer_cache_remove(const char *node_name)
 {
     /* Do a lookup first, because node_name could be a pointer within the entry
      * being removed -- we can't log it *after* removing it.
      */
     if (g_hash_table_lookup(crm_remote_peer_cache, node_name) != NULL) {
         crm_trace("Removing %s from Pacemaker Remote node cache", node_name);
         g_hash_table_remove(crm_remote_peer_cache, node_name);
     }
 }
 
 /*!
  * \internal
  * \brief Return node status based on a CIB status entry
  *
  * \param[in] node_state  XML of node state
  *
  * \return \c CRM_NODE_LOST if \c PCMK__XA_IN_CCM is false in
  *         \c PCMK__XE_NODE_STATE, \c CRM_NODE_MEMBER otherwise
  * \note Unlike most boolean XML attributes, this one defaults to true, for
  *       backward compatibility with older controllers that don't set it.
  */
 static const char *
 remote_state_from_cib(const xmlNode *node_state)
 {
     bool status = false;
 
     if ((pcmk__xe_get_bool_attr(node_state, PCMK__XA_IN_CCM,
                                 &status) == pcmk_rc_ok) && !status) {
         return CRM_NODE_LOST;
     } else {
         return CRM_NODE_MEMBER;
     }
 }
 
 /* user data for looping through remote node xpath searches */
 struct refresh_data {
     const char *field;  /* XML attribute to check for node name */
     gboolean has_state; /* whether to update node state based on XML */
 };
 
 /*!
  * \internal
  * \brief Process one pacemaker_remote node xpath search result
  *
  * \param[in] result     XML search result
  * \param[in] user_data  what to look for in the XML
  */
 static void
 remote_cache_refresh_helper(xmlNode *result, void *user_data)
 {
     const struct refresh_data *data = user_data;
     const char *remote = crm_element_value(result, data->field);
     const char *state = NULL;
     crm_node_t *node;
 
     CRM_CHECK(remote != NULL, return);
 
     /* Determine node's state, if the result has it */
     if (data->has_state) {
         state = remote_state_from_cib(result);
     }
 
     /* Check whether cache already has entry for node */
     node = g_hash_table_lookup(crm_remote_peer_cache, remote);
 
     if (node == NULL) {
         /* Node is not in cache, so add a new entry for it */
-        node = crm_remote_peer_get(remote);
+        node = pcmk__cluster_lookup_remote_node(remote);
         CRM_ASSERT(node);
         if (state) {
             pcmk__update_peer_state(__func__, node, state, 0);
         }
 
     } else if (pcmk_is_set(node->flags, crm_node_dirty)) {
         /* Node is in cache and hasn't been updated already, so mark it clean */
         clear_peer_flags(node, crm_node_dirty);
         if (state) {
             pcmk__update_peer_state(__func__, node, state, 0);
         }
     }
 }
 
 static void
 mark_dirty(gpointer key, gpointer value, gpointer user_data)
 {
     set_peer_flags((crm_node_t *) value, crm_node_dirty);
 }
 
 static gboolean
 is_dirty(gpointer key, gpointer value, gpointer user_data)
 {
     return pcmk_is_set(((crm_node_t*)value)->flags, crm_node_dirty);
 }
 
 /*!
  * \internal
  * \brief Repopulate the remote node cache based on CIB XML
  *
  * \param[in] cib  CIB XML to parse
  */
 static void
 refresh_remote_nodes(xmlNode *cib)
 {
     struct refresh_data data;
 
     crm_peer_init();
 
     /* First, we mark all existing cache entries as dirty,
      * so that later we can remove any that weren't in the CIB.
      * We don't empty the cache, because we need to detect changes in state.
      */
     g_hash_table_foreach(crm_remote_peer_cache, mark_dirty, NULL);
 
     /* Look for guest nodes and remote nodes in the status section */
     data.field = PCMK_XA_ID;
     data.has_state = TRUE;
     crm_foreach_xpath_result(cib, PCMK__XP_REMOTE_NODE_STATUS,
                              remote_cache_refresh_helper, &data);
 
     /* Look for guest nodes and remote nodes in the configuration section,
      * because they may have just been added and not have a status entry yet.
      * In that case, the cached node state will be left NULL, so that the
      * peer status callback isn't called until we're sure the node started
      * successfully.
      */
     data.field = PCMK_XA_VALUE;
     data.has_state = FALSE;
     crm_foreach_xpath_result(cib, PCMK__XP_GUEST_NODE_CONFIG,
                              remote_cache_refresh_helper, &data);
     data.field = PCMK_XA_ID;
     data.has_state = FALSE;
     crm_foreach_xpath_result(cib, PCMK__XP_REMOTE_NODE_CONFIG,
                              remote_cache_refresh_helper, &data);
 
     /* Remove all old cache entries that weren't seen in the CIB */
     g_hash_table_foreach_remove(crm_remote_peer_cache, is_dirty, NULL);
 }
 
 gboolean
 crm_is_peer_active(const crm_node_t * node)
 {
     if(node == NULL) {
         return FALSE;
     }
 
     if (pcmk_is_set(node->flags, crm_remote_node)) {
         /* remote nodes are never considered active members. This
          * guarantees they will never be considered for DC membership.*/
         return FALSE;
     }
 #if SUPPORT_COROSYNC
     if (is_corosync_cluster()) {
         return crm_is_corosync_peer_active(node);
     }
 #endif
     crm_err("Unhandled cluster type: %s", name_for_cluster_type(get_cluster_type()));
     return FALSE;
 }
 
 static gboolean
 crm_reap_dead_member(gpointer key, gpointer value, gpointer user_data)
 {
     crm_node_t *node = value;
     crm_node_t *search = user_data;
 
     if (search == NULL) {
         return FALSE;
 
     } else if (search->id && node->id != search->id) {
         return FALSE;
 
     } else if (search->id == 0 && !pcmk__str_eq(node->uname, search->uname, pcmk__str_casei)) {
         return FALSE;
 
     } else if (crm_is_peer_active(value) == FALSE) {
         crm_info("Removing node with name %s and " PCMK_XA_ID
                  " %u from membership cache",
                  (node->uname? node->uname : "unknown"), node->id);
         return TRUE;
     }
     return FALSE;
 }
 
 /*!
  * \brief Remove all peer cache entries matching a node ID and/or uname
  *
  * \param[in] id    ID of node to remove (or 0 to ignore)
  * \param[in] name  Uname of node to remove (or NULL to ignore)
  *
  * \return Number of cache entries removed
  *
  * \note The caller must be careful not to use \p name after calling this
  *       function if it might be a pointer into the cache entry being removed.
  */
 guint
 reap_crm_member(uint32_t id, const char *name)
 {
     int matches = 0;
     crm_node_t search = { 0, };
 
     if (crm_peer_cache == NULL) {
         crm_trace("Membership cache not initialized, ignoring purge request");
         return 0;
     }
 
     search.id = id;
     search.uname = pcmk__str_copy(name);
     matches = g_hash_table_foreach_remove(crm_peer_cache, crm_reap_dead_member, &search);
     if(matches) {
         crm_notice("Purged %d peer%s with " PCMK_XA_ID
                    "=%u%s%s from the membership cache",
                    matches, pcmk__plural_s(matches), search.id,
                    (search.uname? " and/or uname=" : ""),
                    (search.uname? search.uname : ""));
 
     } else {
         crm_info("No peers with " PCMK_XA_ID
                  "=%u%s%s to purge from the membership cache",
                  search.id, (search.uname? " and/or uname=" : ""),
                  (search.uname? search.uname : ""));
     }
 
     free(search.uname);
     return matches;
 }
 
 static void
 count_peer(gpointer key, gpointer value, gpointer user_data)
 {
     guint *count = user_data;
     crm_node_t *node = value;
 
     if (crm_is_peer_active(node)) {
         *count = *count + 1;
     }
 }
 
 guint
 crm_active_peers(void)
 {
     guint count = 0;
 
     if (crm_peer_cache) {
         g_hash_table_foreach(crm_peer_cache, count_peer, &count);
     }
     return count;
 }
 
 static void
 destroy_crm_node(gpointer data)
 {
     crm_node_t *node = data;
 
     crm_trace("Destroying entry for node %u: %s", node->id, node->uname);
 
     free(node->uname);
     free(node->state);
     free(node->uuid);
     free(node->expected);
     free(node->conn_host);
     free(node);
 }
 
 void
 crm_peer_init(void)
 {
     if (crm_peer_cache == NULL) {
         crm_peer_cache = pcmk__strikey_table(free, destroy_crm_node);
     }
 
     if (crm_remote_peer_cache == NULL) {
         crm_remote_peer_cache = pcmk__strikey_table(NULL, destroy_crm_node);
     }
 
     if (known_node_cache == NULL) {
         known_node_cache = pcmk__strikey_table(free, destroy_crm_node);
     }
 }
 
 void
 crm_peer_destroy(void)
 {
     if (crm_peer_cache != NULL) {
         crm_trace("Destroying peer cache with %d members", g_hash_table_size(crm_peer_cache));
         g_hash_table_destroy(crm_peer_cache);
         crm_peer_cache = NULL;
     }
 
     if (crm_remote_peer_cache != NULL) {
         crm_trace("Destroying remote peer cache with %d members",
                   pcmk__cluster_num_remote_nodes());
         g_hash_table_destroy(crm_remote_peer_cache);
         crm_remote_peer_cache = NULL;
     }
 
     if (known_node_cache != NULL) {
         crm_trace("Destroying known node cache with %d members",
                   g_hash_table_size(known_node_cache));
         g_hash_table_destroy(known_node_cache);
         known_node_cache = NULL;
     }
 
 }
 
 static void (*peer_status_callback)(enum crm_status_type, crm_node_t *,
                                     const void *) = NULL;
 
 /*!
  * \brief Set a client function that will be called after peer status changes
  *
  * \param[in] dispatch  Pointer to function to use as callback
  *
  * \note Previously, client callbacks were responsible for peer cache
  *       management. This is no longer the case, and client callbacks should do
  *       only client-specific handling. Callbacks MUST NOT add or remove entries
  *       in the peer caches.
  */
 void
 crm_set_status_callback(void (*dispatch) (enum crm_status_type, crm_node_t *, const void *))
 {
     peer_status_callback = dispatch;
 }
 
 /*!
  * \brief Tell the library whether to automatically reap lost nodes
  *
  * If TRUE (the default), calling crm_update_peer_proc() will also update the
  * peer state to CRM_NODE_MEMBER or CRM_NODE_LOST, and pcmk__update_peer_state()
  * will reap peers whose state changes to anything other than CRM_NODE_MEMBER.
  * Callers should leave this enabled unless they plan to manage the cache
  * separately on their own.
  *
  * \param[in] autoreap  TRUE to enable automatic reaping, FALSE to disable
  */
 void
 crm_set_autoreap(gboolean autoreap)
 {
     crm_autoreap = autoreap;
 }
 
 static void
 dump_peer_hash(int level, const char *caller)
 {
     GHashTableIter iter;
     const char *id = NULL;
     crm_node_t *node = NULL;
 
     g_hash_table_iter_init(&iter, crm_peer_cache);
     while (g_hash_table_iter_next(&iter, (gpointer *) &id, (gpointer *) &node)) {
         do_crm_log(level, "%s: Node %u/%s = %p - %s", caller, node->id, node->uname, node, id);
     }
 }
 
 static gboolean
 hash_find_by_data(gpointer key, gpointer value, gpointer user_data)
 {
     return value == user_data;
 }
 
 /*!
  * \internal
  * \brief Search caches for a node (cluster or Pacemaker Remote)
  *
  * \param[in] id     If not 0, cluster node ID to search for
  * \param[in] uname  If not NULL, node name to search for
  * \param[in] flags  Group of enum pcmk__node_search_flags
  *
  * \return Node cache entry if found, otherwise NULL
  */
 crm_node_t *
 pcmk__search_node_caches(unsigned int id, const char *uname, uint32_t flags)
 {
     crm_node_t *node = NULL;
 
     CRM_ASSERT(id > 0 || uname != NULL);
 
     crm_peer_init();
 
     if ((uname != NULL) && pcmk_is_set(flags, pcmk__node_search_remote)) {
         node = g_hash_table_lookup(crm_remote_peer_cache, uname);
     }
 
     if ((node == NULL) && pcmk_is_set(flags, pcmk__node_search_cluster)) {
         node = pcmk__search_cluster_node_cache(id, uname, NULL);
     }
 
     if ((node == NULL) && pcmk_is_set(flags, pcmk__node_search_known)) {
         char *id_str = (id == 0)? NULL : crm_strdup_printf("%u", id);
 
         node = find_known_node(id_str, uname);
         free(id_str);
     }
 
     return node;
 }
 
 /*!
  * \internal
  * \brief Purge a node from cache (both cluster and Pacemaker Remote)
  *
  * \param[in] node_name  If not NULL, purge only nodes with this name
  * \param[in] node_id    If not 0, purge cluster nodes only if they have this ID
  *
  * \note If \p node_name is NULL and \p node_id is 0, no nodes will be purged.
  *       If \p node_name is not NULL and \p node_id is not 0, Pacemaker Remote
  *       nodes that match \p node_name will be purged, and cluster nodes that
  *       match both \p node_name and \p node_id will be purged.
  * \note The caller must be careful not to use \p node_name after calling this
  *       function if it might be a pointer into a cache entry being removed.
  */
 void
 pcmk__purge_node_from_cache(const char *node_name, uint32_t node_id)
 {
     char *node_name_copy = NULL;
 
     if ((node_name == NULL) && (node_id == 0U)) {
         return;
     }
 
     // Purge from Pacemaker Remote node cache
     if ((node_name != NULL)
         && (g_hash_table_lookup(crm_remote_peer_cache, node_name) != NULL)) {
         /* node_name could be a pointer into the cache entry being purged,
          * so reassign it to a copy before the original gets freed
          */
         node_name_copy = pcmk__str_copy(node_name);
         node_name = node_name_copy;
 
         crm_trace("Purging %s from Pacemaker Remote node cache", node_name);
         g_hash_table_remove(crm_remote_peer_cache, node_name);
     }
 
     reap_crm_member(node_id, node_name);
     free(node_name_copy);
 }
 
 /*!
  * \internal
  * \brief Search cluster node cache
  *
  * \param[in] id     If not 0, cluster node ID to search for
  * \param[in] uname  If not NULL, node name to search for
  * \param[in] uuid   If not NULL while id is 0, node UUID instead of cluster
  *                   node ID to search for
  *
  * \return Cluster node cache entry if found, otherwise NULL
  */
 crm_node_t *
 pcmk__search_cluster_node_cache(unsigned int id, const char *uname,
                                 const char *uuid)
 {
     GHashTableIter iter;
     crm_node_t *node = NULL;
     crm_node_t *by_id = NULL;
     crm_node_t *by_name = NULL;
 
     CRM_ASSERT(id > 0 || uname != NULL);
 
     crm_peer_init();
 
     if (uname != NULL) {
         g_hash_table_iter_init(&iter, crm_peer_cache);
         while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
             if(node->uname && strcasecmp(node->uname, uname) == 0) {
                 crm_trace("Name match: %s = %p", node->uname, node);
                 by_name = node;
                 break;
             }
         }
     }
 
     if (id > 0) {
         g_hash_table_iter_init(&iter, crm_peer_cache);
         while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
             if(node->id == id) {
                 crm_trace("ID match: %u = %p", node->id, node);
                 by_id = node;
                 break;
             }
         }
 
     } else if (uuid != NULL) {
         g_hash_table_iter_init(&iter, crm_peer_cache);
         while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
             if (pcmk__str_eq(node->uuid, uuid, pcmk__str_casei)) {
                 crm_trace("UUID match: %s = %p", node->uuid, node);
                 by_id = node;
                 break;
             }
         }
     }
 
     node = by_id; /* Good default */
     if(by_id == by_name) {
         /* Nothing to do if they match (both NULL counts) */
         crm_trace("Consistent: %p for %u/%s", by_id, id, uname);
 
     } else if(by_id == NULL && by_name) {
         crm_trace("Only one: %p for %u/%s", by_name, id, uname);
 
         if(id && by_name->id) {
             dump_peer_hash(LOG_WARNING, __func__);
             crm_crit("Node %u and %u share the same name '%s'",
                      id, by_name->id, uname);
             node = NULL; /* Create a new one */
 
         } else {
             node = by_name;
         }
 
     } else if(by_name == NULL && by_id) {
         crm_trace("Only one: %p for %u/%s", by_id, id, uname);
 
         if(uname && by_id->uname) {
             dump_peer_hash(LOG_WARNING, __func__);
             crm_crit("Node '%s' and '%s' share the same cluster nodeid %u: assuming '%s' is correct",
                      uname, by_id->uname, id, uname);
         }
 
     } else if(uname && by_id->uname) {
         if(pcmk__str_eq(uname, by_id->uname, pcmk__str_casei)) {
             crm_notice("Node '%s' has changed its ID from %u to %u", by_id->uname, by_name->id, by_id->id);
             g_hash_table_foreach_remove(crm_peer_cache, hash_find_by_data, by_name);
 
         } else {
             crm_warn("Node '%s' and '%s' share the same cluster nodeid: %u %s", by_id->uname, by_name->uname, id, uname);
             dump_peer_hash(LOG_INFO, __func__);
             crm_abort(__FILE__, __func__, __LINE__, "member weirdness", TRUE,
                       TRUE);
         }
 
     } else if(id && by_name->id) {
         crm_warn("Node %u and %u share the same name: '%s'", by_id->id, by_name->id, uname);
 
     } else {
         /* Simple merge */
 
         /* Only corosync-based clusters use node IDs. The functions that call
          * pcmk__update_peer_state() and crm_update_peer_proc() only know
          * nodeid, so 'by_id' is authoritative when merging.
          */
         dump_peer_hash(LOG_DEBUG, __func__);
 
         crm_info("Merging %p into %p", by_name, by_id);
         g_hash_table_foreach_remove(crm_peer_cache, hash_find_by_data, by_name);
     }
 
     return node;
 }
 
 #if SUPPORT_COROSYNC
 static guint
 remove_conflicting_peer(crm_node_t *node)
 {
     int matches = 0;
     GHashTableIter iter;
     crm_node_t *existing_node = NULL;
 
     if (node->id == 0 || node->uname == NULL) {
         return 0;
     }
 
     if (!pcmk__corosync_has_nodelist()) {
         return 0;
     }
 
     g_hash_table_iter_init(&iter, crm_peer_cache);
     while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &existing_node)) {
         if (existing_node->id > 0
             && existing_node->id != node->id
             && existing_node->uname != NULL
             && strcasecmp(existing_node->uname, node->uname) == 0) {
 
             if (crm_is_peer_active(existing_node)) {
                 continue;
             }
 
             crm_warn("Removing cached offline node %u/%s which has conflicting uname with %u",
                      existing_node->id, existing_node->uname, node->id);
 
             g_hash_table_iter_remove(&iter);
             matches++;
         }
     }
 
     return matches;
 }
 #endif
 
 /*!
  * \brief Get a cluster node cache entry
  *
  * \param[in] id     If not 0, cluster node ID to search for
  * \param[in] uname  If not NULL, node name to search for
  * \param[in] uuid   If not NULL while id is 0, node UUID instead of cluster
  *                   node ID to search for
  * \param[in] flags  Group of enum pcmk__node_search_flags
  *
  * \return (Possibly newly created) cluster node cache entry
  */
 /* coverity[-alloc] Memory is referenced in one or both hashtables */
 crm_node_t *
 pcmk__get_node(unsigned int id, const char *uname, const char *uuid,
                uint32_t flags)
 {
     crm_node_t *node = NULL;
     char *uname_lookup = NULL;
 
     CRM_ASSERT(id > 0 || uname != NULL);
 
     crm_peer_init();
 
     // Check the Pacemaker Remote node cache first
     if (pcmk_is_set(flags, pcmk__node_search_remote)) {
         node = g_hash_table_lookup(crm_remote_peer_cache, uname);
         if (node != NULL) {
             return node;
         }
     }
 
     if (!pcmk_is_set(flags, pcmk__node_search_cluster)) {
         return NULL;
     }
 
     node = pcmk__search_cluster_node_cache(id, uname, uuid);
 
     /* if uname wasn't provided, and find_peer did not turn up a uname based on id.
      * we need to do a lookup of the node name using the id in the cluster membership. */
     if ((node == NULL || node->uname == NULL) && (uname == NULL)) { 
         uname_lookup = get_node_name(id);
     }
 
     if (uname_lookup) {
         uname = uname_lookup;
         crm_trace("Inferred a name of '%s' for node %u", uname, id);
 
         /* try to turn up the node one more time now that we know the uname. */
         if (node == NULL) {
             node = pcmk__search_cluster_node_cache(id, uname, uuid);
         }
     }
 
     if (node == NULL) {
         char *uniqueid = crm_generate_uuid();
 
         node = pcmk__assert_alloc(1, sizeof(crm_node_t));
 
         crm_info("Created entry %s/%p for node %s/%u (%d total)",
                  uniqueid, node, uname, id, 1 + g_hash_table_size(crm_peer_cache));
         g_hash_table_replace(crm_peer_cache, uniqueid, node);
     }
 
     if(id > 0 && uname && (node->id == 0 || node->uname == NULL)) {
         crm_info("Node %u is now known as %s", id, uname);
     }
 
     if(id > 0 && node->id == 0) {
         node->id = id;
     }
 
     if (uname && (node->uname == NULL)) {
         update_peer_uname(node, uname);
     }
 
     if(node->uuid == NULL) {
         if (uuid == NULL) {
             uuid = crm_peer_uuid(node);
         }
 
         if (uuid) {
             crm_info("Node %u has uuid %s", id, uuid);
 
         } else {
             crm_info("Cannot obtain a UUID for node %u/%s", id, node->uname);
         }
     }
 
     free(uname_lookup);
 
     return node;
 }
 
 /*!
  * \internal
  * \brief Update a node's uname
  *
  * \param[in,out] node   Node object to update
  * \param[in]     uname  New name to set
  *
  * \note This function should not be called within a peer cache iteration,
  *       because in some cases it can remove conflicting cache entries,
  *       which would invalidate the iterator.
  */
 static void
 update_peer_uname(crm_node_t *node, const char *uname)
 {
     CRM_CHECK(uname != NULL,
               crm_err("Bug: can't update node name without name"); return);
     CRM_CHECK(node != NULL,
               crm_err("Bug: can't update node name to %s without node", uname);
               return);
 
     if (pcmk__str_eq(uname, node->uname, pcmk__str_casei)) {
         crm_debug("Node uname '%s' did not change", uname);
         return;
     }
 
     for (const char *c = uname; *c; ++c) {
         if ((*c >= 'A') && (*c <= 'Z')) {
             crm_warn("Node names with capitals are discouraged, consider changing '%s'",
                      uname);
             break;
         }
     }
 
     pcmk__str_update(&node->uname, uname);
 
     if (peer_status_callback != NULL) {
         peer_status_callback(crm_status_uname, node, NULL);
     }
 
 #if SUPPORT_COROSYNC
     if (is_corosync_cluster() && !pcmk_is_set(node->flags, crm_remote_node)) {
         remove_conflicting_peer(node);
     }
 #endif
 }
 
 /*!
  * \internal
  * \brief Get log-friendly string equivalent of a process flag
  *
  * \param[in] proc  Process flag
  *
  * \return Log-friendly string equivalent of \p proc
  */
 static inline const char *
 proc2text(enum crm_proc_flag proc)
 {
     const char *text = "unknown";
 
     switch (proc) {
         case crm_proc_none:
             text = "none";
             break;
         case crm_proc_based:
             text = "pacemaker-based";
             break;
         case crm_proc_controld:
             text = "pacemaker-controld";
             break;
         case crm_proc_schedulerd:
             text = "pacemaker-schedulerd";
             break;
         case crm_proc_execd:
             text = "pacemaker-execd";
             break;
         case crm_proc_attrd:
             text = "pacemaker-attrd";
             break;
         case crm_proc_fenced:
             text = "pacemaker-fenced";
             break;
         case crm_proc_cpg:
             text = "corosync-cpg";
             break;
     }
     return text;
 }
 
 /*!
  * \internal
  * \brief Update a node's process information (and potentially state)
  *
  * \param[in]     source  Caller's function name (for log messages)
  * \param[in,out] node    Node object to update
  * \param[in]     flag    Bitmask of new process information
  * \param[in]     status  node status (online, offline, etc.)
  *
  * \return NULL if any node was reaped from peer caches, value of node otherwise
  *
  * \note If this function returns NULL, the supplied node object was likely
  *       freed and should not be used again. This function should not be
  *       called within a cache iteration if reaping is possible, otherwise
  *       reaping could invalidate the iterator.
  */
 crm_node_t *
 crm_update_peer_proc(const char *source, crm_node_t * node, uint32_t flag, const char *status)
 {
     uint32_t last = 0;
     gboolean changed = FALSE;
 
     CRM_CHECK(node != NULL, crm_err("%s: Could not set %s to %s for NULL",
                                     source, proc2text(flag), status);
                             return NULL);
 
     /* Pacemaker doesn't spawn processes on remote nodes */
     if (pcmk_is_set(node->flags, crm_remote_node)) {
         return node;
     }
 
     last = node->processes;
     if (status == NULL) {
         node->processes = flag;
         if (node->processes != last) {
             changed = TRUE;
         }
 
     } else if (pcmk__str_eq(status, PCMK_VALUE_ONLINE, pcmk__str_casei)) {
         if ((node->processes & flag) != flag) {
             node->processes = pcmk__set_flags_as(__func__, __LINE__,
                                                  LOG_TRACE, "Peer process",
                                                  node->uname, node->processes,
                                                  flag, "processes");
             changed = TRUE;
         }
 
     } else if (node->processes & flag) {
         node->processes = pcmk__clear_flags_as(__func__, __LINE__,
                                                LOG_TRACE, "Peer process",
                                                node->uname, node->processes,
                                                flag, "processes");
         changed = TRUE;
     }
 
     if (changed) {
         if (status == NULL && flag <= crm_proc_none) {
             crm_info("%s: Node %s[%u] - all processes are now offline", source, node->uname,
                      node->id);
         } else {
             crm_info("%s: Node %s[%u] - %s is now %s", source, node->uname, node->id,
                      proc2text(flag), status);
         }
 
         if (pcmk_is_set(node->processes, crm_get_cluster_proc())) {
             node->when_online = time(NULL);
 
         } else {
             node->when_online = 0;
         }
 
         /* Call the client callback first, then update the peer state,
          * in case the node will be reaped
          */
         if (peer_status_callback != NULL) {
             peer_status_callback(crm_status_processes, node, &last);
         }
 
         /* The client callback shouldn't touch the peer caches,
          * but as a safety net, bail if the peer cache was destroyed.
          */
         if (crm_peer_cache == NULL) {
             return NULL;
         }
 
         if (crm_autoreap) {
             const char *peer_state = NULL;
 
             if (pcmk_is_set(node->processes, crm_get_cluster_proc())) {
                 peer_state = CRM_NODE_MEMBER;
             } else {
                 peer_state = CRM_NODE_LOST;
             }
             node = pcmk__update_peer_state(__func__, node, peer_state, 0);
         }
     } else {
         crm_trace("%s: Node %s[%u] - %s is unchanged (%s)", source, node->uname, node->id,
                   proc2text(flag), status);
     }
     return node;
 }
 
 /*!
  * \internal
  * \brief Update a cluster node cache entry's expected join state
  *
  * \param[in]     source    Caller's function name (for logging)
  * \param[in,out] node      Node to update
  * \param[in]     expected  Node's new join state
  */
 void
 pcmk__update_peer_expected(const char *source, crm_node_t *node,
                            const char *expected)
 {
     char *last = NULL;
     gboolean changed = FALSE;
 
     CRM_CHECK(node != NULL, crm_err("%s: Could not set 'expected' to %s", source, expected);
               return);
 
     /* Remote nodes don't participate in joins */
     if (pcmk_is_set(node->flags, crm_remote_node)) {
         return;
     }
 
     last = node->expected;
     if (expected != NULL && !pcmk__str_eq(node->expected, expected, pcmk__str_casei)) {
         node->expected = strdup(expected);
         changed = TRUE;
     }
 
     if (changed) {
         crm_info("%s: Node %s[%u] - expected state is now %s (was %s)", source, node->uname, node->id,
                  expected, last);
         free(last);
     } else {
         crm_trace("%s: Node %s[%u] - expected state is unchanged (%s)", source, node->uname,
                   node->id, expected);
     }
 }
 
 /*!
  * \internal
  * \brief Update a node's state and membership information
  *
  * \param[in]     source      Caller's function name (for log messages)
  * \param[in,out] node        Node object to update
  * \param[in]     state       Node's new state
  * \param[in]     membership  Node's new membership ID
  * \param[in,out] iter        If not NULL, pointer to node's peer cache iterator
  *
  * \return NULL if any node was reaped, value of node otherwise
  *
  * \note If this function returns NULL, the supplied node object was likely
  *       freed and should not be used again. This function may be called from
  *       within a peer cache iteration if the iterator is supplied.
  */
 static crm_node_t *
 update_peer_state_iter(const char *source, crm_node_t *node, const char *state,
                        uint64_t membership, GHashTableIter *iter)
 {
     gboolean is_member;
 
     CRM_CHECK(node != NULL,
               crm_err("Could not set state for unknown host to %s"
                       CRM_XS " source=%s", state, source);
               return NULL);
 
     is_member = pcmk__str_eq(state, CRM_NODE_MEMBER, pcmk__str_casei);
     if (is_member) {
         node->when_lost = 0;
         if (membership) {
             node->last_seen = membership;
         }
     }
 
     if (state && !pcmk__str_eq(node->state, state, pcmk__str_casei)) {
         char *last = node->state;
 
         if (is_member) {
              node->when_member = time(NULL);
 
         } else {
              node->when_member = 0;
         }
 
         node->state = strdup(state);
         crm_notice("Node %s state is now %s " CRM_XS
                    " nodeid=%u previous=%s source=%s", node->uname, state,
                    node->id, (last? last : "unknown"), source);
         if (peer_status_callback != NULL) {
             peer_status_callback(crm_status_nstate, node, last);
         }
         free(last);
 
         if (crm_autoreap && !is_member
             && !pcmk_is_set(node->flags, crm_remote_node)) {
             /* We only autoreap from the peer cache, not the remote peer cache,
              * because the latter should be managed only by
              * refresh_remote_nodes().
              */
             if(iter) {
                 crm_notice("Purged 1 peer with " PCMK_XA_ID
                            "=%u and/or uname=%s from the membership cache",
                            node->id, node->uname);
                 g_hash_table_iter_remove(iter);
 
             } else {
                 reap_crm_member(node->id, node->uname);
             }
             node = NULL;
         }
 
     } else {
         crm_trace("Node %s state is unchanged (%s) " CRM_XS
                   " nodeid=%u source=%s", node->uname, state, node->id, source);
     }
     return node;
 }
 
 /*!
  * \brief Update a node's state and membership information
  *
  * \param[in]     source      Caller's function name (for log messages)
  * \param[in,out] node        Node object to update
  * \param[in]     state       Node's new state
  * \param[in]     membership  Node's new membership ID
  *
  * \return NULL if any node was reaped, value of node otherwise
  *
  * \note If this function returns NULL, the supplied node object was likely
  *       freed and should not be used again. This function should not be
  *       called within a cache iteration if reaping is possible,
  *       otherwise reaping could invalidate the iterator.
  */
 crm_node_t *
 pcmk__update_peer_state(const char *source, crm_node_t *node,
                         const char *state, uint64_t membership)
 {
     return update_peer_state_iter(source, node, state, membership, NULL);
 }
 
 /*!
  * \internal
  * \brief Reap all nodes from cache whose membership information does not match
  *
  * \param[in] membership  Membership ID of nodes to keep
  */
 void
 pcmk__reap_unseen_nodes(uint64_t membership)
 {
     GHashTableIter iter;
     crm_node_t *node = NULL;
 
     crm_trace("Reaping unseen nodes...");
     g_hash_table_iter_init(&iter, crm_peer_cache);
     while (g_hash_table_iter_next(&iter, NULL, (gpointer *)&node)) {
         if (node->last_seen != membership) {
             if (node->state) {
                 /*
                  * Calling update_peer_state_iter() allows us to
                  * remove the node from crm_peer_cache without
                  * invalidating our iterator
                  */
                 update_peer_state_iter(__func__, node, CRM_NODE_LOST,
                                            membership, &iter);
 
             } else {
                 crm_info("State of node %s[%u] is still unknown",
                          node->uname, node->id);
             }
         }
     }
 }
 
 static crm_node_t *
 find_known_node(const char *id, const char *uname)
 {
     GHashTableIter iter;
     crm_node_t *node = NULL;
     crm_node_t *by_id = NULL;
     crm_node_t *by_name = NULL;
 
     if (uname) {
         g_hash_table_iter_init(&iter, known_node_cache);
         while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
             if (node->uname && strcasecmp(node->uname, uname) == 0) {
                 crm_trace("Name match: %s = %p", node->uname, node);
                 by_name = node;
                 break;
             }
         }
     }
 
     if (id) {
         g_hash_table_iter_init(&iter, known_node_cache);
         while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) {
             if(strcasecmp(node->uuid, id) == 0) {
                 crm_trace("ID match: %s= %p", id, node);
                 by_id = node;
                 break;
             }
         }
     }
 
     node = by_id; /* Good default */
     if (by_id == by_name) {
         /* Nothing to do if they match (both NULL counts) */
         crm_trace("Consistent: %p for %s/%s", by_id, id, uname);
 
     } else if (by_id == NULL && by_name) {
         crm_trace("Only one: %p for %s/%s", by_name, id, uname);
 
         if (id) {
             node = NULL;
 
         } else {
             node = by_name;
         }
 
     } else if (by_name == NULL && by_id) {
         crm_trace("Only one: %p for %s/%s", by_id, id, uname);
 
         if (uname) {
             node = NULL;
         }
 
     } else if (uname && by_id->uname
                && pcmk__str_eq(uname, by_id->uname, pcmk__str_casei)) {
         /* Multiple nodes have the same uname in the CIB.
          * Return by_id. */
 
     } else if (id && by_name->uuid
                && pcmk__str_eq(id, by_name->uuid, pcmk__str_casei)) {
         /* Multiple nodes have the same id in the CIB.
          * Return by_name. */
         node = by_name;
 
     } else {
         node = NULL;
     }
 
     if (node == NULL) {
         crm_debug("Couldn't find node%s%s%s%s",
                    id? " " : "",
                    id? id : "",
                    uname? " with name " : "",
                    uname? uname : "");
     }
 
     return node;
 }
 
 static void
 known_node_cache_refresh_helper(xmlNode *xml_node, void *user_data)
 {
     const char *id = crm_element_value(xml_node, PCMK_XA_ID);
     const char *uname = crm_element_value(xml_node, PCMK_XA_UNAME);
     crm_node_t * node =  NULL;
 
     CRM_CHECK(id != NULL && uname !=NULL, return);
     node = find_known_node(id, uname);
 
     if (node == NULL) {
         char *uniqueid = crm_generate_uuid();
 
         node = pcmk__assert_alloc(1, sizeof(crm_node_t));
 
         node->uname = pcmk__str_copy(uname);
         node->uuid = pcmk__str_copy(id);
 
         g_hash_table_replace(known_node_cache, uniqueid, node);
 
     } else if (pcmk_is_set(node->flags, crm_node_dirty)) {
         pcmk__str_update(&node->uname, uname);
 
         /* Node is in cache and hasn't been updated already, so mark it clean */
         clear_peer_flags(node, crm_node_dirty);
     }
 
 }
 
 static void
 refresh_known_node_cache(xmlNode *cib)
 {
     crm_peer_init();
 
     g_hash_table_foreach(known_node_cache, mark_dirty, NULL);
 
     crm_foreach_xpath_result(cib, PCMK__XP_MEMBER_NODE_CONFIG,
                              known_node_cache_refresh_helper, NULL);
 
     /* Remove all old cache entries that weren't seen in the CIB */
     g_hash_table_foreach_remove(known_node_cache, is_dirty, NULL);
 }
 
 void
 pcmk__refresh_node_caches_from_cib(xmlNode *cib)
 {
     refresh_remote_nodes(cib);
     refresh_known_node_cache(cib);
 }
 
 // Deprecated functions kept only for backward API compatibility
 // LCOV_EXCL_START
 
 #include <crm/cluster/compat.h>
 
 int
 crm_terminate_member(int nodeid, const char *uname, void *unused)
 {
     return stonith_api_kick(nodeid, uname, 120, TRUE);
 }
 
 int
 crm_terminate_member_no_mainloop(int nodeid, const char *uname, int *connection)
 {
     return stonith_api_kick(nodeid, uname, 120, TRUE);
 }
 
 crm_node_t *
 crm_get_peer(unsigned int id, const char *uname)
 {
     return pcmk__get_node(id, uname, NULL, pcmk__node_search_cluster);
 }
 
 crm_node_t *
 crm_get_peer_full(unsigned int id, const char *uname, int flags)
 {
     return pcmk__get_node(id, uname, NULL, flags);
 }
 
 int
 crm_remote_peer_cache_size(void)
 {
     unsigned int count = pcmk__cluster_num_remote_nodes();
 
     return QB_MIN(count, INT_MAX);
 }
 
 void
 crm_remote_peer_cache_refresh(xmlNode *cib)
 {
     refresh_remote_nodes(cib);
 }
 
 // LCOV_EXCL_STOP
 // End deprecated API