diff --git a/daemons/attrd/attrd_commands.c b/daemons/attrd/attrd_commands.c
index 06ed14eb30..6bd3b81578 100644
--- a/daemons/attrd/attrd_commands.c
+++ b/daemons/attrd/attrd_commands.c
@@ -1,1390 +1,1391 @@
 /*
  * Copyright 2013-2022 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU General Public License version 2
  * or later (GPLv2+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 
 #include <sys/types.h>
 #include <regex.h>
 #include <glib.h>
 
 #include <crm/msg_xml.h>
 #include <crm/cluster.h>
 #include <crm/cib.h>
 #include <crm/common/xml_internal.h>
 #include <crm/cluster/internal.h>
 #include <crm/cluster/election_internal.h>
 #include <crm/cib/internal.h>
 
 #include "pacemaker-attrd.h"
 
 /*
  * Legacy attrd (all pre-1.1.11 Pacemaker versions, plus all versions when used
  * with the no-longer-supported CMAN or corosync-plugin stacks) is unversioned.
  *
  * With atomic attrd, each attrd will send ATTRD_PROTOCOL_VERSION with every
  * peer request and reply. As of Pacemaker 2.0.0, at start-up each attrd will
  * also set a private attribute for itself with its version, so any attrd can
  * determine the minimum version supported by all peers.
  *
  * Protocol  Pacemaker  Significant changes
  * --------  ---------  -------------------
  *     1       1.1.11   PCMK__ATTRD_CMD_UPDATE (PCMK__XA_ATTR_NAME only),
  *                      PCMK__ATTRD_CMD_PEER_REMOVE, PCMK__ATTRD_CMD_REFRESH,
  *                      PCMK__ATTRD_CMD_FLUSH, PCMK__ATTRD_CMD_SYNC,
  *                      PCMK__ATTRD_CMD_SYNC_RESPONSE
  *     1       1.1.13   PCMK__ATTRD_CMD_UPDATE (with PCMK__XA_ATTR_PATTERN),
  *                      PCMK__ATTRD_CMD_QUERY
  *     1       1.1.15   PCMK__ATTRD_CMD_UPDATE_BOTH,
  *                      PCMK__ATTRD_CMD_UPDATE_DELAY
  *     2       1.1.17   PCMK__ATTRD_CMD_CLEAR_FAILURE
  *     3       2.1.1    PCMK__ATTRD_CMD_SYNC_RESPONSE indicates remote nodes
  */
 #define ATTRD_PROTOCOL_VERSION "3"
 
 int last_cib_op_done = 0;
 GHashTable *attributes = NULL;
 
 void write_attribute(attribute_t *a, bool ignore_delay);
 void write_or_elect_attribute(attribute_t *a);
 void attrd_peer_update(crm_node_t *peer, xmlNode *xml, const char *host, bool filter);
 void attrd_peer_sync(crm_node_t *peer, xmlNode *xml);
 void attrd_peer_remove(const char *host, gboolean uncache, const char *source);
 
 static void broadcast_unseen_local_values(crm_node_t *peer, xmlNode *xml);
 
 static gboolean
 send_attrd_message(crm_node_t * node, xmlNode * data)
 {
     crm_xml_add(data, F_TYPE, T_ATTRD);
     crm_xml_add(data, PCMK__XA_ATTR_VERSION, ATTRD_PROTOCOL_VERSION);
     attrd_xml_add_writer(data);
     return send_cluster_message(node, crm_msg_attrd, data, TRUE);
 }
 
 static gboolean
 attribute_timer_cb(gpointer data)
 {
     attribute_t *a = data;
     crm_trace("Dampen interval expired for %s", a->id);
     write_or_elect_attribute(a);
     return FALSE;
 }
 
 static void
 free_attribute_value(gpointer data)
 {
     attribute_value_t *v = data;
 
     free(v->nodename);
     free(v->current);
     free(v->requested);
     free(v);
 }
 
 void
 free_attribute(gpointer data)
 {
     attribute_t *a = data;
     if(a) {
         free(a->id);
         free(a->set);
         free(a->uuid);
         free(a->user);
 
         mainloop_timer_del(a->timer);
         g_hash_table_destroy(a->values);
 
         free(a);
     }
 }
 
 /*!
  * \internal
  * \brief Ensure a Pacemaker Remote node is in the correct peer cache
  *
  * \param[in]
  */
 static void
 cache_remote_node(const char *node_name)
 {
     /* If we previously assumed this node was an unseen cluster node,
      * remove its entry from the cluster peer cache.
      */
     crm_node_t *dup = pcmk__search_cluster_node_cache(0, node_name);
 
     if (dup && (dup->uuid == NULL)) {
         reap_crm_member(0, node_name);
     }
 
     // Ensure node is in the remote peer cache
     CRM_ASSERT(crm_remote_peer_get(node_name) != NULL);
 }
 
 /*!
  * \internal
  * \brief Create an XML representation of an attribute for use in peer messages
  *
  * \param[in] parent       Create attribute XML as child element of this element
  * \param[in] a            Attribute to represent
  * \param[in] v            Attribute value to represent
  * \param[in] force_write  If true, value should be written even if unchanged
  *
  * \return XML representation of attribute
  */
 static xmlNode *
 add_attribute_value_xml(xmlNode *parent, attribute_t *a, attribute_value_t *v,
                         bool force_write)
 {
     xmlNode *xml = create_xml_node(parent, __func__);
 
     crm_xml_add(xml, PCMK__XA_ATTR_NAME, a->id);
     crm_xml_add(xml, PCMK__XA_ATTR_SET, a->set);
     crm_xml_add(xml, PCMK__XA_ATTR_UUID, a->uuid);
     crm_xml_add(xml, PCMK__XA_ATTR_USER, a->user);
     crm_xml_add(xml, PCMK__XA_ATTR_NODE_NAME, v->nodename);
     if (v->nodeid > 0) {
         crm_xml_add_int(xml, PCMK__XA_ATTR_NODE_ID, v->nodeid);
     }
     if (v->is_remote != 0) {
         crm_xml_add_int(xml, PCMK__XA_ATTR_IS_REMOTE, 1);
     }
     crm_xml_add(xml, PCMK__XA_ATTR_VALUE, v->current);
     crm_xml_add_int(xml, PCMK__XA_ATTR_DAMPENING, a->timeout_ms / 1000);
     crm_xml_add_int(xml, PCMK__XA_ATTR_IS_PRIVATE, a->is_private);
     crm_xml_add_int(xml, PCMK__XA_ATTR_FORCE, force_write);
 
     return xml;
 }
 
 static void
 clear_attribute_value_seen(void)
 {
     GHashTableIter aIter;
     GHashTableIter vIter;
     attribute_t *a;
     attribute_value_t *v = NULL;
 
     g_hash_table_iter_init(&aIter, attributes);
     while (g_hash_table_iter_next(&aIter, NULL, (gpointer *) & a)) {
         g_hash_table_iter_init(&vIter, a->values);
         while (g_hash_table_iter_next(&vIter, NULL, (gpointer *) & v)) {
             v->seen = FALSE;
             crm_trace("Clear seen flag %s[%s] = %s.", a->id, v->nodename, v->current);
         }
     }
 }
 
 static attribute_t *
 create_attribute(xmlNode *xml)
 {
     int dampen = 0;
     const char *value = crm_element_value(xml, PCMK__XA_ATTR_DAMPENING);
     attribute_t *a = calloc(1, sizeof(attribute_t));
 
     a->id      = crm_element_value_copy(xml, PCMK__XA_ATTR_NAME);
     a->set     = crm_element_value_copy(xml, PCMK__XA_ATTR_SET);
     a->uuid    = crm_element_value_copy(xml, PCMK__XA_ATTR_UUID);
     a->values = pcmk__strikey_table(NULL, free_attribute_value);
 
     crm_element_value_int(xml, PCMK__XA_ATTR_IS_PRIVATE, &a->is_private);
 
     a->user = crm_element_value_copy(xml, PCMK__XA_ATTR_USER);
     crm_trace("Performing all %s operations as user '%s'", a->id, a->user);
 
     if(value) {
         dampen = crm_get_msec(value);
         crm_trace("Created attribute %s with delay %dms (%s)", a->id, dampen, value);
     } else {
         crm_trace("Created attribute %s with no delay", a->id);
     }
 
     if(dampen > 0) {
         a->timeout_ms = dampen;
         a->timer = mainloop_timer_add(a->id, a->timeout_ms, FALSE, attribute_timer_cb, a);
     } else if (dampen < 0) {
         crm_warn("Ignoring invalid delay %s for attribute %s", value, a->id);
     }
 
     g_hash_table_replace(attributes, a->id, a);
     return a;
 }
 
 /*!
  * \internal
  * \brief Respond to a client peer-remove request (i.e. propagate to all peers)
  *
  * \param[in] client_name Name of client that made request (for log messages)
  * \param[in] xml         Root of request XML
  *
  * \return void
  */
 void
 attrd_client_peer_remove(pcmk__client_t *client, xmlNode *xml)
 {
     // Host and ID are not used in combination, rather host has precedence
     const char *host = crm_element_value(xml, PCMK__XA_ATTR_NODE_NAME);
     char *host_alloc = NULL;
 
     if (host == NULL) {
         int nodeid = 0;
 
         crm_element_value_int(xml, PCMK__XA_ATTR_NODE_ID, &nodeid);
         if (nodeid > 0) {
             crm_node_t *node = pcmk__search_cluster_node_cache(nodeid, NULL);
             char *host_alloc = NULL;
 
             if (node && node->uname) {
                 // Use cached name if available
                 host = node->uname;
             } else {
                 // Otherwise ask cluster layer
                 host_alloc = get_node_name(nodeid);
                 host = host_alloc;
             }
             crm_xml_add(xml, PCMK__XA_ATTR_NODE_NAME, host);
         }
     }
 
     if (host) {
         crm_info("Client %s is requesting all values for %s be removed",
                  pcmk__client_name(client), host);
         send_attrd_message(NULL, xml); /* ends up at attrd_peer_message() */
         free(host_alloc);
     } else {
         crm_info("Ignoring request by client %s to remove all peer values without specifying peer",
                  pcmk__client_name(client));
     }
 }
 
 /*!
  * \internal
  * \brief Respond to a client update request
  *
  * \param[in] xml         Root of request XML
  *
  * \return void
  */
 void
 attrd_client_update(xmlNode *xml)
 {
     attribute_t *a = NULL;
     char *host = crm_element_value_copy(xml, PCMK__XA_ATTR_NODE_NAME);
     const char *attr = crm_element_value(xml, PCMK__XA_ATTR_NAME);
     const char *value = crm_element_value(xml, PCMK__XA_ATTR_VALUE);
     const char *regex = crm_element_value(xml, PCMK__XA_ATTR_PATTERN);
 
     /* If a regex was specified, broadcast a message for each match */
     if ((attr == NULL) && regex) {
         GHashTableIter aIter;
         regex_t *r_patt = calloc(1, sizeof(regex_t));
 
         crm_debug("Setting %s to %s", regex, value);
         if (regcomp(r_patt, regex, REG_EXTENDED|REG_NOSUB)) {
             crm_err("Bad regex '%s' for update", regex);
 
         } else {
             g_hash_table_iter_init(&aIter, attributes);
             while (g_hash_table_iter_next(&aIter, (gpointer *) & attr, NULL)) {
                 int status = regexec(r_patt, attr, 0, NULL, 0);
 
                 if (status == 0) {
                     crm_trace("Matched %s with %s", attr, regex);
                     crm_xml_add(xml, PCMK__XA_ATTR_NAME, attr);
                     send_attrd_message(NULL, xml);
                 }
             }
         }
 
         free(host);
         regfree(r_patt);
         free(r_patt);
         return;
 
     } else if (attr == NULL) {
         crm_err("Update request did not specify attribute or regular expression");
         free(host);
         return;
     }
 
     if (host == NULL) {
         crm_trace("Inferring host");
         host = strdup(attrd_cluster->uname);
         crm_xml_add(xml, PCMK__XA_ATTR_NODE_NAME, host);
         crm_xml_add_int(xml, PCMK__XA_ATTR_NODE_ID, attrd_cluster->nodeid);
     }
 
     a = g_hash_table_lookup(attributes, attr);
 
     /* If value was specified using ++ or += notation, expand to real value */
     if (value) {
         if (attrd_value_needs_expansion(value)) {
             int int_value;
             attribute_value_t *v = NULL;
 
             if (a) {
                 v = g_hash_table_lookup(a->values, host);
             }
             int_value = attrd_expand_value(value, (v? v->current : NULL));
 
             crm_info("Expanded %s=%s to %d", attr, value, int_value);
             crm_xml_add_int(xml, PCMK__XA_ATTR_VALUE, int_value);
 
             /* Replacing the value frees the previous memory, so re-query it */
             value = crm_element_value(xml, PCMK__XA_ATTR_VALUE);
         }
     }
 
     crm_debug("Broadcasting %s[%s]=%s%s", attr, host, value,
               (attrd_election_won()? " (writer)" : ""));
 
     free(host);
 
     send_attrd_message(NULL, xml); /* ends up at attrd_peer_message() */
 }
 
 /*!
  * \internal
  * \brief Respond to client clear-failure request
  *
  * \param[in] xml         Request XML
  */
 void
 attrd_client_clear_failure(xmlNode *xml)
 {
 #if 0
     /* @TODO Track the minimum supported protocol version across all nodes,
      * then enable this more-efficient code.
      */
     if (compare_version("2", minimum_protocol_version) <= 0) {
         /* Propagate to all peers (including ourselves).
          * This ends up at attrd_peer_message().
          */
         send_attrd_message(NULL, xml);
         return;
     }
 #endif
 
     const char *rsc = crm_element_value(xml, PCMK__XA_ATTR_RESOURCE);
     const char *op = crm_element_value(xml, PCMK__XA_ATTR_OPERATION);
     const char *interval_spec = crm_element_value(xml, PCMK__XA_ATTR_INTERVAL);
 
     /* Map this to an update */
     crm_xml_add(xml, PCMK__XA_TASK, PCMK__ATTRD_CMD_UPDATE);
 
     /* Add regular expression matching desired attributes */
 
     if (rsc) {
         char *pattern;
 
         if (op == NULL) {
             pattern = crm_strdup_printf(ATTRD_RE_CLEAR_ONE, rsc);
 
         } else {
             guint interval_ms = crm_parse_interval_spec(interval_spec);
 
             pattern = crm_strdup_printf(ATTRD_RE_CLEAR_OP,
                                         rsc, op, interval_ms);
         }
 
         crm_xml_add(xml, PCMK__XA_ATTR_PATTERN, pattern);
         free(pattern);
 
     } else {
         crm_xml_add(xml, PCMK__XA_ATTR_PATTERN, ATTRD_RE_CLEAR_ALL);
     }
 
     /* Make sure attribute and value are not set, so we delete via regex */
     if (crm_element_value(xml, PCMK__XA_ATTR_NAME)) {
         crm_xml_replace(xml, PCMK__XA_ATTR_NAME, NULL);
     }
     if (crm_element_value(xml, PCMK__XA_ATTR_VALUE)) {
         crm_xml_replace(xml, PCMK__XA_ATTR_VALUE, NULL);
     }
 
     attrd_client_update(xml);
 }
 
 /*!
  * \internal
  * \brief Respond to a client refresh request (i.e. write out all attributes)
  *
  * \return void
  */
 void
 attrd_client_refresh(void)
 {
     crm_info("Updating all attributes");
     write_attributes(TRUE, TRUE);
 }
 
 /*!
  * \internal
  * \brief Build the XML reply to a client query
  *
  * param[in] attr Name of requested attribute
  * param[in] host Name of requested host (or NULL for all hosts)
  *
  * \return New XML reply
  * \note Caller is responsible for freeing the resulting XML
  */
 static xmlNode *build_query_reply(const char *attr, const char *host)
 {
     xmlNode *reply = create_xml_node(NULL, __func__);
     attribute_t *a;
 
     if (reply == NULL) {
         return NULL;
     }
     crm_xml_add(reply, F_TYPE, T_ATTRD);
+    crm_xml_add(reply, F_SUBTYPE, PCMK__ATTRD_CMD_QUERY);
     crm_xml_add(reply, PCMK__XA_ATTR_VERSION, ATTRD_PROTOCOL_VERSION);
 
     /* If desired attribute exists, add its value(s) to the reply */
     a = g_hash_table_lookup(attributes, attr);
     if (a) {
         attribute_value_t *v;
         xmlNode *host_value;
 
         crm_xml_add(reply, PCMK__XA_ATTR_NAME, attr);
 
         /* Allow caller to use "localhost" to refer to local node */
         if (pcmk__str_eq(host, "localhost", pcmk__str_casei)) {
             host = attrd_cluster->uname;
             crm_trace("Mapped localhost to %s", host);
         }
 
         /* If a specific node was requested, add its value */
         if (host) {
             v = g_hash_table_lookup(a->values, host);
             host_value = create_xml_node(reply, XML_CIB_TAG_NODE);
             if (host_value == NULL) {
                 free_xml(reply);
                 return NULL;
             }
             crm_xml_add(host_value, PCMK__XA_ATTR_NODE_NAME, host);
             crm_xml_add(host_value, PCMK__XA_ATTR_VALUE,
                         (v? v->current : NULL));
 
         /* Otherwise, add all nodes' values */
         } else {
             GHashTableIter iter;
 
             g_hash_table_iter_init(&iter, a->values);
             while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &v)) {
                 host_value = create_xml_node(reply, XML_CIB_TAG_NODE);
                 if (host_value == NULL) {
                     free_xml(reply);
                     return NULL;
                 }
                 crm_xml_add(host_value, PCMK__XA_ATTR_NODE_NAME, v->nodename);
                 crm_xml_add(host_value, PCMK__XA_ATTR_VALUE, v->current);
             }
         }
     }
     return reply;
 }
 
 /*!
  * \internal
  * \brief Respond to a client query
  *
  * \param[in] client Who queried us
  * \param[in] query  Root of query XML
  *
  * \return void
  */
 void
 attrd_client_query(pcmk__client_t *client, uint32_t id, uint32_t flags,
                    xmlNode *query)
 {
     const char *attr;
     const char *origin = crm_element_value(query, F_ORIG);
     xmlNode *reply;
 
     if (origin == NULL) {
         origin = "unknown client";
     }
     crm_debug("Query arrived from %s", origin);
 
     /* Request must specify attribute name to query */
     attr = crm_element_value(query, PCMK__XA_ATTR_NAME);
     if (attr == NULL) {
         crm_warn("Ignoring malformed query from %s (no attribute name given)",
                  origin);
         return;
     }
 
     /* Build the XML reply */
     reply = build_query_reply(attr, crm_element_value(query,
                                                       PCMK__XA_ATTR_NODE_NAME));
     if (reply == NULL) {
         crm_err("Could not respond to query from %s: could not create XML reply",
                  origin);
         return;
     }
     crm_log_xml_trace(reply, "Reply");
 
     /* Send the reply to the client */
     client->request_id = 0;
     {
         int rc = pcmk__ipc_send_xml(client, id, reply, flags);
 
         if (rc != pcmk_rc_ok) {
             crm_err("Could not respond to query from %s: %s " CRM_XS " rc=%d",
                     origin, pcmk_rc_str(rc), rc);
         }
     }
     free_xml(reply);
 }
 
 /*!
  * \internal
  * \brief Clear failure-related attributes
  *
  * \param[in] peer  Peer that sent clear request
  * \param[in] xml   Request XML
  */
 static void
 attrd_peer_clear_failure(crm_node_t *peer, xmlNode *xml)
 {
     const char *rsc = crm_element_value(xml, PCMK__XA_ATTR_RESOURCE);
     const char *host = crm_element_value(xml, PCMK__XA_ATTR_NODE_NAME);
     const char *op = crm_element_value(xml, PCMK__XA_ATTR_OPERATION);
     const char *interval_spec = crm_element_value(xml, PCMK__XA_ATTR_INTERVAL);
     guint interval_ms = crm_parse_interval_spec(interval_spec);
     char *attr = NULL;
     GHashTableIter iter;
     regex_t regex;
 
     if (attrd_failure_regex(&regex, rsc, op, interval_ms) != pcmk_ok) {
         crm_info("Ignoring invalid request to clear failures for %s",
                  (rsc? rsc : "all resources"));
         return;
     }
 
     crm_xml_add(xml, PCMK__XA_TASK, PCMK__ATTRD_CMD_UPDATE);
 
     /* Make sure value is not set, so we delete */
     if (crm_element_value(xml, PCMK__XA_ATTR_VALUE)) {
         crm_xml_replace(xml, PCMK__XA_ATTR_VALUE, NULL);
     }
 
     g_hash_table_iter_init(&iter, attributes);
     while (g_hash_table_iter_next(&iter, (gpointer *) &attr, NULL)) {
         if (regexec(&regex, attr, 0, NULL, 0) == 0) {
             crm_trace("Matched %s when clearing %s",
                       attr, (rsc? rsc : "all resources"));
             crm_xml_add(xml, PCMK__XA_ATTR_NAME, attr);
             attrd_peer_update(peer, xml, host, FALSE);
         }
     }
     regfree(&regex);
 }
 
 /*!
  * \internal
  * \brief Load attributes from a peer sync response
  *
  * \param[in] peer      Peer that sent clear request
  * \param[in] peer_won  Whether peer is the attribute writer
  * \param[in] xml       Request XML
  */
 static void
 process_peer_sync_response(crm_node_t *peer, bool peer_won, xmlNode *xml)
 {
     crm_info("Processing " PCMK__ATTRD_CMD_SYNC_RESPONSE " from %s",
              peer->uname);
 
     if (peer_won) {
         /* Initialize the "seen" flag for all attributes to cleared, so we can
          * detect attributes that local node has but the writer doesn't.
          */
         clear_attribute_value_seen();
     }
 
     // Process each attribute update in the sync response
     for (xmlNode *child = pcmk__xml_first_child(xml); child != NULL;
          child = pcmk__xml_next(child)) {
         attrd_peer_update(peer, child,
                           crm_element_value(child, PCMK__XA_ATTR_NODE_NAME),
                           TRUE);
     }
 
     if (peer_won) {
         /* If any attributes are still not marked as seen, the writer doesn't
          * know about them, so send all peers an update with them.
          */
         broadcast_unseen_local_values(peer, xml);
     }
 }
 
 /*!
     \internal
     \brief Broadcast private attribute for local node with protocol version
 */
 void
 attrd_broadcast_protocol()
 {
     xmlNode *attrd_op = create_xml_node(NULL, __func__);
 
     crm_xml_add(attrd_op, F_TYPE, T_ATTRD);
     crm_xml_add(attrd_op, F_ORIG, crm_system_name);
     crm_xml_add(attrd_op, PCMK__XA_TASK, PCMK__ATTRD_CMD_UPDATE);
     crm_xml_add(attrd_op, PCMK__XA_ATTR_NAME, CRM_ATTR_PROTOCOL);
     crm_xml_add(attrd_op, PCMK__XA_ATTR_VALUE, ATTRD_PROTOCOL_VERSION);
     crm_xml_add_int(attrd_op, PCMK__XA_ATTR_IS_PRIVATE, 1);
     attrd_client_update(attrd_op);
     free_xml(attrd_op);
 }
 
 void
 attrd_peer_message(crm_node_t *peer, xmlNode *xml)
 {
     const char *op = crm_element_value(xml, PCMK__XA_TASK);
     const char *election_op = crm_element_value(xml, F_CRM_TASK);
     const char *host = crm_element_value(xml, PCMK__XA_ATTR_NODE_NAME);
     bool peer_won = false;
 
     if (election_op) {
         attrd_handle_election_op(peer, xml);
         return;
     }
 
     if (attrd_shutting_down()) {
         /* If we're shutting down, we want to continue responding to election
          * ops as long as we're a cluster member (because our vote may be
          * needed). Ignore all other messages.
          */
         return;
     }
 
     peer_won = attrd_check_for_new_writer(peer, xml);
 
     if (pcmk__strcase_any_of(op, PCMK__ATTRD_CMD_UPDATE, PCMK__ATTRD_CMD_UPDATE_BOTH,
                              PCMK__ATTRD_CMD_UPDATE_DELAY, NULL)) {
         attrd_peer_update(peer, xml, host, FALSE);
 
     } else if (pcmk__str_eq(op, PCMK__ATTRD_CMD_SYNC, pcmk__str_casei)) {
         attrd_peer_sync(peer, xml);
 
     } else if (pcmk__str_eq(op, PCMK__ATTRD_CMD_PEER_REMOVE, pcmk__str_casei)) {
         attrd_peer_remove(host, TRUE, peer->uname);
 
     } else if (pcmk__str_eq(op, PCMK__ATTRD_CMD_CLEAR_FAILURE, pcmk__str_casei)) {
         /* It is not currently possible to receive this as a peer command,
          * but will be, if we one day enable propagating this operation.
          */
         attrd_peer_clear_failure(peer, xml);
 
     } else if (pcmk__str_eq(op, PCMK__ATTRD_CMD_SYNC_RESPONSE, pcmk__str_casei)
                && !pcmk__str_eq(peer->uname, attrd_cluster->uname, pcmk__str_casei)) {
         process_peer_sync_response(peer, peer_won, xml);
 
     } else if (pcmk__str_eq(op, PCMK__ATTRD_CMD_FLUSH, pcmk__str_casei)) {
         /* Ignore. The flush command was removed in 2.0.0 but may be
          * received from peers running older versions.
          */
     }
 }
 
 void
 attrd_peer_sync(crm_node_t *peer, xmlNode *xml)
 {
     GHashTableIter aIter;
     GHashTableIter vIter;
 
     attribute_t *a = NULL;
     attribute_value_t *v = NULL;
     xmlNode *sync = create_xml_node(NULL, __func__);
 
     crm_xml_add(sync, PCMK__XA_TASK, PCMK__ATTRD_CMD_SYNC_RESPONSE);
 
     g_hash_table_iter_init(&aIter, attributes);
     while (g_hash_table_iter_next(&aIter, NULL, (gpointer *) & a)) {
         g_hash_table_iter_init(&vIter, a->values);
         while (g_hash_table_iter_next(&vIter, NULL, (gpointer *) & v)) {
             crm_debug("Syncing %s[%s] = %s to %s", a->id, v->nodename, v->current, peer?peer->uname:"everyone");
             add_attribute_value_xml(sync, a, v, false);
         }
     }
 
     crm_debug("Syncing values to %s", peer?peer->uname:"everyone");
     send_attrd_message(peer, sync);
     free_xml(sync);
 }
 
 /*!
  * \internal
  * \brief Remove all attributes and optionally peer cache entries for a node
  *
  * \param[in] host     Name of node to purge
  * \param[in] uncache  If TRUE, remove node from peer caches
  * \param[in] source   Who requested removal (only used for logging)
  */
 void
 attrd_peer_remove(const char *host, gboolean uncache, const char *source)
 {
     attribute_t *a = NULL;
     GHashTableIter aIter;
 
     CRM_CHECK(host != NULL, return);
     crm_notice("Removing all %s attributes for peer %s", host, source);
 
     g_hash_table_iter_init(&aIter, attributes);
     while (g_hash_table_iter_next(&aIter, NULL, (gpointer *) & a)) {
         if(g_hash_table_remove(a->values, host)) {
             crm_debug("Removed %s[%s] for peer %s", a->id, host, source);
         }
     }
 
     if (uncache) {
         crm_remote_peer_cache_remove(host);
         reap_crm_member(0, host);
     }
 }
 
 /*!
  * \internal
  * \brief Return host's hash table entry (creating one if needed)
  *
  * \param[in] values Hash table of values
  * \param[in] host Name of peer to look up
  * \param[in] xml XML describing the attribute
  *
  * \return Pointer to new or existing hash table entry
  */
 static attribute_value_t *
 attrd_lookup_or_create_value(GHashTable *values, const char *host, xmlNode *xml)
 {
     attribute_value_t *v = g_hash_table_lookup(values, host);
     int is_remote = 0;
 
     crm_element_value_int(xml, PCMK__XA_ATTR_IS_REMOTE, &is_remote);
     if (is_remote) {
         cache_remote_node(host);
     }
 
     if (v == NULL) {
         v = calloc(1, sizeof(attribute_value_t));
         CRM_ASSERT(v != NULL);
 
         pcmk__str_update(&v->nodename, host);
         v->is_remote = is_remote;
         g_hash_table_replace(values, v->nodename, v);
     }
     return(v);
 }
 
 void
 broadcast_unseen_local_values(crm_node_t *peer, xmlNode *xml)
 {
     GHashTableIter aIter;
     GHashTableIter vIter;
     attribute_t *a = NULL;
     attribute_value_t *v = NULL;
     xmlNode *sync = NULL;
 
     g_hash_table_iter_init(&aIter, attributes);
     while (g_hash_table_iter_next(&aIter, NULL, (gpointer *) & a)) {
         g_hash_table_iter_init(&vIter, a->values);
         while (g_hash_table_iter_next(&vIter, NULL, (gpointer *) & v)) {
             if (!(v->seen) && pcmk__str_eq(v->nodename, attrd_cluster->uname,
                                            pcmk__str_casei)) {
                 if (sync == NULL) {
                     sync = create_xml_node(NULL, __func__);
                     crm_xml_add(sync, PCMK__XA_TASK, PCMK__ATTRD_CMD_SYNC_RESPONSE);
                 }
                 add_attribute_value_xml(sync, a, v, a->timeout_ms && a->timer);
             }
         }
     }
 
     if (sync != NULL) {
         crm_debug("Broadcasting local-only values");
         send_attrd_message(NULL, sync);
         free_xml(sync);
     }
 }
 
 /*!
  * \internal
  * \brief Override an attribute sync with a local value
  *
  * Broadcast the local node's value for an attribute that's different from the
  * value provided in a peer's attribute synchronization response. This ensures a
  * node's values for itself take precedence and all peers are kept in sync.
  *
  * \param[in] a          Attribute entry to override
  *
  * \return Local instance of attribute value
  */
 static attribute_value_t *
 broadcast_local_value(attribute_t *a)
 {
     attribute_value_t *v = g_hash_table_lookup(a->values, attrd_cluster->uname);
     xmlNode *sync = create_xml_node(NULL, __func__);
 
     crm_xml_add(sync, PCMK__XA_TASK, PCMK__ATTRD_CMD_SYNC_RESPONSE);
     add_attribute_value_xml(sync, a, v, false);
     attrd_xml_add_writer(sync);
     send_attrd_message(NULL, sync);
     free_xml(sync);
     return v;
 }
 
 void
 attrd_peer_update(crm_node_t *peer, xmlNode *xml, const char *host, bool filter)
 {
     bool update_both = FALSE;
     attribute_t *a;
     attribute_value_t *v = NULL;
     gboolean is_force_write = FALSE;
 
     const char *op = crm_element_value(xml, PCMK__XA_TASK);
     const char *attr = crm_element_value(xml, PCMK__XA_ATTR_NAME);
     const char *value = crm_element_value(xml, PCMK__XA_ATTR_VALUE);
     crm_element_value_int(xml, PCMK__XA_ATTR_FORCE, &is_force_write);
 
     if (attr == NULL) {
         crm_warn("Could not update attribute: peer did not specify name");
         return;
     }
 
     // NULL because PCMK__ATTRD_CMD_SYNC_RESPONSE has no PCMK__XA_TASK
     update_both = pcmk__str_eq(op, PCMK__ATTRD_CMD_UPDATE_BOTH,
                                pcmk__str_null_matches | pcmk__str_casei);
 
     // Look up or create attribute entry
     a = g_hash_table_lookup(attributes, attr);
     if (a == NULL) {
         if (update_both || pcmk__str_eq(op, PCMK__ATTRD_CMD_UPDATE, pcmk__str_casei)) {
             a = create_attribute(xml);
         } else {
             crm_warn("Could not update %s: attribute not found", attr);
             return;
         }
     }
 
     // Update attribute dampening
     if (update_both || pcmk__str_eq(op, PCMK__ATTRD_CMD_UPDATE_DELAY, pcmk__str_casei)) {
         const char *dvalue = crm_element_value(xml, PCMK__XA_ATTR_DAMPENING);
         int dampen = 0;
 
         if (dvalue == NULL) {
             crm_warn("Could not update %s: peer did not specify value for delay",
                      attr);
             return;
         }
 
         dampen = crm_get_msec(dvalue);
         if (dampen < 0) {
             crm_warn("Could not update %s: invalid delay value %dms (%s)",
                      attr, dampen, dvalue);
             return;
         }
 
         if (a->timeout_ms != dampen) {
             mainloop_timer_del(a->timer);
             a->timeout_ms = dampen;
             if (dampen > 0) {
                 a->timer = mainloop_timer_add(attr, a->timeout_ms, FALSE,
                                               attribute_timer_cb, a);
                 crm_info("Update attribute %s delay to %dms (%s)",
                          attr, dampen, dvalue);
             } else {
                 a->timer = NULL;
                 crm_info("Update attribute %s to remove delay", attr);
             }
 
             /* If dampening changed, do an immediate write-out,
              * otherwise repeated dampening changes would prevent write-outs
              */
             write_or_elect_attribute(a);
         }
 
         if (!update_both) {
             return;
         }
     }
 
     // If no host was specified, update all hosts recursively
     if (host == NULL) {
         GHashTableIter vIter;
 
         crm_debug("Setting %s for all hosts to %s", attr, value);
         xml_remove_prop(xml, PCMK__XA_ATTR_NODE_ID);
         g_hash_table_iter_init(&vIter, a->values);
         while (g_hash_table_iter_next(&vIter, (gpointer *) & host, NULL)) {
             attrd_peer_update(peer, xml, host, filter);
         }
         return;
     }
 
     // Update attribute value for one host
 
     v = attrd_lookup_or_create_value(a->values, host, xml);
 
     if (filter && !pcmk__str_eq(v->current, value, pcmk__str_casei)
         && pcmk__str_eq(host, attrd_cluster->uname, pcmk__str_casei)) {
 
         crm_notice("%s[%s]: local value '%s' takes priority over '%s' from %s",
                    attr, host, v->current, value, peer->uname);
         v = broadcast_local_value(a);
 
     } else if (!pcmk__str_eq(v->current, value, pcmk__str_casei)) {
         crm_notice("Setting %s[%s]: %s -> %s " CRM_XS " from %s",
                    attr, host, v->current? v->current : "(unset)", value? value : "(unset)", peer->uname);
         pcmk__str_update(&v->current, value);
         a->changed = TRUE;
 
         if (pcmk__str_eq(host, attrd_cluster->uname, pcmk__str_casei)
             && pcmk__str_eq(attr, XML_CIB_ATTR_SHUTDOWN, pcmk__str_none)) {
 
             if (!pcmk__str_eq(value, "0", pcmk__str_null_matches)) {
                 attrd_set_requesting_shutdown();
 
             } else {
                 attrd_clear_requesting_shutdown();
             }
         }
 
         // Write out new value or start dampening timer
         if (a->timeout_ms && a->timer) {
             crm_trace("Delayed write out (%dms) for %s", a->timeout_ms, attr);
             mainloop_timer_start(a->timer);
         } else {
             write_or_elect_attribute(a);
         }
 
     } else {
         if (is_force_write && a->timeout_ms && a->timer) {
             /* Save forced writing and set change flag. */
             /* The actual attribute is written by Writer after election. */
             crm_trace("Unchanged %s[%s] from %s is %s(Set the forced write flag)", attr, host, peer->uname, value);
             a->force_write = TRUE;
         } else {
             crm_trace("Unchanged %s[%s] from %s is %s", attr, host, peer->uname, value);
         }
     }
 
     /* Set the seen flag for attribute processing held only in the own node. */
     v->seen = TRUE;
 
     /* If this is a cluster node whose node ID we are learning, remember it */
     if ((v->nodeid == 0) && (v->is_remote == FALSE)
         && (crm_element_value_int(xml, PCMK__XA_ATTR_NODE_ID,
                                   (int*)&v->nodeid) == 0) && (v->nodeid > 0)) {
 
         crm_node_t *known_peer = crm_get_peer(v->nodeid, host);
 
         crm_trace("Learned %s has node id %s",
                   known_peer->uname, known_peer->uuid);
         if (attrd_election_won()) {
             write_attributes(FALSE, FALSE);
         }
     }
 }
 
 void
 write_or_elect_attribute(attribute_t *a)
 {
     if (attrd_election_won()) {
         write_attribute(a, FALSE);
     } else {
         attrd_start_election_if_needed();
     }
 }
 
 gboolean
 attrd_election_cb(gpointer user_data)
 {
     attrd_declare_winner();
 
     /* Update the peers after an election */
     attrd_peer_sync(NULL, NULL);
 
     /* Update the CIB after an election */
     write_attributes(TRUE, FALSE);
     return FALSE;
 }
 
 #define state_text(state) ((state)? (const char *)(state) : "in unknown state")
 
 void
 attrd_peer_change_cb(enum crm_status_type kind, crm_node_t *peer, const void *data)
 {
     bool gone = false;
     bool is_remote = pcmk_is_set(peer->flags, crm_remote_node);
 
     switch (kind) {
         case crm_status_uname:
             crm_debug("%s node %s is now %s",
                       (is_remote? "Remote" : "Cluster"),
                       peer->uname, state_text(peer->state));
             break;
 
         case crm_status_processes:
             if (!pcmk_is_set(peer->processes, crm_get_cluster_proc())) {
                 gone = true;
             }
             crm_debug("Node %s is %s a peer",
                       peer->uname, (gone? "no longer" : "now"));
             break;
 
         case crm_status_nstate:
             crm_debug("%s node %s is now %s (was %s)",
                       (is_remote? "Remote" : "Cluster"),
                       peer->uname, state_text(peer->state), state_text(data));
             if (pcmk__str_eq(peer->state, CRM_NODE_MEMBER, pcmk__str_casei)) {
                 /* If we're the writer, send new peers a list of all attributes
                  * (unless it's a remote node, which doesn't run its own attrd)
                  */
                 if (attrd_election_won()
                     && !pcmk_is_set(peer->flags, crm_remote_node)) {
                     attrd_peer_sync(peer, NULL);
                 }
             } else {
                 // Remove all attribute values associated with lost nodes
                 attrd_peer_remove(peer->uname, FALSE, "loss");
                 gone = true;
             }
             break;
     }
 
     // Remove votes from cluster nodes that leave, in case election in progress
     if (gone && !is_remote) {
         attrd_remove_voter(peer);
 
     // Ensure remote nodes that come up are in the remote node cache
     } else if (!gone && is_remote) {
         cache_remote_node(peer->uname);
     }
 }
 
 static void
 attrd_cib_callback(xmlNode * msg, int call_id, int rc, xmlNode * output, void *user_data)
 {
     int level = LOG_ERR;
     GHashTableIter iter;
     const char *peer = NULL;
     attribute_value_t *v = NULL;
 
     char *name = user_data;
     attribute_t *a = g_hash_table_lookup(attributes, name);
 
     if(a == NULL) {
         crm_info("Attribute %s no longer exists", name);
         return;
     }
 
     a->update = 0;
     if (rc == pcmk_ok && call_id < 0) {
         rc = call_id;
     }
 
     switch (rc) {
         case pcmk_ok:
             level = LOG_INFO;
             last_cib_op_done = call_id;
             if (a->timer && !a->timeout_ms) {
                 // Remove temporary dampening for failed writes
                 mainloop_timer_del(a->timer);
                 a->timer = NULL;
             }
             break;
 
         case -pcmk_err_diff_failed:    /* When an attr changes while the CIB is syncing */
         case -ETIME:           /* When an attr changes while there is a DC election */
         case -ENXIO:           /* When an attr changes while the CIB is syncing a
                                 *   newer config from a node that just came up
                                 */
             level = LOG_WARNING;
             break;
     }
 
     do_crm_log(level, "CIB update %d result for %s: %s " CRM_XS " rc=%d",
                call_id, a->id, pcmk_strerror(rc), rc);
 
     g_hash_table_iter_init(&iter, a->values);
     while (g_hash_table_iter_next(&iter, (gpointer *) & peer, (gpointer *) & v)) {
         do_crm_log(level, "* %s[%s]=%s", a->id, peer, v->requested);
         free(v->requested);
         v->requested = NULL;
         if (rc != pcmk_ok) {
             a->changed = TRUE; /* Attempt write out again */
         }
     }
 
     if (a->changed && attrd_election_won()) {
         if (rc == pcmk_ok) {
             /* We deferred a write of a new update because this update was in
              * progress. Write out the new value without additional delay.
              */
             write_attribute(a, FALSE);
 
         /* We're re-attempting a write because the original failed; delay
          * the next attempt so we don't potentially flood the CIB manager
          * and logs with a zillion attempts per second.
          *
          * @TODO We could elect a new writer instead. However, we'd have to
          * somehow downgrade our vote, and we'd still need something like this
          * if all peers similarly fail to write this attribute (which may
          * indicate a corrupted attribute entry rather than a CIB issue).
          */
         } else if (a->timer) {
             // Attribute has a dampening value, so use that as delay
             if (!mainloop_timer_running(a->timer)) {
                 crm_trace("Delayed re-attempted write (%dms) for %s",
                           a->timeout_ms, name);
                 mainloop_timer_start(a->timer);
             }
         } else {
             /* Set a temporary dampening of 2 seconds (timer will continue
              * to exist until the attribute's dampening gets set or the
              * write succeeds).
              */
             a->timer = mainloop_timer_add(a->id, 2000, FALSE,
                                           attribute_timer_cb, a);
             mainloop_timer_start(a->timer);
         }
     }
 }
 
 void
 write_attributes(bool all, bool ignore_delay)
 {
     GHashTableIter iter;
     attribute_t *a = NULL;
 
     crm_debug("Writing out %s attributes", all? "all" : "changed");
     g_hash_table_iter_init(&iter, attributes);
     while (g_hash_table_iter_next(&iter, NULL, (gpointer *) & a)) {
         if (!all && a->unknown_peer_uuids) {
             // Try writing this attribute again, in case peer ID was learned
             a->changed = TRUE;
         } else if (a->force_write) {
             /* If the force_write flag is set, write the attribute. */
             a->changed = TRUE;
         }
 
         if(all || a->changed) {
             /* When forced write flag is set, ignore delay. */
             write_attribute(a, (a->force_write ? TRUE : ignore_delay));
         } else {
             crm_trace("Skipping unchanged attribute %s", a->id);
         }
     }
 }
 
 static void
 build_update_element(xmlNode *parent, attribute_t *a, const char *nodeid, const char *value)
 {
     const char *set = NULL;
     xmlNode *xml_obj = NULL;
 
     xml_obj = create_xml_node(parent, XML_CIB_TAG_STATE);
     crm_xml_add(xml_obj, XML_ATTR_ID, nodeid);
 
     xml_obj = create_xml_node(xml_obj, XML_TAG_TRANSIENT_NODEATTRS);
     crm_xml_add(xml_obj, XML_ATTR_ID, nodeid);
 
     xml_obj = create_xml_node(xml_obj, XML_TAG_ATTR_SETS);
     if (a->set) {
         crm_xml_set_id(xml_obj, "%s", a->set);
     } else {
         crm_xml_set_id(xml_obj, "%s-%s", XML_CIB_TAG_STATUS, nodeid);
     }
     set = ID(xml_obj);
 
     xml_obj = create_xml_node(xml_obj, XML_CIB_TAG_NVPAIR);
     if (a->uuid) {
         crm_xml_set_id(xml_obj, "%s", a->uuid);
     } else {
         crm_xml_set_id(xml_obj, "%s-%s", set, a->id);
     }
     crm_xml_add(xml_obj, XML_NVPAIR_ATTR_NAME, a->id);
 
     if(value) {
         crm_xml_add(xml_obj, XML_NVPAIR_ATTR_VALUE, value);
 
     } else {
         crm_xml_add(xml_obj, XML_NVPAIR_ATTR_VALUE, "");
         crm_xml_add(xml_obj, "__delete__", XML_NVPAIR_ATTR_VALUE);
     }
 }
 
 static void
 set_alert_attribute_value(GHashTable *t, attribute_value_t *v)
 {
     attribute_value_t *a_v = NULL;
     a_v = calloc(1, sizeof(attribute_value_t));
     CRM_ASSERT(a_v != NULL);
 
     a_v->nodeid = v->nodeid;
     a_v->nodename = strdup(v->nodename);
     pcmk__str_update(&a_v->current, v->current);
 
     g_hash_table_replace(t, a_v->nodename, a_v);
 }
 
 static void
 send_alert_attributes_value(attribute_t *a, GHashTable *t)
 {
     int rc = 0;
     attribute_value_t *at = NULL;
     GHashTableIter vIter;
 
     g_hash_table_iter_init(&vIter, t);
 
     while (g_hash_table_iter_next(&vIter, NULL, (gpointer *) & at)) {
         rc = attrd_send_attribute_alert(at->nodename, at->nodeid,
                                         a->id, at->current);
         crm_trace("Sent alerts for %s[%s]=%s: nodeid=%d rc=%d",
                   a->id, at->nodename, at->current, at->nodeid, rc);
     }
 }
 
 void
 write_attribute(attribute_t *a, bool ignore_delay)
 {
     int private_updates = 0, cib_updates = 0;
     xmlNode *xml_top = NULL;
     attribute_value_t *v = NULL;
     GHashTableIter iter;
     enum cib_call_options flags = cib_quorum_override;
     GHashTable *alert_attribute_value = NULL;
 
     if (a == NULL) {
         return;
     }
 
     /* If this attribute will be written to the CIB ... */
     if (!a->is_private) {
 
         /* Defer the write if now's not a good time */
         CRM_CHECK(the_cib != NULL, return);
         if (a->update && (a->update < last_cib_op_done)) {
             crm_info("Write out of '%s' continuing: update %d considered lost", a->id, a->update);
             a->update = 0; // Don't log this message again
 
         } else if (a->update) {
             crm_info("Write out of '%s' delayed: update %d in progress", a->id, a->update);
             return;
 
         } else if (mainloop_timer_running(a->timer)) {
             if (ignore_delay) {
                 /* 'refresh' forces a write of the current value of all attributes
                  * Cancel any existing timers, we're writing it NOW
                  */
                 mainloop_timer_stop(a->timer);
                 crm_debug("Write out of '%s': timer is running but ignore delay", a->id);
             } else {
                 crm_info("Write out of '%s' delayed: timer is running", a->id);
                 return;
             }
         }
 
         /* Initialize the status update XML */
         xml_top = create_xml_node(NULL, XML_CIB_TAG_STATUS);
     }
 
     /* Attribute will be written shortly, so clear changed flag */
     a->changed = FALSE;
 
     /* We will check all peers' uuids shortly, so initialize this to false */
     a->unknown_peer_uuids = FALSE;
 
     /* Attribute will be written shortly, so clear forced write flag */
     a->force_write = FALSE;    
 
     /* Make the table for the attribute trap */
     alert_attribute_value = pcmk__strikey_table(NULL, free_attribute_value);
 
     /* Iterate over each peer value of this attribute */
     g_hash_table_iter_init(&iter, a->values);
     while (g_hash_table_iter_next(&iter, NULL, (gpointer *) & v)) {
         crm_node_t *peer = crm_get_peer_full(v->nodeid, v->nodename, CRM_GET_PEER_ANY);
 
         /* If the value's peer info does not correspond to a peer, ignore it */
         if (peer == NULL) {
             crm_notice("Cannot update %s[%s]=%s because peer not known",
                        a->id, v->nodename, v->current);
             continue;
         }
 
         /* If we're just learning the peer's node id, remember it */
         if (peer->id && (v->nodeid == 0)) {
             crm_trace("Learned ID %u for node %s", peer->id, v->nodename);
             v->nodeid = peer->id;
         }
 
         /* If this is a private attribute, no update needs to be sent */
         if (a->is_private) {
             private_updates++;
             continue;
         }
 
         /* If the peer is found, but its uuid is unknown, defer write */
         if (peer->uuid == NULL) {
             a->unknown_peer_uuids = TRUE;
             crm_notice("Cannot update %s[%s]=%s because peer UUID not known "
                        "(will retry if learned)",
                        a->id, v->nodename, v->current);
             continue;
         }
 
         /* Add this value to status update XML */
         crm_debug("Updating %s[%s]=%s (peer known as %s, UUID %s, ID %u/%u)",
                   a->id, v->nodename, v->current,
                   peer->uname, peer->uuid, peer->id, v->nodeid);
         build_update_element(xml_top, a, peer->uuid, v->current);
         cib_updates++;
 
         /* Preservation of the attribute to transmit alert */
         set_alert_attribute_value(alert_attribute_value, v);
 
         free(v->requested);
         v->requested = NULL;
         if (v->current) {
             v->requested = strdup(v->current);
         } else {
             /* Older attrd versions don't know about the cib_mixed_update
              * flag so make sure it goes to the local cib which does
              */
             cib__set_call_options(flags, crm_system_name,
                                   cib_mixed_update|cib_scope_local);
         }
     }
 
     if (private_updates) {
         crm_info("Processed %d private change%s for %s, id=%s, set=%s",
                  private_updates, pcmk__plural_s(private_updates),
                  a->id, (a->uuid? a->uuid : "n/a"), (a->set? a->set : "n/a"));
     }
     if (cib_updates) {
         crm_log_xml_trace(xml_top, __func__);
 
         a->update = cib_internal_op(the_cib, CIB_OP_MODIFY, NULL, XML_CIB_TAG_STATUS, xml_top, NULL,
                                     flags, a->user);
 
         crm_info("Sent CIB request %d with %d change%s for %s (id %s, set %s)",
                  a->update, cib_updates, pcmk__plural_s(cib_updates),
                  a->id, (a->uuid? a->uuid : "n/a"), (a->set? a->set : "n/a"));
 
         the_cib->cmds->register_callback_full(the_cib, a->update,
                                               CIB_OP_TIMEOUT_S, FALSE,
                                               strdup(a->id),
                                               "attrd_cib_callback",
                                               attrd_cib_callback, free);
         /* Transmit alert of the attribute */
         send_alert_attributes_value(a, alert_attribute_value);
     }
 
     g_hash_table_destroy(alert_attribute_value);
     free_xml(xml_top);
 }
diff --git a/include/crm/common/Makefile.am b/include/crm/common/Makefile.am
index dc1fb97e0f..c492680441 100644
--- a/include/crm/common/Makefile.am
+++ b/include/crm/common/Makefile.am
@@ -1,50 +1,51 @@
 #
 # Copyright 2004-2022 the Pacemaker project contributors
 #
 # The version control history for this file may have further details.
 #
 # This source code is licensed under the GNU General Public License version 2
 # or later (GPLv2+) WITHOUT ANY WARRANTY.
 #
 
 MAINTAINERCLEANFILES = Makefile.in
 
 headerdir=$(pkgincludedir)/crm/common
 
 header_HEADERS = acl.h \
 				 agents.h \
 				 agents_compat.h \
 				 cib.h \
 				 ipc.h \
+				 ipc_attrd_internal.h \
 				 ipc_controld.h \
 				 ipc_pacemakerd.h \
 				 ipc_schedulerd.h \
 				 iso8601.h \
 				 logging.h \
 				 logging_compat.h \
 				 mainloop.h \
 				 mainloop_compat.h \
 				 nvpair.h \
 				 output.h \
 				 results.h \
 				 util.h \
 				 util_compat.h \
 				 xml.h \
 				 xml_compat.h
 
 noinst_HEADERS = alerts_internal.h	\
 				 attrd_internal.h	\
 				 cmdline_internal.h	\
 				 health_internal.h	\
 				 internal.h		\
 				 ipc_internal.h		\
 				 iso8601_internal.h	\
 				 lists_internal.h	\
 				 logging_internal.h	\
 				 messages_internal.h	\
 				 options_internal.h	\
 				 output_internal.h	\
 				 remote_internal.h	\
 				 results_internal.h	\
 				 strings_internal.h	\
 				 xml_internal.h
diff --git a/include/crm/common/ipc_attrd_internal.h b/include/crm/common/ipc_attrd_internal.h
new file mode 100644
index 0000000000..6b79e52753
--- /dev/null
+++ b/include/crm/common/ipc_attrd_internal.h
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2022 the Pacemaker project contributors
+ *
+ * The version control history for this file may have further details.
+ *
+ * This source code is licensed under the GNU Lesser General Public License
+ * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
+ */
+
+#ifndef PCMK__CRM_COMMON_IPC_ATTRD_INTERNAL__H
+#  define PCMK__CRM_COMMON_IPC_ATTRD_INTERNAL__H
+
+#include <glib.h>            // GList
+#include <crm/common/ipc.h>  // pcmk_ipc_api_t
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+//! Possible types of attribute manager replies
+enum pcmk__attrd_api_reply {
+    pcmk__attrd_reply_unknown,
+    pcmk__attrd_reply_query,
+};
+
+// Information passed with pcmk__attrd_reply_query
+typedef struct {
+    const char *node;
+    const char *name;
+    const char *value;
+} pcmk__attrd_query_pair_t;
+
+/*!
+ * Attribute manager reply passed to event callback
+ *
+ * \note The pointers in the reply are only guaranteed to be meaningful for the
+ *       execution of the callback; if the values are needed for later, the
+ *       callback should copy them.
+ */
+typedef struct {
+    enum pcmk__attrd_api_reply reply_type;
+
+    union {
+        // pcmk__attrd_reply_query
+        GList *pairs;
+    } data;
+} pcmk__attrd_api_reply_t;
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // PCMK__CRM_COMMON_IPC_ATTRD_INTERNAL__H
diff --git a/lib/common/crmcommon_private.h b/lib/common/crmcommon_private.h
index 6b7be9c68a..71d891b2f7 100644
--- a/lib/common/crmcommon_private.h
+++ b/lib/common/crmcommon_private.h
@@ -1,283 +1,286 @@
 /*
  * Copyright 2018-2021 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU Lesser General Public License
  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
  */
 
 #ifndef CRMCOMMON_PRIVATE__H
 #  define CRMCOMMON_PRIVATE__H
 
 /* This header is for the sole use of libcrmcommon, so that functions can be
  * declared with G_GNUC_INTERNAL for efficiency.
  */
 
 #include <stdint.h>         // uint8_t, uint32_t
 #include <stdbool.h>        // bool
 #include <sys/types.h>      // size_t
 #include <glib.h>           // GList
 #include <libxml/tree.h>    // xmlNode, xmlAttr
 #include <qb/qbipcc.h>      // struct qb_ipc_response_header
 
 // Decent chunk size for processing large amounts of data
 #define PCMK__BUFFER_SIZE 4096
 
 /* When deleting portions of an XML tree, we keep a record so we can know later
  * (e.g. when checking differences) that something was deleted.
  */
 typedef struct pcmk__deleted_xml_s {
         char *path;
         int position;
 } pcmk__deleted_xml_t;
 
 typedef struct xml_private_s {
         long check;
         uint32_t flags;
         char *user;
         GList *acls;
         GList *deleted_objs; // List of pcmk__deleted_xml_t
 } xml_private_t;
 
 #define pcmk__set_xml_flags(xml_priv, flags_to_set) do {                    \
         (xml_priv)->flags = pcmk__set_flags_as(__func__, __LINE__,          \
             LOG_NEVER, "XML", "XML node", (xml_priv)->flags,                \
             (flags_to_set), #flags_to_set);                                 \
     } while (0)
 
 #define pcmk__clear_xml_flags(xml_priv, flags_to_clear) do {                \
         (xml_priv)->flags = pcmk__clear_flags_as(__func__, __LINE__,        \
             LOG_NEVER, "XML", "XML node", (xml_priv)->flags,                \
             (flags_to_clear), #flags_to_clear);                             \
     } while (0)
 
 G_GNUC_INTERNAL
 void pcmk__xml2text(xmlNode *data, int options, char **buffer, int *offset,
                     int *max, int depth);
 
 G_GNUC_INTERNAL
 void pcmk__buffer_add_char(char **buffer, int *offset, int *max, char c);
 
 G_GNUC_INTERNAL
 bool pcmk__tracking_xml_changes(xmlNode *xml, bool lazy);
 
 G_GNUC_INTERNAL
 int pcmk__element_xpath(const char *prefix, xmlNode *xml, char *buffer,
                         int offset, size_t buffer_size);
 
 G_GNUC_INTERNAL
 void pcmk__mark_xml_created(xmlNode *xml);
 
 G_GNUC_INTERNAL
 int pcmk__xml_position(xmlNode *xml, enum xml_private_flags ignore_if_set);
 
 G_GNUC_INTERNAL
 xmlNode *pcmk__xml_match(xmlNode *haystack, xmlNode *needle, bool exact);
 
 G_GNUC_INTERNAL
 void pcmk__xe_log(int log_level, const char *file, const char *function,
                   int line, const char *prefix, xmlNode *data, int depth,
                   int options);
 
 G_GNUC_INTERNAL
 void pcmk__xml_update(xmlNode *parent, xmlNode *target, xmlNode *update,
                       bool as_diff);
 
 G_GNUC_INTERNAL
 xmlNode *pcmk__xc_match(xmlNode *root, xmlNode *search_comment, bool exact);
 
 G_GNUC_INTERNAL
 void pcmk__xc_update(xmlNode *parent, xmlNode *target, xmlNode *update);
 
 G_GNUC_INTERNAL
 void pcmk__free_acls(GList *acls);
 
 G_GNUC_INTERNAL
 void pcmk__unpack_acl(xmlNode *source, xmlNode *target, const char *user);
 
 G_GNUC_INTERNAL
 void pcmk__apply_acl(xmlNode *xml);
 
 G_GNUC_INTERNAL
 void pcmk__apply_creation_acl(xmlNode *xml, bool check_top);
 
 G_GNUC_INTERNAL
 void pcmk__mark_xml_attr_dirty(xmlAttr *a);
 
 G_GNUC_INTERNAL
 bool pcmk__xa_filterable(const char *name);
 
 static inline const char *
 pcmk__xml_attr_value(const xmlAttr *attr)
 {
     return ((attr == NULL) || (attr->children == NULL))? NULL
            : (const char *) attr->children->content;
 }
 
 /*
  * IPC
  */
 
 #define PCMK__IPC_VERSION 1
 
 #define PCMK__CONTROLD_API_MAJOR "1"
 #define PCMK__CONTROLD_API_MINOR "0"
 
 // IPC behavior that varies by daemon
 typedef struct pcmk__ipc_methods_s {
     /*!
      * \internal
      * \brief Allocate any private data needed by daemon IPC
      *
      * \param[in] api  IPC API connection
      *
      * \return Standard Pacemaker return code
      */
     int (*new_data)(pcmk_ipc_api_t *api);
 
     /*!
      * \internal
      * \brief Free any private data used by daemon IPC
      *
      * \param[in] api_data  Data allocated by new_data() method
      */
     void (*free_data)(void *api_data);
 
     /*!
      * \internal
      * \brief Perform daemon-specific handling after successful connection
      *
      * Some daemons require clients to register before sending any other
      * commands. The controller requires a CRM_OP_HELLO (with no reply), and
      * the CIB manager, executor, and fencer require a CRM_OP_REGISTER (with a
      * reply). Ideally this would be consistent across all daemons, but for now
      * this allows each to do its own authorization.
      *
      * \param[in] api  IPC API connection
      *
      * \return Standard Pacemaker return code
      */
     int (*post_connect)(pcmk_ipc_api_t *api);
 
     /*!
      * \internal
      * \brief Check whether an IPC request results in a reply
      *
      * \param[in] api      IPC API connection
      * \param[in] request  IPC request XML
      *
      * \return true if request would result in an IPC reply, false otherwise
      */
     bool (*reply_expected)(pcmk_ipc_api_t *api, xmlNode *request);
 
     /*!
      * \internal
      * \brief Perform daemon-specific handling of an IPC message
      *
      * \param[in] api  IPC API connection
      * \param[in] msg  Message read from IPC connection
      *
      * \return true if more IPC reply messages should be expected
      */
     bool (*dispatch)(pcmk_ipc_api_t *api, xmlNode *msg);
 
     /*!
      * \internal
      * \brief Perform daemon-specific handling of an IPC disconnect
      *
      * \param[in] api  IPC API connection
      */
     void (*post_disconnect)(pcmk_ipc_api_t *api);
 } pcmk__ipc_methods_t;
 
 // Implementation of pcmk_ipc_api_t
 struct pcmk_ipc_api_s {
     enum pcmk_ipc_server server;          // Daemon this IPC API instance is for
     enum pcmk_ipc_dispatch dispatch_type; // How replies should be dispatched
     size_t ipc_size_max;                  // maximum IPC buffer size
     crm_ipc_t *ipc;                       // IPC connection
     mainloop_io_t *mainloop_io;     // If using mainloop, I/O source for IPC
     bool free_on_disconnect;        // Whether disconnect should free object
     pcmk_ipc_callback_t cb;         // Caller-registered callback (if any)
     void *user_data;                // Caller-registered data (if any)
     void *api_data;                 // For daemon-specific use
     pcmk__ipc_methods_t *cmds;      // Behavior that varies by daemon
 };
 
 typedef struct pcmk__ipc_header_s {
     struct qb_ipc_response_header qb;
     uint32_t size_uncompressed;
     uint32_t size_compressed;
     uint32_t flags;
     uint8_t version;
 } pcmk__ipc_header_t;
 
 G_GNUC_INTERNAL
 int pcmk__send_ipc_request(pcmk_ipc_api_t *api, xmlNode *request);
 
 G_GNUC_INTERNAL
 void pcmk__call_ipc_callback(pcmk_ipc_api_t *api,
                              enum pcmk_ipc_event event_type,
                              crm_exit_t status, void *event_data);
 
 G_GNUC_INTERNAL
 unsigned int pcmk__ipc_buffer_size(unsigned int max);
 
 G_GNUC_INTERNAL
 bool pcmk__valid_ipc_header(const pcmk__ipc_header_t *header);
 
+G_GNUC_INTERNAL
+pcmk__ipc_methods_t *pcmk__attrd_api_methods(void);
+
 G_GNUC_INTERNAL
 pcmk__ipc_methods_t *pcmk__controld_api_methods(void);
 
 G_GNUC_INTERNAL
 pcmk__ipc_methods_t *pcmk__pacemakerd_api_methods(void);
 
 G_GNUC_INTERNAL
 pcmk__ipc_methods_t *pcmk__schedulerd_api_methods(void);
 
 
 /*
  * Logging
  */
 
 /*!
  * \brief Check the authenticity of the IPC socket peer process
  *
  * If everything goes well, peer's authenticity is verified by the means
  * of comparing against provided referential UID and GID (either satisfies),
  * and the result of this check can be deduced from the return value.
  * As an exception, detected UID of 0 ("root") satisfies arbitrary
  * provided referential daemon's credentials.
  *
  * \param[in]  qb_ipc  libqb client connection if available
  * \param[in]  sock    IPC related, connected Unix socket to check peer of
  * \param[in]  refuid  referential UID to check against
  * \param[in]  refgid  referential GID to check against
  * \param[out] gotpid  to optionally store obtained PID of the peer
  *                     (not available on FreeBSD, special value of 1
  *                     used instead, and the caller is required to
  *                     special case this value respectively)
  * \param[out] gotuid  to optionally store obtained UID of the peer
  * \param[out] gotgid  to optionally store obtained GID of the peer
  *
  * \return Standard Pacemaker return code
  *         ie: 0 if it the connection is authentic
  *         pcmk_rc_ipc_unauthorized if the connection is not authentic,
  *         standard errors.
  *
  * \note While this function is tolerant on what constitutes authorized
  *       IPC daemon process (its effective user matches UID=0 or \p refuid,
  *       or at least its group matches \p refgid), either or both (in case
  *       of UID=0) mismatches on the expected credentials of such peer
  *       process \e shall be investigated at the caller when value of 1
  *       gets returned there, since higher-than-expected privileges in
  *       respect to the expected/intended credentials possibly violate
  *       the least privilege principle and may pose an additional risk
  *       (i.e. such accidental inconsistency shall be eventually fixed).
  */
 int pcmk__crm_ipc_is_authentic_process(qb_ipcc_connection_t *qb_ipc, int sock, uid_t refuid, gid_t refgid,
                                        pid_t *gotpid, uid_t *gotuid, gid_t *gotgid);
 
 
 #endif  // CRMCOMMON_PRIVATE__H
diff --git a/lib/common/ipc_attrd.c b/lib/common/ipc_attrd.c
index e134ea42bc..aea1d9a218 100644
--- a/lib/common/ipc_attrd.c
+++ b/lib/common/ipc_attrd.c
@@ -1,271 +1,376 @@
 /*
  * Copyright 2011-2022 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU Lesser General Public License
  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
  */
 
 #ifndef _GNU_SOURCE
 #  define _GNU_SOURCE
 #endif
 
 #include <crm_internal.h>
 
 #include <stdio.h>
 
 #include <crm/crm.h>
-#include <crm/msg_xml.h>
+#include <crm/common/ipc.h>
+#include <crm/common/ipc_attrd_internal.h>
+#include <crm/common/ipc_internal.h>
 #include <crm/common/attrd_internal.h>
+#include <crm/msg_xml.h>
+#include "crmcommon_private.h"
+
+static void
+set_pairs_data(pcmk__attrd_api_reply_t *data, xmlNode *msg_data)
+{
+    const char *name = NULL;
+    pcmk__attrd_query_pair_t *pair;
+
+    name = crm_element_value(msg_data, PCMK__XA_ATTR_NAME);
+
+    for (xmlNode *node = first_named_child(msg_data, XML_CIB_TAG_NODE);
+         node != NULL; node = crm_next_same_xml(node)) {
+        pair = calloc(1, sizeof(pcmk__attrd_query_pair_t));
+
+        CRM_ASSERT(pair != NULL);
+
+        pair->node = crm_element_value(node, PCMK__XA_ATTR_NODE_NAME);
+        pair->name = name;
+        pair->value = crm_element_value(node, PCMK__XA_ATTR_VALUE);
+        data->data.pairs = g_list_prepend(data->data.pairs, pair);
+    }
+}
+
+static bool
+reply_expected(pcmk_ipc_api_t *api, xmlNode *request)
+{
+    const char *command = crm_element_value(request, PCMK__XA_TASK);
+
+    /* Only the query command gets a reply. */
+    return pcmk__str_any_of(command, PCMK__ATTRD_CMD_QUERY, NULL);
+}
+
+static bool
+dispatch(pcmk_ipc_api_t *api, xmlNode *reply)
+{
+    const char *value = NULL;
+    crm_exit_t status = CRM_EX_OK;
+
+    pcmk__attrd_api_reply_t reply_data = {
+        pcmk__attrd_reply_unknown
+    };
+
+    if (pcmk__str_eq((const char *) reply->name, "ack", pcmk__str_none)) {
+        return false;
+    }
+
+    /* Do some basic validation of the reply */
+    value = crm_element_value(reply, F_TYPE);
+    if (value == NULL || !pcmk__str_eq(value, T_ATTRD, pcmk__str_none)) {
+        crm_debug("Unrecognizable attrd message: invalid message type '%s'",
+                  crm_str(value));
+        status = CRM_EX_PROTOCOL;
+        goto done;
+    }
+
+    /* Only the query command gets a reply for now. */
+    value = crm_element_value(reply, F_SUBTYPE);
+    if (pcmk__str_eq(value, PCMK__ATTRD_CMD_QUERY, pcmk__str_null_matches)) {
+        /* This likely means the client gave us an attribute name that doesn't
+         * exist.
+         */
+        if (!xmlHasProp(reply, (pcmkXmlStr) PCMK__XA_ATTR_NAME)) {
+            crm_debug("Empty attrd message: no attribute name");
+            status = ENXIO;
+            goto done;
+        }
+
+        reply_data.reply_type = pcmk__attrd_reply_query;
+
+        set_pairs_data(&reply_data, reply);
+    } else {
+        crm_debug("Cannot handle a reply from message type '%s'",
+                  crm_str(value));
+        status = CRM_EX_PROTOCOL;
+        goto done;
+    }
+
+done:
+    pcmk__call_ipc_callback(api, pcmk_ipc_event_reply, status, &reply_data);
+
+    /* Free any reply data that was allocated */
+    if (reply_data.data.pairs) {
+        g_list_free_full(reply_data.data.pairs, free);
+    }
+
+    return false;
+}
+
+pcmk__ipc_methods_t *
+pcmk__attrd_api_methods()
+{
+    pcmk__ipc_methods_t *cmds = calloc(1, sizeof(pcmk__ipc_methods_t));
+
+    if (cmds != NULL) {
+        cmds->new_data = NULL;
+        cmds->free_data = NULL;
+        cmds->post_connect = NULL;
+        cmds->reply_expected = reply_expected;
+        cmds->dispatch = dispatch;
+    }
+    return cmds;
+}
 
 /*!
  * \internal
  * \brief Create a generic pacemaker-attrd operation
  *
  * \param[in] user_name  If not NULL, ACL user to set for operation
  *
  * \return XML of pacemaker-attrd operation
  */
 static xmlNode *
 create_attrd_op(const char *user_name)
 {
     xmlNode *attrd_op = create_xml_node(NULL, __func__);
 
     crm_xml_add(attrd_op, F_TYPE, T_ATTRD);
     crm_xml_add(attrd_op, F_ORIG, (crm_system_name? crm_system_name: "unknown"));
     crm_xml_add(attrd_op, PCMK__XA_ATTR_USER, user_name);
 
     return attrd_op;
 }
 
 /*!
  * \internal
  * \brief Send an operation to pacemaker-attrd via IPC
  *
  * \param[in] ipc       Connection to pacemaker-attrd (or create one if NULL)
  * \param[in] attrd_op  XML of pacemaker-attrd operation to send
  *
  * \return Standard Pacemaker return code
  */
 static int
 send_attrd_op(crm_ipc_t *ipc, xmlNode *attrd_op)
 {
     int rc = -ENOTCONN; // initially handled as legacy return code
     int max = 5;
 
     static gboolean connected = TRUE;
     static crm_ipc_t *local_ipc = NULL;
     static enum crm_ipc_flags flags = crm_ipc_flags_none;
 
     if (ipc == NULL && local_ipc == NULL) {
         local_ipc = crm_ipc_new(T_ATTRD, 0);
         pcmk__set_ipc_flags(flags, "client", crm_ipc_client_response);
         connected = FALSE;
     }
 
     if (ipc == NULL) {
         ipc = local_ipc;
     }
 
     while (max > 0) {
         if (connected == FALSE) {
             crm_info("Connecting to cluster... %d retries remaining", max);
             connected = crm_ipc_connect(ipc);
         }
 
         if (connected) {
             rc = crm_ipc_send(ipc, attrd_op, flags, 0, NULL);
         } else {
             crm_perror(LOG_INFO, "Connection to cluster attribute manager failed");
         }
 
         if (ipc != local_ipc) {
             break;
 
         } else if (rc > 0) {
             break;
 
         } else if (rc == -EAGAIN || rc == -EALREADY) {
             sleep(5 - max);
             max--;
 
         } else {
             crm_ipc_close(ipc);
             connected = FALSE;
             sleep(5 - max);
             max--;
         }
     }
 
     if (rc > 0) {
         rc = pcmk_ok;
     }
     return pcmk_legacy2rc(rc);
 }
 
 /*!
  * \internal
  * \brief Send a request to pacemaker-attrd
  *
  * \param[in] ipc      Connection to pacemaker-attrd (or NULL to use a local connection)
  * \param[in] command  A character indicating the type of pacemaker-attrd request:
  *                     U or v: update attribute (or refresh if name is NULL)
  *                     u: update attributes matching regular expression in name
  *                     D: delete attribute (value must be NULL)
  *                     R: refresh
  *                     B: update both attribute and its dampening
  *                     Y: update attribute dampening only
  *                     Q: query attribute
  *                     C: remove peer specified by host
  * \param[in] host     Affect only this host (or NULL for all hosts)
  * \param[in] name     Name of attribute to affect
  * \param[in] value    Attribute value to set
  * \param[in] section  Status or nodes
  * \param[in] set      ID of attribute set to use (or NULL to choose first)
  * \param[in] dampen   Attribute dampening to use with B/Y, and U/v if creating
  * \param[in] user_name ACL user to pass to pacemaker-attrd
  * \param[in] options  Bitmask of pcmk__node_attr_opts
  *
  * \return Standard Pacemaker return code
  */
 int
 pcmk__node_attr_request(crm_ipc_t *ipc, char command, const char *host,
                         const char *name, const char *value,
                         const char *section, const char *set,
                         const char *dampen, const char *user_name, int options)
 {
     int rc = pcmk_rc_ok;
     const char *task = NULL;
     const char *name_as = NULL;
     const char *display_host = (host ? host : "localhost");
     const char *display_command = NULL; /* for commands without name/value */
     xmlNode *update = create_attrd_op(user_name);
 
     /* remap common aliases */
     if (pcmk__str_eq(section, "reboot", pcmk__str_casei)) {
         section = XML_CIB_TAG_STATUS;
 
     } else if (pcmk__str_eq(section, "forever", pcmk__str_casei)) {
         section = XML_CIB_TAG_NODES;
     }
 
     if (name == NULL && command == 'U') {
         command = 'R';
     }
 
     switch (command) {
         case 'u':
             task = PCMK__ATTRD_CMD_UPDATE;
             name_as = PCMK__XA_ATTR_PATTERN;
             break;
         case 'D':
         case 'U':
         case 'v':
             task = PCMK__ATTRD_CMD_UPDATE;
             name_as = PCMK__XA_ATTR_NAME;
             break;
         case 'R':
             task = PCMK__ATTRD_CMD_REFRESH;
             display_command = "refresh";
             break;
         case 'B':
             task = PCMK__ATTRD_CMD_UPDATE_BOTH;
             name_as = PCMK__XA_ATTR_NAME;
             break;
         case 'Y':
             task = PCMK__ATTRD_CMD_UPDATE_DELAY;
             name_as = PCMK__XA_ATTR_NAME;
             break;
         case 'Q':
             task = PCMK__ATTRD_CMD_QUERY;
             name_as = PCMK__XA_ATTR_NAME;
             break;
         case 'C':
             task = PCMK__ATTRD_CMD_PEER_REMOVE;
             display_command = "purge";
             break;
     }
 
     if (name_as != NULL) {
         if (name == NULL) {
             rc = EINVAL;
             goto done;
         }
         crm_xml_add(update, name_as, name);
     }
 
     crm_xml_add(update, PCMK__XA_TASK, task);
     crm_xml_add(update, PCMK__XA_ATTR_VALUE, value);
     crm_xml_add(update, PCMK__XA_ATTR_DAMPENING, dampen);
     crm_xml_add(update, PCMK__XA_ATTR_SECTION, section);
     crm_xml_add(update, PCMK__XA_ATTR_NODE_NAME, host);
     crm_xml_add(update, PCMK__XA_ATTR_SET, set);
     crm_xml_add_int(update, PCMK__XA_ATTR_IS_REMOTE,
                     pcmk_is_set(options, pcmk__node_attr_remote));
     crm_xml_add_int(update, PCMK__XA_ATTR_IS_PRIVATE,
                     pcmk_is_set(options, pcmk__node_attr_private));
 
     rc = send_attrd_op(ipc, update);
 
 done:
     free_xml(update);
 
     if (display_command) {
         crm_debug("Asked pacemaker-attrd to %s %s: %s (%d)",
                   display_command, display_host, pcmk_rc_str(rc), rc);
     } else {
         crm_debug("Asked pacemaker-attrd to update %s=%s for %s: %s (%d)",
                   name, value, display_host, pcmk_rc_str(rc), rc);
     }
     return rc;
 }
 
 /*!
  * \internal
  * \brief Send a request to pacemaker-attrd to clear resource failure
  *
  * \param[in] ipc           Connection to pacemaker-attrd (NULL to use local connection)
  * \param[in] host          Affect only this host (or NULL for all hosts)
  * \param[in] resource      Name of resource to clear (or NULL for all)
  * \param[in] operation     Name of operation to clear (or NULL for all)
  * \param[in] interval_spec If operation is not NULL, its interval
  * \param[in] user_name     ACL user to pass to pacemaker-attrd
  * \param[in] options       Bitmask of pcmk__node_attr_opts
  *
  * \return pcmk_ok if request was successfully submitted to pacemaker-attrd, else -errno
  */
 int
 pcmk__node_attr_request_clear(crm_ipc_t *ipc, const char *host,
                               const char *resource, const char *operation,
                               const char *interval_spec, const char *user_name,
                               int options)
 {
     int rc = pcmk_rc_ok;
     xmlNode *clear_op = create_attrd_op(user_name);
     const char *interval_desc = NULL;
     const char *op_desc = NULL;
 
     crm_xml_add(clear_op, PCMK__XA_TASK, PCMK__ATTRD_CMD_CLEAR_FAILURE);
     crm_xml_add(clear_op, PCMK__XA_ATTR_NODE_NAME, host);
     crm_xml_add(clear_op, PCMK__XA_ATTR_RESOURCE, resource);
     crm_xml_add(clear_op, PCMK__XA_ATTR_OPERATION, operation);
     crm_xml_add(clear_op, PCMK__XA_ATTR_INTERVAL, interval_spec);
     crm_xml_add_int(clear_op, PCMK__XA_ATTR_IS_REMOTE,
                     pcmk_is_set(options, pcmk__node_attr_remote));
 
     rc = send_attrd_op(ipc, clear_op);
     free_xml(clear_op);
 
     if (operation) {
         interval_desc = interval_spec? interval_spec : "nonrecurring";
         op_desc = operation;
     } else {
         interval_desc = "all";
         op_desc = "operations";
     }
     crm_debug("Asked pacemaker-attrd to clear failure of %s %s for %s on %s: %s (%d)",
               interval_desc, op_desc, (resource? resource : "all resources"),
               (host? host : "all nodes"), pcmk_rc_str(rc), rc);
     return rc;
 }
diff --git a/lib/common/ipc_client.c b/lib/common/ipc_client.c
index 5922ce6cb3..ac983b8721 100644
--- a/lib/common/ipc_client.c
+++ b/lib/common/ipc_client.c
@@ -1,1551 +1,1552 @@
 /*
  * Copyright 2004-2022 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU Lesser General Public License
  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 
 #if defined(US_AUTH_PEERCRED_UCRED) || defined(US_AUTH_PEERCRED_SOCKPEERCRED)
 #  ifdef US_AUTH_PEERCRED_UCRED
 #    ifndef _GNU_SOURCE
 #      define _GNU_SOURCE
 #    endif
 #  endif
 #  include <sys/socket.h>
 #elif defined(US_AUTH_GETPEERUCRED)
 #  include <ucred.h>
 #endif
 
 #include <stdio.h>
 #include <sys/types.h>
 #include <errno.h>
 #include <bzlib.h>
 
 #include <crm/crm.h>   /* indirectly: pcmk_err_generic */
 #include <crm/msg_xml.h>
 #include <crm/common/ipc.h>
 #include <crm/common/ipc_internal.h>
 #include "crmcommon_private.h"
 
 /*!
  * \brief Create a new object for using Pacemaker daemon IPC
  *
  * \param[out] api     Where to store new IPC object
  * \param[in]  server  Which Pacemaker daemon the object is for
  *
  * \return Standard Pacemaker result code
  *
  * \note The caller is responsible for freeing *api using pcmk_free_ipc_api().
  * \note This is intended to supersede crm_ipc_new() but currently only
  *       supports the controller, pacemakerd, and schedulerd IPC API.
  */
 int
 pcmk_new_ipc_api(pcmk_ipc_api_t **api, enum pcmk_ipc_server server)
 {
     if (api == NULL) {
         return EINVAL;
     }
 
     *api = calloc(1, sizeof(pcmk_ipc_api_t));
     if (*api == NULL) {
         return errno;
     }
 
     (*api)->server = server;
     if (pcmk_ipc_name(*api, false) == NULL) {
         pcmk_free_ipc_api(*api);
         *api = NULL;
         return EOPNOTSUPP;
     }
 
     (*api)->ipc_size_max = 0;
 
     // Set server methods and max_size (if not default)
     switch (server) {
         case pcmk_ipc_attrd:
+            (*api)->cmds = pcmk__attrd_api_methods();
             break;
 
         case pcmk_ipc_based:
             (*api)->ipc_size_max = 512 * 1024; // 512KB
             break;
 
         case pcmk_ipc_controld:
             (*api)->cmds = pcmk__controld_api_methods();
             break;
 
         case pcmk_ipc_execd:
             break;
 
         case pcmk_ipc_fenced:
             break;
 
         case pcmk_ipc_pacemakerd:
             (*api)->cmds = pcmk__pacemakerd_api_methods();
             break;
 
         case pcmk_ipc_schedulerd:
             (*api)->cmds = pcmk__schedulerd_api_methods();
             // @TODO max_size could vary by client, maybe take as argument?
             (*api)->ipc_size_max = 5 * 1024 * 1024; // 5MB
             break;
     }
     if ((*api)->cmds == NULL) {
         pcmk_free_ipc_api(*api);
         *api = NULL;
         return ENOMEM;
     }
 
     (*api)->ipc = crm_ipc_new(pcmk_ipc_name(*api, false),
                               (*api)->ipc_size_max);
     if ((*api)->ipc == NULL) {
         pcmk_free_ipc_api(*api);
         *api = NULL;
         return ENOMEM;
     }
 
     // If daemon API has its own data to track, allocate it
     if ((*api)->cmds->new_data != NULL) {
         if ((*api)->cmds->new_data(*api) != pcmk_rc_ok) {
             pcmk_free_ipc_api(*api);
             *api = NULL;
             return ENOMEM;
         }
     }
     crm_trace("Created %s API IPC object", pcmk_ipc_name(*api, true));
     return pcmk_rc_ok;
 }
 
 static void
 free_daemon_specific_data(pcmk_ipc_api_t *api)
 {
     if ((api != NULL) && (api->cmds != NULL)) {
         if ((api->cmds->free_data != NULL) && (api->api_data != NULL)) {
             api->cmds->free_data(api->api_data);
             api->api_data = NULL;
         }
         free(api->cmds);
         api->cmds = NULL;
     }
 }
 
 /*!
  * \internal
  * \brief Call an IPC API event callback, if one is registed
  *
  * \param[in] api         IPC API connection
  * \param[in] event_type  The type of event that occurred
  * \param[in] status      Event status
  * \param[in] event_data  Event-specific data
  */
 void
 pcmk__call_ipc_callback(pcmk_ipc_api_t *api, enum pcmk_ipc_event event_type,
                         crm_exit_t status, void *event_data)
 {
     if ((api != NULL) && (api->cb != NULL)) {
         api->cb(api, event_type, status, event_data, api->user_data);
     }
 }
 
 /*!
  * \internal
  * \brief Clean up after an IPC disconnect
  *
  * \param[in]  user_data  IPC API connection that disconnected
  *
  * \note This function can be used as a main loop IPC destroy callback.
  */
 static void
 ipc_post_disconnect(gpointer user_data)
 {
     pcmk_ipc_api_t *api = user_data;
 
     crm_info("Disconnected from %s IPC API", pcmk_ipc_name(api, true));
 
     // Perform any daemon-specific handling needed
     if ((api->cmds != NULL) && (api->cmds->post_disconnect != NULL)) {
         api->cmds->post_disconnect(api);
     }
 
     // Call client's registered event callback
     pcmk__call_ipc_callback(api, pcmk_ipc_event_disconnect, CRM_EX_DISCONNECT,
                             NULL);
 
     /* If this is being called from a running main loop, mainloop_gio_destroy()
      * will free ipc and mainloop_io immediately after calling this function.
      * If this is called from a stopped main loop, these will leak, so the best
      * practice is to close the connection before stopping the main loop.
      */
     api->ipc = NULL;
     api->mainloop_io = NULL;
 
     if (api->free_on_disconnect) {
         /* pcmk_free_ipc_api() has already been called, but did not free api
          * or api->cmds because this function needed them. Do that now.
          */
         free_daemon_specific_data(api);
         crm_trace("Freeing IPC API object after disconnect");
         free(api);
     }
 }
 
 /*!
  * \brief Free the contents of an IPC API object
  *
  * \param[in] api  IPC API object to free
  */
 void
 pcmk_free_ipc_api(pcmk_ipc_api_t *api)
 {
     bool free_on_disconnect = false;
 
     if (api == NULL) {
         return;
     }
     crm_debug("Releasing %s IPC API", pcmk_ipc_name(api, true));
 
     if (api->ipc != NULL) {
         if (api->mainloop_io != NULL) {
             /* We need to keep the api pointer itself around, because it is the
              * user data for the IPC client destroy callback. That will be
              * triggered by the pcmk_disconnect_ipc() call below, but it might
              * happen later in the main loop (if still running).
              *
              * This flag tells the destroy callback to free the object. It can't
              * do that unconditionally, because the application might call this
              * function after a disconnect that happened by other means.
              */
             free_on_disconnect = api->free_on_disconnect = true;
         }
         pcmk_disconnect_ipc(api); // Frees api if free_on_disconnect is true
     }
     if (!free_on_disconnect) {
         free_daemon_specific_data(api);
         crm_trace("Freeing IPC API object");
         free(api);
     }
 }
 
 /*!
  * \brief Get the IPC name used with an IPC API connection
  *
  * \param[in] api      IPC API connection
  * \param[in] for_log  If true, return human-friendly name instead of IPC name
  *
  * \return IPC API's human-friendly or connection name, or if none is available,
  *         "Pacemaker" if for_log is true and NULL if for_log is false
  */
 const char *
 pcmk_ipc_name(pcmk_ipc_api_t *api, bool for_log)
 {
     if (api == NULL) {
         return for_log? "Pacemaker" : NULL;
     }
     switch (api->server) {
         case pcmk_ipc_attrd:
-            return for_log? "attribute manager" : NULL /* T_ATTRD */;
+            return for_log? "attribute manager" : T_ATTRD;
 
         case pcmk_ipc_based:
             return for_log? "CIB manager" : NULL /* PCMK__SERVER_BASED_RW */;
 
         case pcmk_ipc_controld:
             return for_log? "controller" : CRM_SYSTEM_CRMD;
 
         case pcmk_ipc_execd:
             return for_log? "executor" : NULL /* CRM_SYSTEM_LRMD */;
 
         case pcmk_ipc_fenced:
             return for_log? "fencer" : NULL /* "stonith-ng" */;
 
         case pcmk_ipc_pacemakerd:
             return for_log? "launcher" : CRM_SYSTEM_MCP;
 
         case pcmk_ipc_schedulerd:
             return for_log? "scheduler" : CRM_SYSTEM_PENGINE;
 
         default:
             return for_log? "Pacemaker" : NULL;
     }
 }
 
 /*!
  * \brief Check whether an IPC API connection is active
  *
  * \param[in] api  IPC API connection
  *
  * \return true if IPC is connected, false otherwise
  */
 bool
 pcmk_ipc_is_connected(pcmk_ipc_api_t *api)
 {
     return (api != NULL) && crm_ipc_connected(api->ipc);
 }
 
 /*!
  * \internal
  * \brief Call the daemon-specific API's dispatch function
  *
  * Perform daemon-specific handling of IPC reply dispatch. It is the daemon
  * method's responsibility to call the client's registered event callback, as
  * well as allocate and free any event data.
  *
  * \param[in] api  IPC API connection
  */
 static bool
 call_api_dispatch(pcmk_ipc_api_t *api, xmlNode *message)
 {
     crm_log_xml_trace(message, "ipc-received");
     if ((api->cmds != NULL) && (api->cmds->dispatch != NULL)) {
         return api->cmds->dispatch(api, message);
     }
 
     return false;
 }
 
 /*!
  * \internal
  * \brief Dispatch previously read IPC data
  *
  * \param[in] buffer Data read from IPC
  * \param[in] api    IPC object
  *
  * \return Standard Pacemaker return code.  In particular:
  *
  * pcmk_rc_ok: There are no more messages expected from the server.  Quit
  *             reading.
  * EINPROGRESS: There are more messages expected from the server.  Keep reading.
  *
  * All other values indicate an error.
  */
 static int
 dispatch_ipc_data(const char *buffer, pcmk_ipc_api_t *api)
 {
     bool more = false;
     xmlNode *msg;
 
     if (buffer == NULL) {
         crm_warn("Empty message received from %s IPC",
                  pcmk_ipc_name(api, true));
         return ENOMSG;
     }
 
     msg = string2xml(buffer);
     if (msg == NULL) {
         crm_warn("Malformed message received from %s IPC",
                  pcmk_ipc_name(api, true));
         return EPROTO;
     }
 
     more = call_api_dispatch(api, msg);
     free_xml(msg);
 
     if (more) {
         return EINPROGRESS;
     } else {
         return pcmk_rc_ok;
     }
 }
 
 /*!
  * \internal
  * \brief Dispatch data read from IPC source
  *
  * \param[in] buffer     Data read from IPC
  * \param[in] length     Number of bytes of data in buffer (ignored)
  * \param[in] user_data  IPC object
  *
  * \return Always 0 (meaning connection is still required)
  *
  * \note This function can be used as a main loop IPC dispatch callback.
  */
 static int
 dispatch_ipc_source_data(const char *buffer, ssize_t length, gpointer user_data)
 {
     pcmk_ipc_api_t *api = user_data;
 
     CRM_CHECK(api != NULL, return 0);
     dispatch_ipc_data(buffer, api);
     return 0;
 }
 
 /*!
  * \brief Check whether an IPC connection has data available (without main loop)
  *
  * \param[in]  api         IPC API connection
  * \param[in]  timeout_ms  If less than 0, poll indefinitely; if 0, poll once
  *                         and return immediately; otherwise, poll for up to
  *                         this many milliseconds
  *
  * \return Standard Pacemaker return code
  *
  * \note Callers of pcmk_connect_ipc() using pcmk_ipc_dispatch_poll should call
  *       this function to check whether IPC data is available. Return values of
  *       interest include pcmk_rc_ok meaning data is available, and EAGAIN
  *       meaning no data is available; all other values indicate errors.
  * \todo This does not allow the caller to poll multiple file descriptors at
  *       once. If there is demand for that, we could add a wrapper for
  *       crm_ipc_get_fd(api->ipc), so the caller can call poll() themselves.
  */
 int
 pcmk_poll_ipc(pcmk_ipc_api_t *api, int timeout_ms)
 {
     int rc;
     struct pollfd pollfd = { 0, };
 
     if ((api == NULL) || (api->dispatch_type != pcmk_ipc_dispatch_poll)) {
         return EINVAL;
     }
     pollfd.fd = crm_ipc_get_fd(api->ipc);
     pollfd.events = POLLIN;
     rc = poll(&pollfd, 1, timeout_ms);
     if (rc < 0) {
         return errno;
     } else if (rc == 0) {
         return EAGAIN;
     }
     return pcmk_rc_ok;
 }
 
 /*!
  * \brief Dispatch available messages on an IPC connection (without main loop)
  *
  * \param[in]  api  IPC API connection
  *
  * \return Standard Pacemaker return code
  *
  * \note Callers of pcmk_connect_ipc() using pcmk_ipc_dispatch_poll should call
  *       this function when IPC data is available.
  */
 void
 pcmk_dispatch_ipc(pcmk_ipc_api_t *api)
 {
     if (api == NULL) {
         return;
     }
     while (crm_ipc_ready(api->ipc) > 0) {
         if (crm_ipc_read(api->ipc) > 0) {
             dispatch_ipc_data(crm_ipc_buffer(api->ipc), api);
         }
     }
 }
 
 // \return Standard Pacemaker return code
 static int
 connect_with_main_loop(pcmk_ipc_api_t *api)
 {
     int rc;
 
     struct ipc_client_callbacks callbacks = {
         .dispatch = dispatch_ipc_source_data,
         .destroy = ipc_post_disconnect,
     };
 
     rc = pcmk__add_mainloop_ipc(api->ipc, G_PRIORITY_DEFAULT, api,
                                 &callbacks, &(api->mainloop_io));
     if (rc != pcmk_rc_ok) {
         return rc;
     }
     crm_debug("Connected to %s IPC (attached to main loop)",
               pcmk_ipc_name(api, true));
     /* After this point, api->mainloop_io owns api->ipc, so api->ipc
      * should not be explicitly freed.
      */
     return pcmk_rc_ok;
 }
 
 // \return Standard Pacemaker return code
 static int
 connect_without_main_loop(pcmk_ipc_api_t *api)
 {
     int rc;
 
     if (!crm_ipc_connect(api->ipc)) {
         rc = errno;
         crm_ipc_close(api->ipc);
         return rc;
     }
     crm_debug("Connected to %s IPC (without main loop)",
               pcmk_ipc_name(api, true));
     return pcmk_rc_ok;
 }
 
 /*!
  * \brief Connect to a Pacemaker daemon via IPC
  *
  * \param[in]  api            IPC API instance
  * \param[out] dispatch_type  How IPC replies should be dispatched
  *
  * \return Standard Pacemaker return code
  */
 int
 pcmk_connect_ipc(pcmk_ipc_api_t *api, enum pcmk_ipc_dispatch dispatch_type)
 {
     int rc = pcmk_rc_ok;
 
     if (api == NULL) {
         crm_err("Cannot connect to uninitialized API object");
         return EINVAL;
     }
 
     if (api->ipc == NULL) {
         api->ipc = crm_ipc_new(pcmk_ipc_name(api, false),
                                   api->ipc_size_max);
         if (api->ipc == NULL) {
             crm_err("Failed to re-create IPC API");
             return ENOMEM;
         }
     }
 
     if (crm_ipc_connected(api->ipc)) {
         crm_trace("Already connected to %s IPC API", pcmk_ipc_name(api, true));
         return pcmk_rc_ok;
     }
 
     api->dispatch_type = dispatch_type;
     switch (dispatch_type) {
         case pcmk_ipc_dispatch_main:
             rc = connect_with_main_loop(api);
             break;
 
         case pcmk_ipc_dispatch_sync:
         case pcmk_ipc_dispatch_poll:
             rc = connect_without_main_loop(api);
             break;
     }
     if (rc != pcmk_rc_ok) {
         return rc;
     }
 
     if ((api->cmds != NULL) && (api->cmds->post_connect != NULL)) {
         rc = api->cmds->post_connect(api);
         if (rc != pcmk_rc_ok) {
             crm_ipc_close(api->ipc);
         }
     }
     return rc;
 }
 
 /*!
  * \brief Disconnect an IPC API instance
  *
  * \param[in]  api  IPC API connection
  *
  * \return Standard Pacemaker return code
  *
  * \note If the connection is attached to a main loop, this function should be
  *       called before quitting the main loop, to ensure that all memory is
  *       freed.
  */
 void
 pcmk_disconnect_ipc(pcmk_ipc_api_t *api)
 {
     if ((api == NULL) || (api->ipc == NULL)) {
         return;
     }
     switch (api->dispatch_type) {
         case pcmk_ipc_dispatch_main:
             {
                 mainloop_io_t *mainloop_io = api->mainloop_io;
 
                 // Make sure no code with access to api can use these again
                 api->mainloop_io = NULL;
                 api->ipc = NULL;
 
                 mainloop_del_ipc_client(mainloop_io);
                 // After this point api might have already been freed
             }
             break;
 
         case pcmk_ipc_dispatch_poll:
         case pcmk_ipc_dispatch_sync:
             {
                 crm_ipc_t *ipc = api->ipc;
 
                 // Make sure no code with access to api can use ipc again
                 api->ipc = NULL;
 
                 // This should always be the case already, but to be safe
                 api->free_on_disconnect = false;
 
                 crm_ipc_destroy(ipc);
                 ipc_post_disconnect(api);
             }
             break;
     }
 }
 
 /*!
  * \brief Register a callback for IPC API events
  *
  * \param[in] api          IPC API connection
  * \param[in] callback     Callback to register
  * \param[in] userdata     Caller data to pass to callback
  *
  * \note This function may be called multiple times to update the callback
  *       and/or user data. The caller remains responsible for freeing
  *       userdata in any case (after the IPC is disconnected, if the
  *       user data is still registered with the IPC).
  */
 void
 pcmk_register_ipc_callback(pcmk_ipc_api_t *api, pcmk_ipc_callback_t cb,
                            void *user_data)
 {
     if (api == NULL) {
         return;
     }
     api->cb = cb;
     api->user_data = user_data;
 }
 
 /*!
  * \internal
  * \brief Send an XML request across an IPC API connection
  *
  * \param[in] api          IPC API connection
  * \param[in] request      XML request to send
  *
  * \return Standard Pacemaker return code
  *
  * \note Daemon-specific IPC API functions should call this function to send
  *       requests, because it handles different dispatch types appropriately.
  */
 int
 pcmk__send_ipc_request(pcmk_ipc_api_t *api, xmlNode *request)
 {
     int rc;
     xmlNode *reply = NULL;
     enum crm_ipc_flags flags = crm_ipc_flags_none;
 
     if ((api == NULL) || (api->ipc == NULL) || (request == NULL)) {
         return EINVAL;
     }
     crm_log_xml_trace(request, "ipc-sent");
 
     // Synchronous dispatch requires waiting for a reply
     if ((api->dispatch_type == pcmk_ipc_dispatch_sync)
         && (api->cmds != NULL)
         && (api->cmds->reply_expected != NULL)
         && (api->cmds->reply_expected(api, request))) {
         flags = crm_ipc_client_response;
     }
 
     // The 0 here means a default timeout of 5 seconds
     rc = crm_ipc_send(api->ipc, request, flags, 0, &reply);
 
     if (rc < 0) {
         return pcmk_legacy2rc(rc);
     } else if (rc == 0) {
         return ENODATA;
     }
 
     // With synchronous dispatch, we dispatch any reply now
     if (reply != NULL) {
         bool more = call_api_dispatch(api, reply);
 
         free_xml(reply);
 
         while (more) {
             rc = crm_ipc_read(api->ipc);
 
             if (rc == -EAGAIN) {
                 continue;
             } else if (rc == -ENOMSG || rc == pcmk_ok) {
                 return pcmk_rc_ok;
             } else if (rc < 0) {
                 return -rc;
             }
 
             rc = dispatch_ipc_data(crm_ipc_buffer(api->ipc), api);
 
             if (rc == pcmk_rc_ok) {
                 more = false;
             } else if (rc == EINPROGRESS) {
                 more = true;
             } else {
                 continue;
             }
         }
     }
     return pcmk_rc_ok;
 }
 
 /*!
  * \internal
  * \brief Create the XML for an IPC request to purge a node from the peer cache
  *
  * \param[in]  api        IPC API connection
  * \param[in]  node_name  If not NULL, name of node to purge
  * \param[in]  nodeid     If not 0, node ID of node to purge
  *
  * \return Newly allocated IPC request XML
  *
  * \note The controller, fencer, and pacemakerd use the same request syntax, but
  *       the attribute manager uses a different one. The CIB manager doesn't
  *       have any syntax for it. The executor and scheduler don't connect to the
  *       cluster layer and thus don't have or need any syntax for it.
  *
  * \todo Modify the attribute manager to accept the common syntax (as well
  *       as its current one, for compatibility with older clients). Modify
  *       the CIB manager to accept and honor the common syntax. Modify the
  *       executor and scheduler to accept the syntax (immediately returning
  *       success), just for consistency. Modify this function to use the
  *       common syntax with all daemons if their version supports it.
  */
 static xmlNode *
 create_purge_node_request(pcmk_ipc_api_t *api, const char *node_name,
                           uint32_t nodeid)
 {
     xmlNode *request = NULL;
     const char *client = crm_system_name? crm_system_name : "client";
 
     switch (api->server) {
         case pcmk_ipc_attrd:
             request = create_xml_node(NULL, __func__);
             crm_xml_add(request, F_TYPE, T_ATTRD);
             crm_xml_add(request, F_ORIG, crm_system_name);
             crm_xml_add(request, PCMK__XA_TASK, PCMK__ATTRD_CMD_PEER_REMOVE);
             crm_xml_add(request, PCMK__XA_ATTR_NODE_NAME, node_name);
             if (nodeid > 0) {
                 crm_xml_add_int(request, PCMK__XA_ATTR_NODE_ID, (int) nodeid);
             }
             break;
 
         case pcmk_ipc_controld:
         case pcmk_ipc_fenced:
         case pcmk_ipc_pacemakerd:
             request = create_request(CRM_OP_RM_NODE_CACHE, NULL, NULL,
                                      pcmk_ipc_name(api, false), client, NULL);
             if (nodeid > 0) {
                 crm_xml_set_id(request, "%lu", (unsigned long) nodeid);
             }
             crm_xml_add(request, XML_ATTR_UNAME, node_name);
             break;
 
         case pcmk_ipc_based:
         case pcmk_ipc_execd:
         case pcmk_ipc_schedulerd:
             break;
     }
     return request;
 }
 
 /*!
  * \brief Ask a Pacemaker daemon to purge a node from its peer cache
  *
  * \param[in]  api        IPC API connection
  * \param[in]  node_name  If not NULL, name of node to purge
  * \param[in]  nodeid     If not 0, node ID of node to purge
  *
  * \return Standard Pacemaker return code
  *
  * \note At least one of node_name or nodeid must be specified.
  */
 int
 pcmk_ipc_purge_node(pcmk_ipc_api_t *api, const char *node_name, uint32_t nodeid)
 {
     int rc = 0;
     xmlNode *request = NULL;
 
     if (api == NULL) {
         return EINVAL;
     }
     if ((node_name == NULL) && (nodeid == 0)) {
         return EINVAL;
     }
 
     request = create_purge_node_request(api, node_name, nodeid);
     if (request == NULL) {
         return EOPNOTSUPP;
     }
     rc = pcmk__send_ipc_request(api, request);
     free_xml(request);
 
     crm_debug("%s peer cache purge of node %s[%lu]: rc=%d",
               pcmk_ipc_name(api, true), node_name, (unsigned long) nodeid, rc);
     return rc;
 }
 
 /*
  * Generic IPC API (to eventually be deprecated as public API and made internal)
  */
 
 struct crm_ipc_s {
     struct pollfd pfd;
     unsigned int max_buf_size; // maximum bytes we can send or receive over IPC
     unsigned int buf_size;     // size of allocated buffer
     int msg_size;
     int need_reply;
     char *buffer;
     char *server_name;          // server IPC name being connected to
     qb_ipcc_connection_t *ipc;
 };
 
 /*!
  * \brief Create a new (legacy) object for using Pacemaker daemon IPC
  *
  * \param[in] name      IPC system name to connect to
  * \param[in] max_size  Use a maximum IPC buffer size of at least this size
  *
  * \return Newly allocated IPC object on success, NULL otherwise
  *
  * \note The caller is responsible for freeing the result using
  *       crm_ipc_destroy().
  * \note This should be considered deprecated for use with daemons supported by
  *       pcmk_new_ipc_api().
  */
 crm_ipc_t *
 crm_ipc_new(const char *name, size_t max_size)
 {
     crm_ipc_t *client = NULL;
 
     client = calloc(1, sizeof(crm_ipc_t));
     if (client == NULL) {
         crm_err("Could not create IPC connection: %s", strerror(errno));
         return NULL;
     }
 
     client->server_name = strdup(name);
     if (client->server_name == NULL) {
         crm_err("Could not create %s IPC connection: %s",
                 name, strerror(errno));
         free(client);
         return NULL;
     }
     client->buf_size = pcmk__ipc_buffer_size(max_size);
     client->buffer = malloc(client->buf_size);
     if (client->buffer == NULL) {
         crm_err("Could not create %s IPC connection: %s",
                 name, strerror(errno));
         free(client->server_name);
         free(client);
         return NULL;
     }
 
     /* Clients initiating connection pick the max buf size */
     client->max_buf_size = client->buf_size;
 
     client->pfd.fd = -1;
     client->pfd.events = POLLIN;
     client->pfd.revents = 0;
 
     return client;
 }
 
 /*!
  * \brief Establish an IPC connection to a Pacemaker component
  *
  * \param[in] client  Connection instance obtained from crm_ipc_new()
  *
  * \return TRUE on success, FALSE otherwise (in which case errno will be set;
  *         specifically, in case of discovering the remote side is not
  *         authentic, its value is set to ECONNABORTED).
  */
 bool
 crm_ipc_connect(crm_ipc_t * client)
 {
     uid_t cl_uid = 0;
     gid_t cl_gid = 0;
     pid_t found_pid = 0; uid_t found_uid = 0; gid_t found_gid = 0;
     int rv;
 
     client->need_reply = FALSE;
     client->ipc = qb_ipcc_connect(client->server_name, client->buf_size);
 
     if (client->ipc == NULL) {
         crm_debug("Could not establish %s IPC connection: %s (%d)",
                   client->server_name, pcmk_rc_str(errno), errno);
         return FALSE;
     }
 
     client->pfd.fd = crm_ipc_get_fd(client);
     if (client->pfd.fd < 0) {
         rv = errno;
         /* message already omitted */
         crm_ipc_close(client);
         errno = rv;
         return FALSE;
     }
 
     rv = pcmk_daemon_user(&cl_uid, &cl_gid);
     if (rv < 0) {
         /* message already omitted */
         crm_ipc_close(client);
         errno = -rv;
         return FALSE;
     }
 
     if ((rv = pcmk__crm_ipc_is_authentic_process(client->ipc, client->pfd.fd, cl_uid, cl_gid,
                                                   &found_pid, &found_uid,
                                                   &found_gid)) == pcmk_rc_ipc_unauthorized) {
         crm_err("%s IPC provider authentication failed: process %lld has "
                 "uid %lld (expected %lld) and gid %lld (expected %lld)",
                 client->server_name,
                 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
                 (long long) found_uid, (long long) cl_uid,
                 (long long) found_gid, (long long) cl_gid);
         crm_ipc_close(client);
         errno = ECONNABORTED;
         return FALSE;
 
     } else if (rv != pcmk_rc_ok) {
         crm_perror(LOG_ERR, "Could not verify authenticity of %s IPC provider",
                    client->server_name);
         crm_ipc_close(client);
         if (rv > 0) {
             errno = rv;
         } else {
             errno = ENOTCONN;
         }
         return FALSE;
     }
 
     qb_ipcc_context_set(client->ipc, client);
 
     client->max_buf_size = qb_ipcc_get_buffer_size(client->ipc);
     if (client->max_buf_size > client->buf_size) {
         free(client->buffer);
         client->buffer = calloc(1, client->max_buf_size);
         client->buf_size = client->max_buf_size;
     }
     return TRUE;
 }
 
 void
 crm_ipc_close(crm_ipc_t * client)
 {
     if (client) {
         if (client->ipc) {
             qb_ipcc_connection_t *ipc = client->ipc;
 
             client->ipc = NULL;
             qb_ipcc_disconnect(ipc);
         }
     }
 }
 
 void
 crm_ipc_destroy(crm_ipc_t * client)
 {
     if (client) {
         if (client->ipc && qb_ipcc_is_connected(client->ipc)) {
             crm_notice("Destroying active %s IPC connection",
                        client->server_name);
             /* The next line is basically unsafe
              *
              * If this connection was attached to mainloop and mainloop is active,
              *   the 'disconnected' callback will end up back here and we'll end
              *   up free'ing the memory twice - something that can still happen
              *   even without this if we destroy a connection and it closes before
              *   we call exit
              */
             /* crm_ipc_close(client); */
         } else {
             crm_trace("Destroying inactive %s IPC connection",
                       client->server_name);
         }
         free(client->buffer);
         free(client->server_name);
         free(client);
     }
 }
 
 int
 crm_ipc_get_fd(crm_ipc_t * client)
 {
     int fd = 0;
 
     if (client && client->ipc && (qb_ipcc_fd_get(client->ipc, &fd) == 0)) {
         return fd;
     }
     errno = EINVAL;
     crm_perror(LOG_ERR, "Could not obtain file descriptor for %s IPC",
                (client? client->server_name : "unspecified"));
     return -errno;
 }
 
 bool
 crm_ipc_connected(crm_ipc_t * client)
 {
     bool rc = FALSE;
 
     if (client == NULL) {
         crm_trace("No client");
         return FALSE;
 
     } else if (client->ipc == NULL) {
         crm_trace("No connection");
         return FALSE;
 
     } else if (client->pfd.fd < 0) {
         crm_trace("Bad descriptor");
         return FALSE;
     }
 
     rc = qb_ipcc_is_connected(client->ipc);
     if (rc == FALSE) {
         client->pfd.fd = -EINVAL;
     }
     return rc;
 }
 
 /*!
  * \brief Check whether an IPC connection is ready to be read
  *
  * \param[in] client  Connection to check
  *
  * \return Positive value if ready to be read, 0 if not ready, -errno on error
  */
 int
 crm_ipc_ready(crm_ipc_t *client)
 {
     int rc;
 
     CRM_ASSERT(client != NULL);
 
     if (crm_ipc_connected(client) == FALSE) {
         return -ENOTCONN;
     }
 
     client->pfd.revents = 0;
     rc = poll(&(client->pfd), 1, 0);
     return (rc < 0)? -errno : rc;
 }
 
 // \return Standard Pacemaker return code
 static int
 crm_ipc_decompress(crm_ipc_t * client)
 {
     pcmk__ipc_header_t *header = (pcmk__ipc_header_t *)(void*)client->buffer;
 
     if (header->size_compressed) {
         int rc = 0;
         unsigned int size_u = 1 + header->size_uncompressed;
         /* never let buf size fall below our max size required for ipc reads. */
         unsigned int new_buf_size = QB_MAX((sizeof(pcmk__ipc_header_t) + size_u), client->max_buf_size);
         char *uncompressed = calloc(1, new_buf_size);
 
         crm_trace("Decompressing message data %u bytes into %u bytes",
                  header->size_compressed, size_u);
 
         rc = BZ2_bzBuffToBuffDecompress(uncompressed + sizeof(pcmk__ipc_header_t), &size_u,
                                         client->buffer + sizeof(pcmk__ipc_header_t), header->size_compressed, 1, 0);
 
         if (rc != BZ_OK) {
             crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
                     bz2_strerror(rc), rc);
             free(uncompressed);
             return EILSEQ;
         }
 
         /*
          * This assert no longer holds true.  For an identical msg, some clients may
          * require compression, and others may not. If that same msg (event) is sent
          * to multiple clients, it could result in some clients receiving a compressed
          * msg even though compression was not explicitly required for them.
          *
          * CRM_ASSERT((header->size_uncompressed + sizeof(pcmk__ipc_header_t)) >= ipc_buffer_max);
          */
         CRM_ASSERT(size_u == header->size_uncompressed);
 
         memcpy(uncompressed, client->buffer, sizeof(pcmk__ipc_header_t));       /* Preserve the header */
         header = (pcmk__ipc_header_t *)(void*)uncompressed;
 
         free(client->buffer);
         client->buf_size = new_buf_size;
         client->buffer = uncompressed;
     }
 
     CRM_ASSERT(client->buffer[sizeof(pcmk__ipc_header_t) + header->size_uncompressed - 1] == 0);
     return pcmk_rc_ok;
 }
 
 long
 crm_ipc_read(crm_ipc_t * client)
 {
     pcmk__ipc_header_t *header = NULL;
 
     CRM_ASSERT(client != NULL);
     CRM_ASSERT(client->ipc != NULL);
     CRM_ASSERT(client->buffer != NULL);
 
     client->buffer[0] = 0;
     client->msg_size = qb_ipcc_event_recv(client->ipc, client->buffer,
                                           client->buf_size, 0);
     if (client->msg_size >= 0) {
         int rc = crm_ipc_decompress(client);
 
         if (rc != pcmk_rc_ok) {
             return pcmk_rc2legacy(rc);
         }
 
         header = (pcmk__ipc_header_t *)(void*)client->buffer;
         if (!pcmk__valid_ipc_header(header)) {
             return -EBADMSG;
         }
 
         crm_trace("Received %s IPC event %d size=%u rc=%d text='%.100s'",
                   client->server_name, header->qb.id, header->qb.size,
                   client->msg_size,
                   client->buffer + sizeof(pcmk__ipc_header_t));
 
     } else {
         crm_trace("No message received from %s IPC: %s",
                   client->server_name, pcmk_strerror(client->msg_size));
 
         if (client->msg_size == -EAGAIN) {
             return -EAGAIN;
         }
     }
 
     if (crm_ipc_connected(client) == FALSE || client->msg_size == -ENOTCONN) {
         crm_err("Connection to %s IPC failed", client->server_name);
     }
 
     if (header) {
         /* Data excluding the header */
         return header->size_uncompressed;
     }
     return -ENOMSG;
 }
 
 const char *
 crm_ipc_buffer(crm_ipc_t * client)
 {
     CRM_ASSERT(client != NULL);
     return client->buffer + sizeof(pcmk__ipc_header_t);
 }
 
 uint32_t
 crm_ipc_buffer_flags(crm_ipc_t * client)
 {
     pcmk__ipc_header_t *header = NULL;
 
     CRM_ASSERT(client != NULL);
     if (client->buffer == NULL) {
         return 0;
     }
 
     header = (pcmk__ipc_header_t *)(void*)client->buffer;
     return header->flags;
 }
 
 const char *
 crm_ipc_name(crm_ipc_t * client)
 {
     CRM_ASSERT(client != NULL);
     return client->server_name;
 }
 
 // \return Standard Pacemaker return code
 static int
 internal_ipc_get_reply(crm_ipc_t *client, int request_id, int ms_timeout,
                        ssize_t *bytes)
 {
     time_t timeout = time(NULL) + 1 + (ms_timeout / 1000);
     int rc = pcmk_rc_ok;
 
     /* get the reply */
     crm_trace("Waiting on reply to %s IPC message %d",
               client->server_name, request_id);
     do {
 
         *bytes = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, 1000);
         if (*bytes > 0) {
             pcmk__ipc_header_t *hdr = NULL;
 
             rc = crm_ipc_decompress(client);
             if (rc != pcmk_rc_ok) {
                 return rc;
             }
 
             hdr = (pcmk__ipc_header_t *)(void*)client->buffer;
             if (hdr->qb.id == request_id) {
                 /* Got it */
                 break;
             } else if (hdr->qb.id < request_id) {
                 xmlNode *bad = string2xml(crm_ipc_buffer(client));
 
                 crm_err("Discarding old reply %d (need %d)", hdr->qb.id, request_id);
                 crm_log_xml_notice(bad, "OldIpcReply");
 
             } else {
                 xmlNode *bad = string2xml(crm_ipc_buffer(client));
 
                 crm_err("Discarding newer reply %d (need %d)", hdr->qb.id, request_id);
                 crm_log_xml_notice(bad, "ImpossibleReply");
                 CRM_ASSERT(hdr->qb.id <= request_id);
             }
         } else if (crm_ipc_connected(client) == FALSE) {
             crm_err("%s IPC provider disconnected while waiting for message %d",
                     client->server_name, request_id);
             break;
         }
 
     } while (time(NULL) < timeout);
 
     if (*bytes < 0) {
         rc = (int) -*bytes; // System errno
     }
     return rc;
 }
 
 /*!
  * \brief Send an IPC XML message
  *
  * \param[in]  client      Connection to IPC server
  * \param[in]  message     XML message to send
  * \param[in]  flags       Bitmask of crm_ipc_flags
  * \param[in]  ms_timeout  Give up if not sent within this much time
  *                         (5 seconds if 0, or no timeout if negative)
  * \param[out] reply       Reply from server (or NULL if none)
  *
  * \return Negative errno on error, otherwise size of reply received in bytes
  *         if reply was needed, otherwise number of bytes sent
  */
 int
 crm_ipc_send(crm_ipc_t * client, xmlNode * message, enum crm_ipc_flags flags, int32_t ms_timeout,
              xmlNode ** reply)
 {
     int rc = 0;
     ssize_t qb_rc = 0;
     ssize_t bytes = 0;
     struct iovec *iov;
     static uint32_t id = 0;
     static int factor = 8;
     pcmk__ipc_header_t *header;
 
     if (client == NULL) {
         crm_notice("Can't send IPC request without connection (bug?): %.100s",
                    message);
         return -ENOTCONN;
 
     } else if (crm_ipc_connected(client) == FALSE) {
         /* Don't even bother */
         crm_notice("Can't send %s IPC requests: Connection closed",
                    client->server_name);
         return -ENOTCONN;
     }
 
     if (ms_timeout == 0) {
         ms_timeout = 5000;
     }
 
     if (client->need_reply) {
         qb_rc = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, ms_timeout);
         if (qb_rc < 0) {
             crm_warn("Sending %s IPC disabled until pending reply received",
                      client->server_name);
             return -EALREADY;
 
         } else {
             crm_notice("Sending %s IPC re-enabled after pending reply received",
                        client->server_name);
             client->need_reply = FALSE;
         }
     }
 
     id++;
     CRM_LOG_ASSERT(id != 0); /* Crude wrap-around detection */
     rc = pcmk__ipc_prepare_iov(id, message, client->max_buf_size, &iov, &bytes);
     if (rc != pcmk_rc_ok) {
         crm_warn("Couldn't prepare %s IPC request: %s " CRM_XS " rc=%d",
                  client->server_name, pcmk_rc_str(rc), rc);
         return pcmk_rc2legacy(rc);
     }
 
     header = iov[0].iov_base;
     pcmk__set_ipc_flags(header->flags, client->server_name, flags);
 
     if (pcmk_is_set(flags, crm_ipc_proxied)) {
         /* Don't look for a synchronous response */
         pcmk__clear_ipc_flags(flags, "client", crm_ipc_client_response);
     }
 
     if(header->size_compressed) {
         if(factor < 10 && (client->max_buf_size / 10) < (bytes / factor)) {
             crm_notice("Compressed message exceeds %d0%% of configured IPC "
                        "limit (%u bytes); consider setting PCMK_ipc_buffer to "
                        "%u or higher",
                        factor, client->max_buf_size, 2 * client->max_buf_size);
             factor++;
         }
     }
 
     crm_trace("Sending %s IPC request %d of %u bytes using %dms timeout",
               client->server_name, header->qb.id, header->qb.size, ms_timeout);
 
     if ((ms_timeout > 0) || !pcmk_is_set(flags, crm_ipc_client_response)) {
 
         time_t timeout = time(NULL) + 1 + (ms_timeout / 1000);
 
         do {
             /* @TODO Is this check really needed? Won't qb_ipcc_sendv() return
              * an error if it's not connected?
              */
             if (!crm_ipc_connected(client)) {
                 goto send_cleanup;
             }
 
             qb_rc = qb_ipcc_sendv(client->ipc, iov, 2);
         } while ((qb_rc == -EAGAIN) && (time(NULL) < timeout));
 
         rc = (int) qb_rc; // Negative of system errno, or bytes sent
         if (qb_rc <= 0) {
             goto send_cleanup;
 
         } else if (!pcmk_is_set(flags, crm_ipc_client_response)) {
             crm_trace("Not waiting for reply to %s IPC request %d",
                       client->server_name, header->qb.id);
             goto send_cleanup;
         }
 
         rc = internal_ipc_get_reply(client, header->qb.id, ms_timeout, &bytes);
         if (rc != pcmk_rc_ok) {
             /* We didn't get the reply in time, so disable future sends for now.
              * The only alternative would be to close the connection since we
              * don't know how to detect and discard out-of-sequence replies.
              *
              * @TODO Implement out-of-sequence detection
              */
             client->need_reply = TRUE;
         }
         rc = (int) bytes; // Negative system errno, or size of reply received
 
     } else {
         // No timeout, and client response needed
         do {
             qb_rc = qb_ipcc_sendv_recv(client->ipc, iov, 2, client->buffer,
                                        client->buf_size, -1);
         } while ((qb_rc == -EAGAIN) && crm_ipc_connected(client));
         rc = (int) qb_rc; // Negative system errno, or size of reply received
     }
 
     if (rc > 0) {
         pcmk__ipc_header_t *hdr = (pcmk__ipc_header_t *)(void*)client->buffer;
 
         crm_trace("Received %d-byte reply %d to %s IPC %d: %.100s",
                   rc, hdr->qb.id, client->server_name, header->qb.id,
                   crm_ipc_buffer(client));
 
         if (reply) {
             *reply = string2xml(crm_ipc_buffer(client));
         }
 
     } else {
         crm_trace("No reply to %s IPC %d: rc=%d",
                   client->server_name, header->qb.id, rc);
     }
 
   send_cleanup:
     if (crm_ipc_connected(client) == FALSE) {
         crm_notice("Couldn't send %s IPC request %d: Connection closed "
                    CRM_XS " rc=%d", client->server_name, header->qb.id, rc);
 
     } else if (rc == -ETIMEDOUT) {
         crm_warn("%s IPC request %d failed: %s after %dms " CRM_XS " rc=%d",
                  client->server_name, header->qb.id, pcmk_strerror(rc),
                  ms_timeout, rc);
         crm_write_blackbox(0, NULL);
 
     } else if (rc <= 0) {
         crm_warn("%s IPC request %d failed: %s " CRM_XS " rc=%d",
                  client->server_name, header->qb.id,
                  ((rc == 0)? "No bytes sent" : pcmk_strerror(rc)), rc);
     }
 
     pcmk_free_ipc_event(iov);
     return rc;
 }
 
 int
 pcmk__crm_ipc_is_authentic_process(qb_ipcc_connection_t *qb_ipc, int sock, uid_t refuid, gid_t refgid,
                                    pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
 {
     int ret = 0;
     pid_t found_pid = 0; uid_t found_uid = 0; gid_t found_gid = 0;
 #if defined(US_AUTH_PEERCRED_UCRED)
     struct ucred ucred;
     socklen_t ucred_len = sizeof(ucred);
 #endif
 
 #ifdef HAVE_QB_IPCC_AUTH_GET
     if (qb_ipc && !qb_ipcc_auth_get(qb_ipc, &found_pid, &found_uid, &found_gid)) {
         goto do_checks;
     }
 #endif
 
 #if defined(US_AUTH_PEERCRED_UCRED)
     if (!getsockopt(sock, SOL_SOCKET, SO_PEERCRED,
                     &ucred, &ucred_len)
                 && ucred_len == sizeof(ucred)) {
         found_pid = ucred.pid; found_uid = ucred.uid; found_gid = ucred.gid;
 
 #elif defined(US_AUTH_PEERCRED_SOCKPEERCRED)
     struct sockpeercred sockpeercred;
     socklen_t sockpeercred_len = sizeof(sockpeercred);
 
     if (!getsockopt(sock, SOL_SOCKET, SO_PEERCRED,
                     &sockpeercred, &sockpeercred_len)
                 && sockpeercred_len == sizeof(sockpeercred_len)) {
         found_pid = sockpeercred.pid;
         found_uid = sockpeercred.uid; found_gid = sockpeercred.gid;
 
 #elif defined(US_AUTH_GETPEEREID)
     if (!getpeereid(sock, &found_uid, &found_gid)) {
         found_pid = PCMK__SPECIAL_PID;  /* cannot obtain PID (FreeBSD) */
 
 #elif defined(US_AUTH_GETPEERUCRED)
     ucred_t *ucred;
     if (!getpeerucred(sock, &ucred)) {
         errno = 0;
         found_pid = ucred_getpid(ucred);
         found_uid = ucred_geteuid(ucred); found_gid = ucred_getegid(ucred);
         ret = -errno;
         ucred_free(ucred);
         if (ret) {
             return (ret < 0) ? ret : -pcmk_err_generic;
         }
 
 #else
 #  error "No way to authenticate a Unix socket peer"
     errno = 0;
     if (0) {
 #endif
 #ifdef HAVE_QB_IPCC_AUTH_GET
     do_checks:
 #endif
         if (gotpid != NULL) {
             *gotpid = found_pid;
         }
         if (gotuid != NULL) {
             *gotuid = found_uid;
         }
         if (gotgid != NULL) {
             *gotgid = found_gid;
         }
         if (found_uid == 0 || found_uid == refuid || found_gid == refgid) {
 		ret = 0;
         } else {
                 ret = pcmk_rc_ipc_unauthorized;
         }
     } else {
         ret = (errno > 0) ? errno : pcmk_rc_error;
     }
     return ret;
 }
 
 int
 crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid,
                              pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
 {
     int ret  = pcmk__crm_ipc_is_authentic_process(NULL, sock, refuid, refgid,
                                                   gotpid, gotuid, gotgid);
 
     /* The old function had some very odd return codes*/
     if (ret == 0) {
         return 1;
     } else if (ret == pcmk_rc_ipc_unauthorized) {
         return 0;
     } else {
         return pcmk_rc2legacy(ret);
     }
 }
 
 int
 pcmk__ipc_is_authentic_process_active(const char *name, uid_t refuid,
                                       gid_t refgid, pid_t *gotpid)
 {
     static char last_asked_name[PATH_MAX / 2] = "";  /* log spam prevention */
     int fd;
     int rc = pcmk_rc_ipc_unresponsive;
     int auth_rc = 0;
     int32_t qb_rc;
     pid_t found_pid = 0; uid_t found_uid = 0; gid_t found_gid = 0;
     qb_ipcc_connection_t *c;
 #ifdef HAVE_QB_IPCC_CONNECT_ASYNC
     struct pollfd pollfd = { 0, };
     int poll_rc;
 
     c = qb_ipcc_connect_async(name, 0,
                               &(pollfd.fd));
 #else
     c = qb_ipcc_connect(name, 0);
 #endif
     if (c == NULL) {
         crm_info("Could not connect to %s IPC: %s", name, strerror(errno));
         rc = pcmk_rc_ipc_unresponsive;
         goto bail;
     }
 #ifdef HAVE_QB_IPCC_CONNECT_ASYNC
     pollfd.events = POLLIN;
     do {
         poll_rc = poll(&pollfd, 1, 2000);
     } while ((poll_rc == -1) && (errno == EINTR));
     if ((poll_rc <= 0) || (qb_ipcc_connect_continue(c) != 0)) {
         crm_info("Could not connect to %s IPC: %s", name,
                  (poll_rc == 0)?"timeout":strerror(errno));
         rc = pcmk_rc_ipc_unresponsive;
         if (poll_rc > 0) {
             c = NULL; // qb_ipcc_connect_continue cleaned up for us
         }
         goto bail;
     }
 #endif
 
     qb_rc = qb_ipcc_fd_get(c, &fd);
     if (qb_rc != 0) {
         rc = (int) -qb_rc; // System errno
         crm_err("Could not get fd from %s IPC: %s " CRM_XS " rc=%d",
                 name, pcmk_rc_str(rc), rc);
         goto bail;
     }
 
     auth_rc = pcmk__crm_ipc_is_authentic_process(c, fd, refuid, refgid, &found_pid,
                                                  &found_uid, &found_gid);
     if (auth_rc == pcmk_rc_ipc_unauthorized) {
         crm_err("Daemon (IPC %s) effectively blocked with unauthorized"
                 " process %lld (uid: %lld, gid: %lld)",
                 name, (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
                 (long long) found_uid, (long long) found_gid);
         rc = pcmk_rc_ipc_unauthorized;
         goto bail;
     }
 
     if (auth_rc != pcmk_rc_ok) {
         rc = auth_rc;
         crm_err("Could not get peer credentials from %s IPC: %s "
                 CRM_XS " rc=%d", name, pcmk_rc_str(rc), rc);
         goto bail;
     }
 
     if (gotpid != NULL) {
         *gotpid = found_pid;
     }
 
     rc = pcmk_rc_ok;
     if ((found_uid != refuid || found_gid != refgid)
             && strncmp(last_asked_name, name, sizeof(last_asked_name))) {
         if ((found_uid == 0) && (refuid != 0)) {
             crm_warn("Daemon (IPC %s) runs as root, whereas the expected"
                      " credentials are %lld:%lld, hazard of violating"
                      " the least privilege principle",
                      name, (long long) refuid, (long long) refgid);
         } else {
             crm_notice("Daemon (IPC %s) runs as %lld:%lld, whereas the"
                        " expected credentials are %lld:%lld, which may"
                        " mean a different set of privileges than expected",
                        name, (long long) found_uid, (long long) found_gid,
                        (long long) refuid, (long long) refgid);
         }
         memccpy(last_asked_name, name, '\0', sizeof(last_asked_name));
     }
 
 bail:
     if (c != NULL) {
         qb_ipcc_disconnect(c);
     }
     return rc;
 }