diff --git a/daemons/attrd/attrd_ipc.c b/daemons/attrd/attrd_ipc.c index 43e0f41824..be87c00fa0 100644 --- a/daemons/attrd/attrd_ipc.c +++ b/daemons/attrd/attrd_ipc.c @@ -1,657 +1,657 @@ /* * Copyright 2004-2025 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU General Public License version 2 * or later (GPLv2+) WITHOUT ANY WARRANTY. */ #include #include #include #include #include // PRIu32 #include #include #include #include #include #include #include #include #include #include #include "pacemaker-attrd.h" static qb_ipcs_service_t *ipcs = NULL; /*! * \internal * \brief Build the XML reply to a client query * * \param[in] attr Name of requested attribute * \param[in] host Name of requested host (or NULL for all hosts) * * \return New XML reply * \note Caller is responsible for freeing the resulting XML */ static xmlNode *build_query_reply(const char *attr, const char *host) { xmlNode *reply = pcmk__xe_create(NULL, __func__); attribute_t *a; crm_xml_add(reply, PCMK__XA_T, PCMK__VALUE_ATTRD); crm_xml_add(reply, PCMK__XA_SUBT, PCMK__ATTRD_CMD_QUERY); crm_xml_add(reply, PCMK__XA_ATTR_VERSION, ATTRD_PROTOCOL_VERSION); /* If desired attribute exists, add its value(s) to the reply */ a = g_hash_table_lookup(attributes, attr); if (a) { attribute_value_t *v; xmlNode *host_value; crm_xml_add(reply, PCMK__XA_ATTR_NAME, attr); /* Allow caller to use "localhost" to refer to local node */ if (pcmk__str_eq(host, "localhost", pcmk__str_casei)) { host = attrd_cluster->priv->node_name; crm_trace("Mapped localhost to %s", host); } /* If a specific node was requested, add its value */ if (host) { v = g_hash_table_lookup(a->values, host); host_value = pcmk__xe_create(reply, PCMK_XE_NODE); crm_xml_add(host_value, PCMK__XA_ATTR_HOST, host); crm_xml_add(host_value, PCMK__XA_ATTR_VALUE, (v? v->current : NULL)); /* Otherwise, add all nodes' values */ } else { GHashTableIter iter; g_hash_table_iter_init(&iter, a->values); while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &v)) { host_value = pcmk__xe_create(reply, PCMK_XE_NODE); crm_xml_add(host_value, PCMK__XA_ATTR_HOST, v->nodename); crm_xml_add(host_value, PCMK__XA_ATTR_VALUE, v->current); } } } return reply; } xmlNode * attrd_client_clear_failure(pcmk__request_t *request) { xmlNode *xml = request->xml; const char *rsc, *op, *interval_spec; if (minimum_protocol_version >= 2) { /* Propagate to all peers (including ourselves). * This ends up at attrd_peer_message(). */ attrd_send_message(NULL, xml, false); pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL); return NULL; } rsc = crm_element_value(xml, PCMK__XA_ATTR_RESOURCE); op = crm_element_value(xml, PCMK__XA_ATTR_CLEAR_OPERATION); interval_spec = crm_element_value(xml, PCMK__XA_ATTR_CLEAR_INTERVAL); /* Map this to an update */ crm_xml_add(xml, PCMK_XA_TASK, PCMK__ATTRD_CMD_UPDATE); /* Add regular expression matching desired attributes */ if (rsc) { char *pattern; if (op == NULL) { pattern = crm_strdup_printf(ATTRD_RE_CLEAR_ONE, rsc); } else { guint interval_ms = 0U; pcmk_parse_interval_spec(interval_spec, &interval_ms); pattern = crm_strdup_printf(ATTRD_RE_CLEAR_OP, rsc, op, interval_ms); } crm_xml_add(xml, PCMK__XA_ATTR_REGEX, pattern); free(pattern); } else { crm_xml_add(xml, PCMK__XA_ATTR_REGEX, ATTRD_RE_CLEAR_ALL); } /* Make sure attribute and value are not set, so we delete via regex */ pcmk__xe_remove_attr(xml, PCMK__XA_ATTR_NAME); pcmk__xe_remove_attr(xml, PCMK__XA_ATTR_VALUE); return attrd_client_update(request); } xmlNode * attrd_client_peer_remove(pcmk__request_t *request) { xmlNode *xml = request->xml; // Host and ID are not used in combination, rather host has precedence const char *host = crm_element_value(xml, PCMK__XA_ATTR_HOST); char *host_alloc = NULL; attrd_send_ack(request->ipc_client, request->ipc_id, request->ipc_flags); if (host == NULL) { int nodeid = 0; crm_element_value_int(xml, PCMK__XA_ATTR_HOST_ID, &nodeid); if (nodeid > 0) { pcmk__node_status_t *node = NULL; char *host_alloc = NULL; node = pcmk__search_node_caches(nodeid, NULL, NULL, pcmk__node_search_cluster_member); if ((node != NULL) && (node->name != NULL)) { // Use cached name if available host = node->name; } else { // Otherwise ask cluster layer host_alloc = pcmk__cluster_node_name(nodeid); host = host_alloc; } crm_xml_add(xml, PCMK__XA_ATTR_HOST, host); } } if (host) { crm_info("Client %s is requesting all values for %s be removed", pcmk__client_name(request->ipc_client), host); attrd_send_message(NULL, xml, false); /* ends up at attrd_peer_message() */ free(host_alloc); } else { crm_info("Ignoring request by client %s to remove all peer values without specifying peer", pcmk__client_name(request->ipc_client)); } pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL); return NULL; } xmlNode * attrd_client_query(pcmk__request_t *request) { xmlNode *query = request->xml; xmlNode *reply = NULL; const char *attr = NULL; crm_debug("Query arrived from %s", pcmk__client_name(request->ipc_client)); /* Request must specify attribute name to query */ attr = crm_element_value(query, PCMK__XA_ATTR_NAME); if (attr == NULL) { pcmk__format_result(&request->result, CRM_EX_ERROR, PCMK_EXEC_ERROR, "Ignoring malformed query from %s (no attribute name given)", pcmk__client_name(request->ipc_client)); return NULL; } /* Build the XML reply */ reply = build_query_reply(attr, crm_element_value(query, PCMK__XA_ATTR_HOST)); if (reply == NULL) { pcmk__format_result(&request->result, CRM_EX_ERROR, PCMK_EXEC_ERROR, "Could not respond to query from %s: could not create XML reply", pcmk__client_name(request->ipc_client)); return NULL; } else { pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL); } request->ipc_client->request_id = 0; return reply; } xmlNode * attrd_client_refresh(pcmk__request_t *request) { crm_info("Updating all attributes"); attrd_send_ack(request->ipc_client, request->ipc_id, request->ipc_flags); attrd_write_attributes(attrd_write_all|attrd_write_no_delay); pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL); return NULL; } static void handle_missing_host(xmlNode *xml) { if (crm_element_value(xml, PCMK__XA_ATTR_HOST) == NULL) { crm_trace("Inferring local node %s with XML ID %s", attrd_cluster->priv->node_name, attrd_cluster->priv->node_xml_id); crm_xml_add(xml, PCMK__XA_ATTR_HOST, attrd_cluster->priv->node_name); crm_xml_add(xml, PCMK__XA_ATTR_HOST_ID, attrd_cluster->priv->node_xml_id); } } /* Convert a single IPC message with a regex into one with multiple children, one * for each regex match. */ static int expand_regexes(xmlNode *xml, const char *attr, const char *value, const char *regex) { if (attr == NULL && regex) { bool matched = false; GHashTableIter aIter; regex_t r_patt; crm_debug("Setting %s to %s", regex, value); if (regcomp(&r_patt, regex, REG_EXTENDED|REG_NOSUB)) { return EINVAL; } g_hash_table_iter_init(&aIter, attributes); while (g_hash_table_iter_next(&aIter, (gpointer *) & attr, NULL)) { int status = regexec(&r_patt, attr, 0, NULL, 0); if (status == 0) { xmlNode *child = pcmk__xe_create(xml, PCMK_XE_OP); crm_trace("Matched %s with %s", attr, regex); matched = true; /* Copy all the non-conflicting attributes from the parent over, * but remove the regex and replace it with the name. */ pcmk__xe_copy_attrs(child, xml, pcmk__xaf_no_overwrite); pcmk__xe_remove_attr(child, PCMK__XA_ATTR_REGEX); crm_xml_add(child, PCMK__XA_ATTR_NAME, attr); } } regfree(&r_patt); /* Return a code if we never matched anything. This should not be treated * as an error. It indicates there was a regex, and it was a valid regex, * but simply did not match anything and the caller should not continue * doing any regex-related processing. */ if (!matched) { return pcmk_rc_op_unsatisfied; } } else if (attr == NULL) { return pcmk_rc_bad_nvpair; } return pcmk_rc_ok; } static int handle_regexes(pcmk__request_t *request) { xmlNode *xml = request->xml; int rc = pcmk_rc_ok; const char *attr = crm_element_value(xml, PCMK__XA_ATTR_NAME); const char *value = crm_element_value(xml, PCMK__XA_ATTR_VALUE); const char *regex = crm_element_value(xml, PCMK__XA_ATTR_REGEX); rc = expand_regexes(xml, attr, value, regex); if (rc == EINVAL) { pcmk__format_result(&request->result, CRM_EX_ERROR, PCMK_EXEC_ERROR, "Bad regex '%s' for update from client %s", regex, pcmk__client_name(request->ipc_client)); } else if (rc == pcmk_rc_bad_nvpair) { crm_err("Update request did not specify attribute or regular expression"); pcmk__format_result(&request->result, CRM_EX_ERROR, PCMK_EXEC_ERROR, "Client %s update request did not specify attribute or regular expression", pcmk__client_name(request->ipc_client)); } return rc; } static int handle_value_expansion(const char **value, xmlNode *xml, const char *op, const char *attr) { attribute_t *a = g_hash_table_lookup(attributes, attr); if (a == NULL && pcmk__str_eq(op, PCMK__ATTRD_CMD_UPDATE_DELAY, pcmk__str_none)) { return EINVAL; } if (*value && attrd_value_needs_expansion(*value)) { int int_value; attribute_value_t *v = NULL; if (a) { const char *host = crm_element_value(xml, PCMK__XA_ATTR_HOST); v = g_hash_table_lookup(a->values, host); } int_value = attrd_expand_value(*value, (v? v->current : NULL)); crm_info("Expanded %s=%s to %d", attr, *value, int_value); crm_xml_add_int(xml, PCMK__XA_ATTR_VALUE, int_value); /* Replacing the value frees the previous memory, so re-query it */ *value = crm_element_value(xml, PCMK__XA_ATTR_VALUE); } return pcmk_rc_ok; } static void send_update_msg_to_cluster(pcmk__request_t *request, xmlNode *xml) { if (pcmk__str_eq(attrd_request_sync_point(xml), PCMK__VALUE_CLUSTER, pcmk__str_none)) { /* The client is waiting on the cluster-wide sync point. In this case, * the response ACK is not sent until this attrd broadcasts the update * and receives its own confirmation back from all peers. */ attrd_expect_confirmations(request, attrd_cluster_sync_point_update); attrd_send_message(NULL, xml, true); /* ends up at attrd_peer_message() */ } else { /* The client is either waiting on the local sync point or was not * waiting on any sync point at all. For the local sync point, the * response ACK is sent in attrd_peer_update. For clients not * waiting on any sync point, the response ACK is sent in * handle_update_request immediately before this function was called. */ attrd_send_message(NULL, xml, false); /* ends up at attrd_peer_message() */ } } static int send_child_update(xmlNode *child, void *data) { pcmk__request_t *request = (pcmk__request_t *) data; /* Calling pcmk__set_result is handled by one of these calls to * attrd_client_update, so no need to do it again here. */ request->xml = child; attrd_client_update(request); return pcmk_rc_ok; } xmlNode * attrd_client_update(pcmk__request_t *request) { xmlNode *xml = NULL; const char *attr, *value, *regex; CRM_CHECK((request != NULL) && (request->xml != NULL), return NULL); xml = request->xml; /* If the message has children, that means it is a message from a newer * client that supports sending multiple operations at a time. There are * two ways we can handle that. */ if (xml->children != NULL) { if (ATTRD_SUPPORTS_MULTI_MESSAGE(minimum_protocol_version)) { /* First, if all peers support a certain protocol version, we can * just broadcast the big message and they'll handle it. However, * we also need to apply all the transformations in this function * to the children since they don't happen anywhere else. */ for (xmlNode *child = pcmk__xe_first_child(xml, PCMK_XE_OP, NULL, NULL); child != NULL; child = pcmk__xe_next(child, PCMK_XE_OP)) { attr = crm_element_value(child, PCMK__XA_ATTR_NAME); value = crm_element_value(child, PCMK__XA_ATTR_VALUE); handle_missing_host(child); if (handle_value_expansion(&value, child, request->op, attr) == EINVAL) { pcmk__format_result(&request->result, CRM_EX_NOSUCH, PCMK_EXEC_ERROR, "Attribute %s does not exist", attr); return NULL; } } send_update_msg_to_cluster(request, xml); pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL); } else { /* Save the original xml node pointer so it can be restored after iterating * over all the children. */ xmlNode *orig_xml = request->xml; /* Second, if they do not support that protocol version, split it * up into individual messages and call attrd_client_update on * each one. */ pcmk__xe_foreach_child(xml, PCMK_XE_OP, send_child_update, request); request->xml = orig_xml; } return NULL; } attr = crm_element_value(xml, PCMK__XA_ATTR_NAME); value = crm_element_value(xml, PCMK__XA_ATTR_VALUE); regex = crm_element_value(xml, PCMK__XA_ATTR_REGEX); if (handle_regexes(request) != pcmk_rc_ok) { /* Error handling was already dealt with in handle_regexes, so just return. */ return NULL; } else if (regex) { /* Recursively call attrd_client_update on the new message with regexes * expanded. If supported by the attribute daemon, this means that all * matches can also be handled atomically. */ return attrd_client_update(request); } handle_missing_host(xml); if (handle_value_expansion(&value, xml, request->op, attr) == EINVAL) { pcmk__format_result(&request->result, CRM_EX_NOSUCH, PCMK_EXEC_ERROR, "Attribute %s does not exist", attr); return NULL; } crm_debug("Broadcasting %s[%s]=%s%s", attr, crm_element_value(xml, PCMK__XA_ATTR_HOST), value, (attrd_election_won()? " (writer)" : "")); send_update_msg_to_cluster(request, xml); pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL); return NULL; } /*! * \internal * \brief Accept a new client IPC connection * * \param[in,out] c New connection * \param[in] uid Client user id * \param[in] gid Client group id * * \return pcmk_ok on success, -errno otherwise */ static int32_t attrd_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) { crm_trace("New client connection %p", c); if (attrd_shutting_down(false)) { crm_info("Ignoring new connection from pid %d during shutdown", pcmk__client_pid(c)); return -ECONNREFUSED; } if (pcmk__new_client(c, uid, gid) == NULL) { return -ENOMEM; } return pcmk_ok; } /*! * \internal * \brief Destroy a client IPC connection * * \param[in] c Connection to destroy * * \return FALSE (i.e. do not re-run this callback) */ static int32_t attrd_ipc_closed(qb_ipcs_connection_t *c) { pcmk__client_t *client = pcmk__find_client(c); if (client == NULL) { crm_trace("Ignoring request to clean up unknown connection %p", c); } else { crm_trace("Cleaning up closed client connection %p", c); /* Remove the client from the sync point waitlist if it's present. */ attrd_remove_client_from_waitlist(client); /* And no longer wait for confirmations from any peers. */ attrd_do_not_wait_for_client(client); pcmk__free_client(client); } return FALSE; } /*! * \internal * \brief Destroy a client IPC connection * * \param[in,out] c Connection to destroy * * \note We handle a destroyed connection the same as a closed one, * but we need a separate handler because the return type is different. */ static void attrd_ipc_destroy(qb_ipcs_connection_t *c) { crm_trace("Destroying client connection %p", c); attrd_ipc_closed(c); } static int32_t attrd_ipc_dispatch(qb_ipcs_connection_t * c, void *data, size_t size) { int rc = pcmk_rc_ok; uint32_t id = 0; uint32_t flags = 0; pcmk__client_t *client = pcmk__find_client(c); xmlNode *xml = NULL; // Sanity-check, and parse XML from IPC data CRM_CHECK((c != NULL) && (client != NULL), return 0); if (data == NULL) { crm_debug("No IPC data from PID %d", pcmk__client_pid(c)); return 0; } rc = pcmk__ipc_msg_append(&client->buffer, data); if (rc == pcmk_rc_ipc_more) { /* We haven't read the complete message yet, so just return. */ return 0; } else if (rc == pcmk_rc_ok) { /* We've read the complete message and there's already a header on * the front. Pass it off for processing. */ xml = pcmk__client_data2xml(client, &id, &flags); g_byte_array_free(client->buffer, TRUE); client->buffer = NULL; } else { /* Some sort of error occurred reassembling the message. All we can * do is clean up, log an error and return. */ crm_err("Error when reading IPC message: %s", pcmk_rc_str(rc)); if (client->buffer != NULL) { g_byte_array_free(client->buffer, TRUE); client->buffer = NULL; } return 0; } if (xml == NULL) { crm_debug("Unrecognizable IPC data from PID %d", pcmk__client_pid(c)); - pcmk__ipc_send_ack(client, id, flags, PCMK__XE_ACK, NULL, + pcmk__ipc_send_ack(client, id, flags, PCMK__XE_NACK, NULL, CRM_EX_PROTOCOL); return 0; } else { pcmk__request_t request = { .ipc_client = client, .ipc_id = id, .ipc_flags = flags, .peer = NULL, .xml = xml, .call_options = 0, .result = PCMK__UNKNOWN_RESULT, }; pcmk__assert(client->user != NULL); pcmk__update_acl_user(xml, PCMK__XA_ATTR_USER, client->user); request.op = crm_element_value_copy(request.xml, PCMK_XA_TASK); CRM_CHECK(request.op != NULL, return 0); attrd_handle_request(&request); pcmk__reset_request(&request); } pcmk__xml_free(xml); return 0; } static struct qb_ipcs_service_handlers ipc_callbacks = { .connection_accept = attrd_ipc_accept, .connection_created = NULL, .msg_process = attrd_ipc_dispatch, .connection_closed = attrd_ipc_closed, .connection_destroyed = attrd_ipc_destroy }; void attrd_ipc_fini(void) { if (ipcs != NULL) { pcmk__drop_all_clients(ipcs); qb_ipcs_destroy(ipcs); ipcs = NULL; } attrd_unregister_handlers(); pcmk__client_cleanup(); } /*! * \internal * \brief Set up attrd IPC communication */ void attrd_init_ipc(void) { pcmk__serve_attrd_ipc(&ipcs, &ipc_callbacks); } diff --git a/daemons/attrd/attrd_sync.c b/daemons/attrd/attrd_sync.c index 23915a619d..86d9268cc5 100644 --- a/daemons/attrd/attrd_sync.c +++ b/daemons/attrd/attrd_sync.c @@ -1,571 +1,571 @@ /* - * Copyright 2022-2024 the Pacemaker project contributors + * Copyright 2022-2025 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU General Public License version 2 * or later (GPLv2+) WITHOUT ANY WARRANTY. */ #include #include #include #include "pacemaker-attrd.h" /* A hash table storing clients that are waiting on a sync point to be reached. * The key is waitlist_client - just a plain int. The obvious key would be * the IPC client's ID, but this is not guaranteed to be unique. A single client * could be waiting on a sync point for multiple attributes at the same time. * * It is not expected that this hash table will ever be especially large. */ static GHashTable *waitlist = NULL; static int waitlist_client = 0; struct waitlist_node { /* What kind of sync point does this node describe? */ enum attrd_sync_point sync_point; /* Information required to construct and send a reply to the client. */ char *client_id; uint32_t ipc_id; uint32_t flags; }; /* A hash table storing information on in-progress IPC requests that are awaiting * confirmations. These requests are currently being processed by peer attrds and * we are waiting to receive confirmation messages from each peer indicating that * processing is complete. * * Multiple requests could be waiting on confirmations at the same time. * * The key is the unique callid for the IPC request, and the value is a * confirmation_action struct. */ static GHashTable *expected_confirmations = NULL; /*! * \internal * \brief A structure describing a single IPC request that is awaiting confirmations */ struct confirmation_action { /*! * \brief A list of peer attrds that we are waiting to receive confirmation * messages from * * This list is dynamic - as confirmations arrive from peer attrds, they will * be removed from this list. When the list is empty, all peers have processed * the request and the associated confirmation action will be taken. */ GList *respondents; /*! * \brief A timer that will be used to remove the client should it time out * before receiving all confirmations */ mainloop_timer_t *timer; /*! * \brief A function to run when all confirmations have been received */ attrd_confirmation_action_fn fn; /*! * \brief Information required to construct and send a reply to the client */ char *client_id; uint32_t ipc_id; uint32_t flags; /*! * \brief The XML request containing the callid associated with this action */ void *xml; }; static void next_key(void) { do { waitlist_client++; if (waitlist_client < 0) { waitlist_client = 1; } } while (g_hash_table_contains(waitlist, GINT_TO_POINTER(waitlist_client))); } static void free_waitlist_node(gpointer data) { struct waitlist_node *wl = (struct waitlist_node *) data; free(wl->client_id); free(wl); } static const char * sync_point_str(enum attrd_sync_point sync_point) { if (sync_point == attrd_sync_point_local) { return PCMK__VALUE_LOCAL; } else if (sync_point == attrd_sync_point_cluster) { return PCMK__VALUE_CLUSTER; } else { return PCMK_VALUE_UNKNOWN; } } /*! * \internal * \brief Add a client to the attrd waitlist * * Typically, a client receives an ACK for its XML IPC request immediately. However, * some clients want to wait until their request has been processed and taken effect. * This is called a sync point. Any client placed on this waitlist will have its * ACK message delayed until either its requested sync point is hit, or until it * times out. * * The XML IPC request must specify the type of sync point it wants to wait for. * * \param[in,out] request The request describing the client to place on the waitlist. */ void attrd_add_client_to_waitlist(pcmk__request_t *request) { const char *sync_point = attrd_request_sync_point(request->xml); struct waitlist_node *wl = NULL; if (sync_point == NULL) { return; } if (waitlist == NULL) { waitlist = pcmk__intkey_table(free_waitlist_node); } wl = pcmk__assert_alloc(1, sizeof(struct waitlist_node)); if (pcmk__str_eq(sync_point, PCMK__VALUE_LOCAL, pcmk__str_none)) { wl->sync_point = attrd_sync_point_local; } else if (pcmk__str_eq(sync_point, PCMK__VALUE_CLUSTER, pcmk__str_none)) { wl->sync_point = attrd_sync_point_cluster; } else { free_waitlist_node(wl); return; } wl->client_id = pcmk__str_copy(request->ipc_client->id); wl->ipc_id = request->ipc_id; wl->flags = request->flags; next_key(); pcmk__intkey_table_insert(waitlist, waitlist_client, wl); crm_trace("Added client %s to waitlist for %s sync point", wl->client_id, sync_point_str(wl->sync_point)); crm_trace("%d clients now on waitlist", g_hash_table_size(waitlist)); /* And then add the key to the request XML so we can uniquely identify * it when it comes time to issue the ACK. */ crm_xml_add_int(request->xml, PCMK__XA_CALL_ID, waitlist_client); } /*! * \internal * \brief Free all memory associated with the waitlist. This is most typically * used when attrd shuts down. */ void attrd_free_waitlist(void) { if (waitlist == NULL) { return; } g_hash_table_destroy(waitlist); waitlist = NULL; } /*! * \internal * \brief Unconditionally remove a client from the waitlist, such as when the client * node disconnects from the cluster * * \param[in] client The client to remove */ void attrd_remove_client_from_waitlist(pcmk__client_t *client) { GHashTableIter iter; gpointer value; if (waitlist == NULL) { return; } g_hash_table_iter_init(&iter, waitlist); while (g_hash_table_iter_next(&iter, NULL, &value)) { struct waitlist_node *wl = (struct waitlist_node *) value; if (pcmk__str_eq(wl->client_id, client->id, pcmk__str_none)) { g_hash_table_iter_remove(&iter); crm_trace("%d clients now on waitlist", g_hash_table_size(waitlist)); } } } /*! * \internal * \brief Send an IPC ACK message to all awaiting clients * * This function will search the waitlist for all clients that are currently awaiting * an ACK indicating their attrd operation is complete. Only those clients with a * matching sync point type and callid from their original XML IPC request will be * ACKed. Once they have received an ACK, they will be removed from the waitlist. * * \param[in] sync_point What kind of sync point have we hit? * \param[in] xml The original XML IPC request. */ void attrd_ack_waitlist_clients(enum attrd_sync_point sync_point, const xmlNode *xml) { int callid; gpointer value; if (waitlist == NULL) { return; } if (crm_element_value_int(xml, PCMK__XA_CALL_ID, &callid) == -1) { crm_warn("Could not get callid from request XML"); return; } value = pcmk__intkey_table_lookup(waitlist, callid); if (value != NULL) { struct waitlist_node *wl = (struct waitlist_node *) value; pcmk__client_t *client = NULL; if (wl->sync_point != sync_point) { return; } crm_notice("Alerting client %s for reached %s sync point", wl->client_id, sync_point_str(wl->sync_point)); client = pcmk__find_client_by_id(wl->client_id); if (client == NULL) { return; } attrd_send_ack(client, wl->ipc_id, wl->flags | crm_ipc_client_response); /* And then remove the client so it doesn't get alerted again. */ pcmk__intkey_table_remove(waitlist, callid); crm_trace("%d clients now on waitlist", g_hash_table_size(waitlist)); } } /*! * \internal * \brief Action to take when a cluster sync point is hit for a * PCMK__ATTRD_CMD_UPDATE* message. * * \param[in] xml The request that should be passed along to * attrd_ack_waitlist_clients. This should be the original * IPC request containing the callid for this update message. */ int attrd_cluster_sync_point_update(xmlNode *xml) { crm_trace("Hit cluster sync point for attribute update"); attrd_ack_waitlist_clients(attrd_sync_point_cluster, xml); return pcmk_rc_ok; } /*! * \internal * \brief Return the sync point attribute for an IPC request * * This function will check both the top-level element of \p xml for a sync * point attribute, as well as all of its \p op children, if any. The latter * is useful for newer versions of attrd that can put multiple IPC requests * into a single message. * * \param[in] xml An XML IPC request * * \note It is assumed that if one child element has a sync point attribute, * all will have a sync point attribute and they will all be the same * sync point. No other configuration is supported. * * \return The sync point attribute of \p xml, or NULL if none. */ const char * attrd_request_sync_point(xmlNode *xml) { CRM_CHECK(xml != NULL, return NULL); if (xml->children != NULL) { xmlNode *child = pcmk__xe_first_child(xml, PCMK_XE_OP, PCMK__XA_ATTR_SYNC_POINT, NULL); if (child) { return crm_element_value(child, PCMK__XA_ATTR_SYNC_POINT); } else { return NULL; } } else { return crm_element_value(xml, PCMK__XA_ATTR_SYNC_POINT); } } /*! * \internal * \brief Does an IPC request contain any sync point attribute? * * \param[in] xml An XML IPC request * * \return true if there's a sync point attribute, false otherwise */ bool attrd_request_has_sync_point(xmlNode *xml) { return attrd_request_sync_point(xml) != NULL; } static void free_action(gpointer data) { struct confirmation_action *action = (struct confirmation_action *) data; g_list_free_full(action->respondents, free); mainloop_timer_del(action->timer); pcmk__xml_free(action->xml); free(action->client_id); free(action); } /* Remove an IPC request from the expected_confirmations table if the peer attrds * don't respond before the timeout is hit. We set the timeout to 15s. The exact * number isn't critical - we just want to make sure that the table eventually gets * cleared of things that didn't complete. */ static gboolean confirmation_timeout_cb(gpointer data) { struct confirmation_action *action = (struct confirmation_action *) data; GHashTableIter iter; gpointer value; if (expected_confirmations == NULL) { return G_SOURCE_REMOVE; } g_hash_table_iter_init(&iter, expected_confirmations); while (g_hash_table_iter_next(&iter, NULL, &value)) { if (value == action) { pcmk__client_t *client = pcmk__find_client_by_id(action->client_id); if (client == NULL) { return G_SOURCE_REMOVE; } crm_trace("Timed out waiting for confirmations for client %s", client->id); pcmk__ipc_send_ack(client, action->ipc_id, action->flags|crm_ipc_client_response, - PCMK__XE_ACK, ATTRD_PROTOCOL_VERSION, + PCMK__XE_NACK, ATTRD_PROTOCOL_VERSION, CRM_EX_TIMEOUT); g_hash_table_iter_remove(&iter); crm_trace("%d requests now in expected confirmations table", g_hash_table_size(expected_confirmations)); break; } } return G_SOURCE_REMOVE; } /*! * \internal * \brief When a peer disconnects from the cluster, no longer wait for its confirmation * for any IPC action. If this peer is the last one being waited on, this will * trigger the confirmation action. * * \param[in] host The disconnecting peer attrd's uname */ void attrd_do_not_expect_from_peer(const char *host) { GList *keys = NULL; if (expected_confirmations == NULL) { return; } keys = g_hash_table_get_keys(expected_confirmations); crm_trace("Removing peer %s from expected confirmations", host); for (GList *node = keys; node != NULL; node = node->next) { int callid = *(int *) node->data; attrd_handle_confirmation(callid, host); } g_list_free(keys); } /*! * \internal * \brief When a client disconnects from the cluster, no longer wait on confirmations * for it. Because the peer attrds may still be processing the original IPC * message, they may still send us confirmations. However, we will take no * action on them. * * \param[in] client The disconnecting client */ void attrd_do_not_wait_for_client(pcmk__client_t *client) { GHashTableIter iter; gpointer value; if (expected_confirmations == NULL) { return; } g_hash_table_iter_init(&iter, expected_confirmations); while (g_hash_table_iter_next(&iter, NULL, &value)) { struct confirmation_action *action = (struct confirmation_action *) value; if (pcmk__str_eq(action->client_id, client->id, pcmk__str_none)) { crm_trace("Removing client %s from expected confirmations", client->id); g_hash_table_iter_remove(&iter); crm_trace("%d requests now in expected confirmations table", g_hash_table_size(expected_confirmations)); break; } } } /*! * \internal * \brief Register some action to be taken when IPC request confirmations are * received * * When this function is called, a list of all peer attrds that support confirming * requests is generated. As confirmations from these peer attrds are received, * they are removed from this list. When the list is empty, the registered action * will be called. * * \note This function should always be called before attrd_send_message is called * to broadcast to the peers to ensure that we know what replies we are * waiting on. Otherwise, it is possible the peer could finish and confirm * before we know to expect it. * * \param[in] request The request that is awaiting confirmations * \param[in] fn A function to be run after all confirmations are received */ void attrd_expect_confirmations(pcmk__request_t *request, attrd_confirmation_action_fn fn) { struct confirmation_action *action = NULL; GHashTableIter iter; gpointer host, ver; GList *respondents = NULL; int callid; if (expected_confirmations == NULL) { expected_confirmations = pcmk__intkey_table((GDestroyNotify) free_action); } if (crm_element_value_int(request->xml, PCMK__XA_CALL_ID, &callid) == -1) { crm_err("Could not get callid from xml"); return; } if (pcmk__intkey_table_lookup(expected_confirmations, callid)) { crm_err("Already waiting on confirmations for call id %d", callid); return; } g_hash_table_iter_init(&iter, peer_protocol_vers); while (g_hash_table_iter_next(&iter, &host, &ver)) { if (ATTRD_SUPPORTS_CONFIRMATION(GPOINTER_TO_INT(ver))) { respondents = g_list_prepend(respondents, pcmk__str_copy((char *) host)); } } action = pcmk__assert_alloc(1, sizeof(struct confirmation_action)); action->respondents = respondents; action->fn = fn; action->xml = pcmk__xml_copy(NULL, request->xml); action->client_id = pcmk__str_copy(request->ipc_client->id); action->ipc_id = request->ipc_id; action->flags = request->flags; action->timer = mainloop_timer_add(NULL, 15000, FALSE, confirmation_timeout_cb, action); mainloop_timer_start(action->timer); pcmk__intkey_table_insert(expected_confirmations, callid, action); crm_trace("Callid %d now waiting on %d confirmations", callid, g_list_length(respondents)); crm_trace("%d requests now in expected confirmations table", g_hash_table_size(expected_confirmations)); } void attrd_free_confirmations(void) { if (expected_confirmations != NULL) { g_hash_table_destroy(expected_confirmations); expected_confirmations = NULL; } } /*! * \internal * \brief Process a confirmation message from a peer attrd * * This function is called every time a PCMK__ATTRD_CMD_CONFIRM message is * received from a peer attrd. If this is the last confirmation we are waiting * on for a given operation, the registered action will be called. * * \param[in] callid The unique callid for the XML IPC request * \param[in] host The confirming peer attrd's uname */ void attrd_handle_confirmation(int callid, const char *host) { struct confirmation_action *action = NULL; GList *node = NULL; if (expected_confirmations == NULL) { return; } action = pcmk__intkey_table_lookup(expected_confirmations, callid); if (action == NULL) { return; } node = g_list_find_custom(action->respondents, host, (GCompareFunc) strcasecmp); if (node == NULL) { return; } action->respondents = g_list_remove(action->respondents, node->data); crm_trace("Callid %d now waiting on %d confirmations", callid, g_list_length(action->respondents)); if (action->respondents == NULL) { action->fn(action->xml); pcmk__intkey_table_remove(expected_confirmations, callid); crm_trace("%d requests now in expected confirmations table", g_hash_table_size(expected_confirmations)); } } diff --git a/daemons/based/based_callbacks.c b/daemons/based/based_callbacks.c index 1d34055d9d..4efc2ae1ba 100644 --- a/daemons/based/based_callbacks.c +++ b/daemons/based/based_callbacks.c @@ -1,1433 +1,1440 @@ /* * Copyright 2004-2025 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU General Public License version 2 * or later (GPLv2+) WITHOUT ANY WARRANTY. */ #include #include #include #include #include #include #include // uint32_t, uint64_t, UINT64_C() #include #include #include // PRIu64 #include #include #include // xmlXPathObject, etc. #include #include #include #include #include #include #define EXIT_ESCALATION_MS 10000 qb_ipcs_service_t *ipcs_ro = NULL; qb_ipcs_service_t *ipcs_rw = NULL; qb_ipcs_service_t *ipcs_shm = NULL; static int cib_process_command(xmlNode *request, const cib__operation_t *operation, cib__op_fn_t op_function, xmlNode **reply, xmlNode **cib_diff, bool privileged); static gboolean cib_common_callback(qb_ipcs_connection_t *c, void *data, size_t size, gboolean privileged); static int32_t cib_ipc_accept(qb_ipcs_connection_t * c, uid_t uid, gid_t gid) { if (cib_shutdown_flag) { crm_info("Ignoring new IPC client [%d] during shutdown", pcmk__client_pid(c)); return -ECONNREFUSED; } if (pcmk__new_client(c, uid, gid) == NULL) { return -ENOMEM; } return 0; } static int32_t cib_ipc_dispatch_rw(qb_ipcs_connection_t * c, void *data, size_t size) { pcmk__client_t *client = pcmk__find_client(c); crm_trace("%p message from %s", c, client->id); return cib_common_callback(c, data, size, TRUE); } static int32_t cib_ipc_dispatch_ro(qb_ipcs_connection_t * c, void *data, size_t size) { pcmk__client_t *client = pcmk__find_client(c); crm_trace("%p message from %s", c, client->id); return cib_common_callback(c, data, size, FALSE); } /* Error code means? */ static int32_t cib_ipc_closed(qb_ipcs_connection_t * c) { pcmk__client_t *client = pcmk__find_client(c); if (client == NULL) { return 0; } crm_trace("Connection %p", c); pcmk__free_client(client); return 0; } static void cib_ipc_destroy(qb_ipcs_connection_t * c) { crm_trace("Connection %p", c); cib_ipc_closed(c); if (cib_shutdown_flag) { cib_shutdown(0); } } struct qb_ipcs_service_handlers ipc_ro_callbacks = { .connection_accept = cib_ipc_accept, .connection_created = NULL, .msg_process = cib_ipc_dispatch_ro, .connection_closed = cib_ipc_closed, .connection_destroyed = cib_ipc_destroy }; struct qb_ipcs_service_handlers ipc_rw_callbacks = { .connection_accept = cib_ipc_accept, .connection_created = NULL, .msg_process = cib_ipc_dispatch_rw, .connection_closed = cib_ipc_closed, .connection_destroyed = cib_ipc_destroy }; /*! * \internal * \brief Create reply XML for a CIB request * * \param[in] op CIB operation type * \param[in] call_id CIB call ID * \param[in] client_id CIB client ID * \param[in] call_options Group of enum cib_call_options flags * \param[in] rc Request return code * \param[in] call_data Request output data * * \return Reply XML (guaranteed not to be \c NULL) * * \note The caller is responsible for freeing the return value using * \p pcmk__xml_free(). */ static xmlNode * create_cib_reply(const char *op, const char *call_id, const char *client_id, uint32_t call_options, int rc, xmlNode *call_data) { xmlNode *reply = pcmk__xe_create(NULL, PCMK__XE_CIB_REPLY); crm_xml_add(reply, PCMK__XA_T, PCMK__VALUE_CIB); crm_xml_add(reply, PCMK__XA_CIB_OP, op); crm_xml_add(reply, PCMK__XA_CIB_CALLID, call_id); crm_xml_add(reply, PCMK__XA_CIB_CLIENTID, client_id); crm_xml_add_int(reply, PCMK__XA_CIB_CALLOPT, call_options); crm_xml_add_int(reply, PCMK__XA_CIB_RC, rc); if (call_data != NULL) { xmlNode *wrapper = pcmk__xe_create(reply, PCMK__XE_CIB_CALLDATA); crm_trace("Attaching reply output"); pcmk__xml_copy(wrapper, call_data); } crm_log_xml_explicit(reply, "cib:reply"); return reply; } static void do_local_notify(const xmlNode *notify_src, const char *client_id, bool sync_reply, bool from_peer) { int msg_id = 0; int rc = pcmk_rc_ok; pcmk__client_t *client_obj = NULL; uint32_t flags = crm_ipc_server_event; CRM_CHECK((notify_src != NULL) && (client_id != NULL), return); crm_element_value_int(notify_src, PCMK__XA_CIB_CALLID, &msg_id); client_obj = pcmk__find_client_by_id(client_id); if (client_obj == NULL) { crm_debug("Could not notify client %s%s %s of call %d result: " "client no longer exists", client_id, (from_peer? " (originator of delegated request)" : ""), (sync_reply? "synchronously" : "asynchronously"), msg_id); return; } if (sync_reply) { flags = crm_ipc_flags_none; if (client_obj->ipcs != NULL) { msg_id = client_obj->request_id; client_obj->request_id = 0; } } switch (PCMK__CLIENT_TYPE(client_obj)) { case pcmk__client_ipc: rc = pcmk__ipc_send_xml(client_obj, msg_id, notify_src, flags); break; case pcmk__client_tls: case pcmk__client_tcp: rc = pcmk__remote_send_xml(client_obj->remote, notify_src); break; default: rc = EPROTONOSUPPORT; break; } if (rc == pcmk_rc_ok) { crm_trace("Notified %s client %s%s %s of call %d result", pcmk__client_type_str(PCMK__CLIENT_TYPE(client_obj)), pcmk__client_name(client_obj), (from_peer? " (originator of delegated request)" : ""), (sync_reply? "synchronously" : "asynchronously"), msg_id); } else { crm_warn("Could not notify %s client %s%s %s of call %d result: %s", pcmk__client_type_str(PCMK__CLIENT_TYPE(client_obj)), pcmk__client_name(client_obj), (from_peer? " (originator of delegated request)" : ""), (sync_reply? "synchronously" : "asynchronously"), msg_id, pcmk_rc_str(rc)); } } void cib_common_callback_worker(uint32_t id, uint32_t flags, xmlNode * op_request, pcmk__client_t *cib_client, gboolean privileged) { const char *op = crm_element_value(op_request, PCMK__XA_CIB_OP); uint32_t call_options = cib_none; int rc = pcmk_rc_ok; rc = pcmk__xe_get_flags(op_request, PCMK__XA_CIB_CALLOPT, &call_options, cib_none); if (rc != pcmk_rc_ok) { crm_warn("Couldn't parse options from request: %s", pcmk_rc_str(rc)); } /* Requests with cib_transaction set should not be sent to based directly * (outside of a commit-transaction request) */ if (pcmk_is_set(call_options, cib_transaction)) { return; } if (pcmk__str_eq(op, CRM_OP_REGISTER, pcmk__str_none)) { if (flags & crm_ipc_client_response) { xmlNode *ack = pcmk__xe_create(NULL, __func__); crm_xml_add(ack, PCMK__XA_CIB_OP, CRM_OP_REGISTER); crm_xml_add(ack, PCMK__XA_CIB_CLIENTID, cib_client->id); pcmk__ipc_send_xml(cib_client, id, ack, flags); cib_client->request_id = 0; pcmk__xml_free(ack); } return; } else if (pcmk__str_eq(op, PCMK__VALUE_CIB_NOTIFY, pcmk__str_none)) { /* Update the notify filters for this client */ int on_off = 0; crm_exit_t status = CRM_EX_OK; uint64_t bit = UINT64_C(0); const char *type = crm_element_value(op_request, PCMK__XA_CIB_NOTIFY_TYPE); crm_element_value_int(op_request, PCMK__XA_CIB_NOTIFY_ACTIVATE, &on_off); crm_debug("Setting %s callbacks %s for client %s", type, (on_off? "on" : "off"), pcmk__client_name(cib_client)); if (pcmk__str_eq(type, PCMK__VALUE_CIB_POST_NOTIFY, pcmk__str_none)) { bit = cib_notify_post; } else if (pcmk__str_eq(type, PCMK__VALUE_CIB_PRE_NOTIFY, pcmk__str_none)) { bit = cib_notify_pre; } else if (pcmk__str_eq(type, PCMK__VALUE_CIB_UPDATE_CONFIRMATION, pcmk__str_none)) { bit = cib_notify_confirm; } else if (pcmk__str_eq(type, PCMK__VALUE_CIB_DIFF_NOTIFY, pcmk__str_none)) { bit = cib_notify_diff; } else { status = CRM_EX_INVALID_PARAM; } if (bit != 0) { if (on_off) { pcmk__set_client_flags(cib_client, bit); } else { pcmk__clear_client_flags(cib_client, bit); } } - pcmk__ipc_send_ack(cib_client, id, flags, PCMK__XE_ACK, NULL, status); + if (status == CRM_EX_OK) { + pcmk__ipc_send_ack(cib_client, id, flags, PCMK__XE_ACK, NULL, + status); + } else { + pcmk__ipc_send_ack(cib_client, id, flags, PCMK__XE_NACK, NULL, + status); + } + return; } cib_process_request(op_request, privileged, cib_client); } int32_t cib_common_callback(qb_ipcs_connection_t * c, void *data, size_t size, gboolean privileged) { int rc = pcmk_rc_ok; uint32_t id = 0; uint32_t flags = 0; uint32_t call_options = cib_none; pcmk__client_t *cib_client = pcmk__find_client(c); xmlNode *op_request = NULL; if (cib_client == NULL) { crm_trace("Invalid client %p", c); return 0; } rc = pcmk__ipc_msg_append(&cib_client->buffer, data); if (rc == pcmk_rc_ipc_more) { /* We haven't read the complete message yet, so just return. */ return 0; } else if (rc == pcmk_rc_ok) { /* We've read the complete message and there's already a header on * the front. Pass it off for processing. */ op_request = pcmk__client_data2xml(cib_client, &id, &flags); g_byte_array_free(cib_client->buffer, TRUE); cib_client->buffer = NULL; } else { /* Some sort of error occurred reassembling the message. All we can * do is clean up, log an error and return. */ crm_err("Error when reading IPC message: %s", pcmk_rc_str(rc)); if (cib_client->buffer != NULL) { g_byte_array_free(cib_client->buffer, TRUE); cib_client->buffer = NULL; } return 0; } if (op_request) { int rc = pcmk_rc_ok; rc = pcmk__xe_get_flags(op_request, PCMK__XA_CIB_CALLOPT, &call_options, cib_none); if (rc != pcmk_rc_ok) { crm_warn("Couldn't parse options from request: %s", pcmk_rc_str(rc)); } } if (op_request == NULL) { crm_trace("Invalid message from %p", c); pcmk__ipc_send_ack(cib_client, id, flags, PCMK__XE_NACK, NULL, CRM_EX_PROTOCOL); return 0; } if (pcmk_is_set(call_options, cib_sync_call)) { CRM_LOG_ASSERT(flags & crm_ipc_client_response); CRM_LOG_ASSERT(cib_client->request_id == 0); /* This means the client has two synchronous events in-flight */ cib_client->request_id = id; /* Reply only to the last one */ } if (cib_client->name == NULL) { const char *value = crm_element_value(op_request, PCMK__XA_CIB_CLIENTNAME); if (value == NULL) { cib_client->name = pcmk__itoa(cib_client->pid); } else { cib_client->name = pcmk__str_copy(value); if (pcmk__parse_server(value) != pcmk_ipc_unknown) { pcmk__set_client_flags(cib_client, cib_is_daemon); } } } /* Allow cluster daemons more leeway before being evicted */ if (pcmk_is_set(cib_client->flags, cib_is_daemon)) { const char *qmax = cib_config_lookup(PCMK_OPT_CLUSTER_IPC_LIMIT); pcmk__set_client_queue_max(cib_client, qmax); } crm_xml_add(op_request, PCMK__XA_CIB_CLIENTID, cib_client->id); crm_xml_add(op_request, PCMK__XA_CIB_CLIENTNAME, cib_client->name); CRM_LOG_ASSERT(cib_client->user != NULL); pcmk__update_acl_user(op_request, PCMK__XA_CIB_USER, cib_client->user); cib_common_callback_worker(id, flags, op_request, cib_client, privileged); pcmk__xml_free(op_request); return 0; } static uint64_t ping_seq = 0; static char *ping_digest = NULL; static bool ping_modified_since = FALSE; static gboolean cib_digester_cb(gpointer data) { if (based_is_primary) { char buffer[32]; xmlNode *ping = pcmk__xe_create(NULL, PCMK__XE_PING); ping_seq++; free(ping_digest); ping_digest = NULL; ping_modified_since = FALSE; pcmk__assert(snprintf(buffer, 32, "%" PRIu64, ping_seq) >= 0); crm_trace("Requesting peer digests (%s)", buffer); crm_xml_add(ping, PCMK__XA_T, PCMK__VALUE_CIB); crm_xml_add(ping, PCMK__XA_CIB_OP, CRM_OP_PING); crm_xml_add(ping, PCMK__XA_CIB_PING_ID, buffer); crm_xml_add(ping, PCMK_XA_CRM_FEATURE_SET, CRM_FEATURE_SET); pcmk__cluster_send_message(NULL, pcmk_ipc_based, ping); pcmk__xml_free(ping); } return FALSE; } static void process_ping_reply(xmlNode *reply) { uint64_t seq = 0; const char *host = crm_element_value(reply, PCMK__XA_SRC); xmlNode *wrapper = pcmk__xe_first_child(reply, PCMK__XE_CIB_CALLDATA, NULL, NULL); xmlNode *pong = pcmk__xe_first_child(wrapper, NULL, NULL, NULL); const char *seq_s = crm_element_value(pong, PCMK__XA_CIB_PING_ID); const char *digest = crm_element_value(pong, PCMK__XA_DIGEST); if (seq_s == NULL) { crm_debug("Ignoring ping reply with no " PCMK__XA_CIB_PING_ID); return; } else { long long seq_ll; int rc = pcmk__scan_ll(seq_s, &seq_ll, 0LL); if (rc != pcmk_rc_ok) { crm_debug("Ignoring ping reply with invalid " PCMK__XA_CIB_PING_ID " '%s': %s", seq_s, pcmk_rc_str(rc)); return; } seq = (uint64_t) seq_ll; } if(digest == NULL) { crm_trace("Ignoring ping reply %s from %s with no digest", seq_s, host); } else if(seq != ping_seq) { crm_trace("Ignoring out of sequence ping reply %s from %s", seq_s, host); } else if(ping_modified_since) { crm_trace("Ignoring ping reply %s from %s: cib updated since", seq_s, host); } else { if(ping_digest == NULL) { crm_trace("Calculating new digest"); ping_digest = pcmk__digest_xml(the_cib, true); } crm_trace("Processing ping reply %s from %s (%s)", seq_s, host, digest); if (!pcmk__str_eq(ping_digest, digest, pcmk__str_casei)) { xmlNode *wrapper = pcmk__xe_first_child(pong, PCMK__XE_CIB_CALLDATA, NULL, NULL); xmlNode *remote_cib = pcmk__xe_first_child(wrapper, NULL, NULL, NULL); const char *admin_epoch_s = NULL; const char *epoch_s = NULL; const char *num_updates_s = NULL; if (remote_cib != NULL) { admin_epoch_s = crm_element_value(remote_cib, PCMK_XA_ADMIN_EPOCH); epoch_s = crm_element_value(remote_cib, PCMK_XA_EPOCH); num_updates_s = crm_element_value(remote_cib, PCMK_XA_NUM_UPDATES); } crm_notice("Local CIB %s.%s.%s.%s differs from %s: %s.%s.%s.%s %p", crm_element_value(the_cib, PCMK_XA_ADMIN_EPOCH), crm_element_value(the_cib, PCMK_XA_EPOCH), crm_element_value(the_cib, PCMK_XA_NUM_UPDATES), ping_digest, host, pcmk__s(admin_epoch_s, "_"), pcmk__s(epoch_s, "_"), pcmk__s(num_updates_s, "_"), digest, remote_cib); if(remote_cib && remote_cib->children) { // Additional debug pcmk__xml_mark_changes(the_cib, remote_cib); pcmk__log_xml_changes(LOG_INFO, remote_cib); crm_trace("End of differences"); } pcmk__xml_free(remote_cib); sync_our_cib(reply, FALSE); } } } static void parse_local_options(const pcmk__client_t *cib_client, const cib__operation_t *operation, const char *host, const char *op, gboolean *local_notify, gboolean *needs_reply, gboolean *process, gboolean *needs_forward) { // Process locally and notify local client *process = TRUE; *needs_reply = FALSE; *local_notify = TRUE; *needs_forward = FALSE; if (pcmk_is_set(operation->flags, cib__op_attr_local)) { /* Always process locally if cib__op_attr_local is set. * * @COMPAT: Currently host is ignored. At a compatibility break, throw * an error (from cib_process_request() or earlier) if host is not NULL or * OUR_NODENAME. */ crm_trace("Processing always-local %s op from client %s", op, pcmk__client_name(cib_client)); if (!pcmk__str_eq(host, OUR_NODENAME, pcmk__str_casei|pcmk__str_null_matches)) { crm_warn("Operation '%s' is always local but its target host is " "set to '%s'", op, host); } return; } if (pcmk_is_set(operation->flags, cib__op_attr_modifies) || !pcmk__str_eq(host, OUR_NODENAME, pcmk__str_casei|pcmk__str_null_matches)) { // Forward modifying and non-local requests via cluster *process = FALSE; *needs_reply = FALSE; *local_notify = FALSE; *needs_forward = TRUE; crm_trace("%s op from %s needs to be forwarded to %s", op, pcmk__client_name(cib_client), pcmk__s(host, "all nodes")); return; } if (stand_alone) { crm_trace("Processing %s op from client %s (stand-alone)", op, pcmk__client_name(cib_client)); } else { crm_trace("Processing %saddressed %s op from client %s", ((host != NULL)? "locally " : "un"), op, pcmk__client_name(cib_client)); } } static gboolean parse_peer_options(const cib__operation_t *operation, xmlNode *request, gboolean *local_notify, gboolean *needs_reply, gboolean *process) { /* TODO: What happens when an update comes in after node A * requests the CIB from node B, but before it gets the reply (and * sends out the replace operation)? * * (This may no longer be relevant since legacy mode was dropped; need to * trace code more closely to check.) */ const char *host = NULL; const char *delegated = crm_element_value(request, PCMK__XA_CIB_DELEGATED_FROM); const char *op = crm_element_value(request, PCMK__XA_CIB_OP); const char *originator = crm_element_value(request, PCMK__XA_SRC); const char *reply_to = crm_element_value(request, PCMK__XA_CIB_ISREPLYTO); gboolean is_reply = pcmk__str_eq(reply_to, OUR_NODENAME, pcmk__str_casei); if (originator == NULL) { // Shouldn't be possible originator = "peer"; } if (pcmk__str_eq(op, PCMK__CIB_REQUEST_REPLACE, pcmk__str_none)) { // sync_our_cib() sets PCMK__XA_CIB_ISREPLYTO if (reply_to) { delegated = reply_to; } goto skip_is_reply; } else if (pcmk__str_eq(op, PCMK__CIB_REQUEST_SYNC_TO_ALL, pcmk__str_none)) { // Nothing to do } else if (is_reply && pcmk__str_eq(op, CRM_OP_PING, pcmk__str_casei)) { process_ping_reply(request); return FALSE; } else if (pcmk__str_eq(op, PCMK__CIB_REQUEST_UPGRADE, pcmk__str_none)) { /* Only the DC (node with the oldest software) should process * this operation if PCMK__XA_CIB_SCHEMA_MAX is unset. * * If the DC is happy it will then send out another * PCMK__CIB_REQUEST_UPGRADE which will tell all nodes to do the actual * upgrade. * * Except this time PCMK__XA_CIB_SCHEMA_MAX will be set which puts a * limit on how far newer nodes will go */ const char *max = crm_element_value(request, PCMK__XA_CIB_SCHEMA_MAX); const char *upgrade_rc = crm_element_value(request, PCMK__XA_CIB_UPGRADE_RC); crm_trace("Parsing upgrade %s for %s with max=%s and upgrade_rc=%s", (is_reply? "reply" : "request"), (based_is_primary? "primary" : "secondary"), pcmk__s(max, "none"), pcmk__s(upgrade_rc, "none")); if (upgrade_rc != NULL) { // Our upgrade request was rejected by DC, notify clients of result crm_xml_add(request, PCMK__XA_CIB_RC, upgrade_rc); } else if ((max == NULL) && based_is_primary) { /* We are the DC, check if this upgrade is allowed */ goto skip_is_reply; } else if(max) { /* Ok, go ahead and upgrade to 'max' */ goto skip_is_reply; } else { // Ignore broadcast client requests when we're not primary return FALSE; } } else if (pcmk__xe_attr_is_true(request, PCMK__XA_CIB_UPDATE)) { crm_info("Detected legacy %s global update from %s", op, originator); send_sync_request(NULL); return FALSE; } else if (is_reply && pcmk_is_set(operation->flags, cib__op_attr_modifies)) { crm_trace("Ignoring legacy %s reply sent from %s to local clients", op, originator); return FALSE; } else if (pcmk__str_eq(op, PCMK__CIB_REQUEST_SHUTDOWN, pcmk__str_none)) { *local_notify = FALSE; if (reply_to == NULL) { *process = TRUE; } else { // Not possible? crm_debug("Ignoring shutdown request from %s because reply_to=%s", originator, reply_to); } return *process; } if (is_reply) { crm_trace("Will notify local clients for %s reply from %s", op, originator); *process = FALSE; *needs_reply = FALSE; *local_notify = TRUE; return TRUE; } skip_is_reply: *process = TRUE; *needs_reply = FALSE; *local_notify = pcmk__str_eq(delegated, OUR_NODENAME, pcmk__str_casei); host = crm_element_value(request, PCMK__XA_CIB_HOST); if (pcmk__str_eq(host, OUR_NODENAME, pcmk__str_casei)) { crm_trace("Processing %s request sent to us from %s", op, originator); *needs_reply = TRUE; return TRUE; } else if (host != NULL) { crm_trace("Ignoring %s request intended for CIB manager on %s", op, host); return FALSE; } else if(is_reply == FALSE && pcmk__str_eq(op, CRM_OP_PING, pcmk__str_casei)) { *needs_reply = TRUE; } crm_trace("Processing %s request broadcast by %s call %s on %s " "(local clients will%s be notified)", op, pcmk__s(crm_element_value(request, PCMK__XA_CIB_CLIENTNAME), "client"), pcmk__s(crm_element_value(request, PCMK__XA_CIB_CALLID), "without ID"), originator, (*local_notify? "" : "not")); return TRUE; } /*! * \internal * \brief Forward a CIB request to the appropriate target host(s) * * \param[in] request CIB request to forward */ static void forward_request(xmlNode *request) { const char *op = crm_element_value(request, PCMK__XA_CIB_OP); const char *section = crm_element_value(request, PCMK__XA_CIB_SECTION); const char *host = crm_element_value(request, PCMK__XA_CIB_HOST); const char *originator = crm_element_value(request, PCMK__XA_SRC); const char *client_name = crm_element_value(request, PCMK__XA_CIB_CLIENTNAME); const char *call_id = crm_element_value(request, PCMK__XA_CIB_CALLID); pcmk__node_status_t *peer = NULL; int log_level = LOG_INFO; if (pcmk__str_eq(op, PCMK__CIB_REQUEST_NOOP, pcmk__str_none)) { log_level = LOG_DEBUG; } do_crm_log(log_level, "Forwarding %s operation for section %s to %s (origin=%s/%s/%s)", pcmk__s(op, "invalid"), pcmk__s(section, "all"), pcmk__s(host, "all"), pcmk__s(originator, "local"), pcmk__s(client_name, "unspecified"), pcmk__s(call_id, "unspecified")); crm_xml_add(request, PCMK__XA_CIB_DELEGATED_FROM, OUR_NODENAME); if (host != NULL) { peer = pcmk__get_node(0, host, NULL, pcmk__node_search_cluster_member); } pcmk__cluster_send_message(peer, pcmk_ipc_based, request); // Return the request to its original state pcmk__xe_remove_attr(request, PCMK__XA_CIB_DELEGATED_FROM); } static void send_peer_reply(xmlNode *msg, const char *originator) { const pcmk__node_status_t *node = NULL; if ((msg == NULL) || (originator == NULL)) { return; } // Send reply via cluster to originating node node = pcmk__get_node(0, originator, NULL, pcmk__node_search_cluster_member); crm_trace("Sending request result to %s only", originator); crm_xml_add(msg, PCMK__XA_CIB_ISREPLYTO, originator); pcmk__cluster_send_message(node, pcmk_ipc_based, msg); } /*! * \internal * \brief Handle an IPC or CPG message containing a request * * \param[in,out] request Request XML * \param[in] privileged Whether privileged commands may be run * (see cib_server_ops[] definition) * \param[in] cib_client IPC client that sent request (or NULL if CPG) * * \return Legacy Pacemaker return code */ int cib_process_request(xmlNode *request, gboolean privileged, const pcmk__client_t *cib_client) { // @TODO: Break into multiple smaller functions uint32_t call_options = cib_none; gboolean process = TRUE; // Whether to process request locally now gboolean is_update = TRUE; // Whether request would modify CIB gboolean needs_reply = TRUE; // Whether to build a reply gboolean local_notify = FALSE; // Whether to notify (local) requester gboolean needs_forward = FALSE; // Whether to forward request somewhere else xmlNode *op_reply = NULL; xmlNode *result_diff = NULL; int rc = pcmk_ok; const char *op = crm_element_value(request, PCMK__XA_CIB_OP); const char *originator = crm_element_value(request, PCMK__XA_SRC); const char *host = crm_element_value(request, PCMK__XA_CIB_HOST); const char *call_id = crm_element_value(request, PCMK__XA_CIB_CALLID); const char *client_id = crm_element_value(request, PCMK__XA_CIB_CLIENTID); const char *client_name = crm_element_value(request, PCMK__XA_CIB_CLIENTNAME); const char *reply_to = crm_element_value(request, PCMK__XA_CIB_ISREPLYTO); const cib__operation_t *operation = NULL; cib__op_fn_t op_function = NULL; rc = pcmk__xe_get_flags(request, PCMK__XA_CIB_CALLOPT, &call_options, cib_none); if (rc != pcmk_rc_ok) { crm_warn("Couldn't parse options from request: %s", pcmk_rc_str(rc)); } if ((host != NULL) && (*host == '\0')) { host = NULL; } if (cib_client == NULL) { crm_trace("Processing peer %s operation from %s/%s on %s intended for %s (reply=%s)", op, pcmk__s(client_name, "client"), call_id, originator, pcmk__s(host, "all"), reply_to); } else { crm_xml_add(request, PCMK__XA_SRC, OUR_NODENAME); crm_trace("Processing local %s operation from %s/%s intended for %s", op, pcmk__s(client_name, "client"), call_id, pcmk__s(host, "all")); } rc = cib__get_operation(op, &operation); rc = pcmk_rc2legacy(rc); if (rc != pcmk_ok) { /* TODO: construct error reply? */ crm_err("Pre-processing of command failed: %s", pcmk_strerror(rc)); return rc; } op_function = based_get_op_function(operation); if (op_function == NULL) { crm_err("Operation %s not supported by CIB manager", op); return -EOPNOTSUPP; } if (cib_client != NULL) { parse_local_options(cib_client, operation, host, op, &local_notify, &needs_reply, &process, &needs_forward); } else if (!parse_peer_options(operation, request, &local_notify, &needs_reply, &process)) { return rc; } if (pcmk_is_set(call_options, cib_transaction)) { /* All requests in a transaction are processed locally against a working * CIB copy, and we don't notify for individual requests because the * entire transaction is atomic. * * We still call the option parser functions above, for the sake of log * messages and checking whether we're the target for peer requests. */ process = TRUE; needs_reply = FALSE; local_notify = FALSE; needs_forward = FALSE; } is_update = pcmk_is_set(operation->flags, cib__op_attr_modifies); if (pcmk_is_set(call_options, cib_discard_reply)) { /* If the request will modify the CIB, and we are in legacy mode, we * need to build a reply so we can broadcast a diff, even if the * requester doesn't want one. */ needs_reply = FALSE; local_notify = FALSE; crm_trace("Client is not interested in the reply"); } if (needs_forward) { forward_request(request); return rc; } if (cib_status != pcmk_ok) { rc = cib_status; crm_err("Ignoring request because cluster configuration is invalid " "(please repair and restart): %s", pcmk_strerror(rc)); op_reply = create_cib_reply(op, call_id, client_id, call_options, rc, the_cib); } else if (process) { time_t finished = 0; time_t now = time(NULL); int level = LOG_INFO; const char *section = crm_element_value(request, PCMK__XA_CIB_SECTION); const char *admin_epoch_s = NULL; const char *epoch_s = NULL; const char *num_updates_s = NULL; rc = cib_process_command(request, operation, op_function, &op_reply, &result_diff, privileged); if (!is_update) { level = LOG_TRACE; } else if (pcmk__xe_attr_is_true(request, PCMK__XA_CIB_UPDATE)) { switch (rc) { case pcmk_ok: level = LOG_INFO; break; case -pcmk_err_old_data: case -pcmk_err_diff_resync: case -pcmk_err_diff_failed: level = LOG_TRACE; break; default: level = LOG_ERR; } } else if (rc != pcmk_ok) { level = LOG_WARNING; } if (the_cib != NULL) { admin_epoch_s = crm_element_value(the_cib, PCMK_XA_ADMIN_EPOCH); epoch_s = crm_element_value(the_cib, PCMK_XA_EPOCH); num_updates_s = crm_element_value(the_cib, PCMK_XA_NUM_UPDATES); } do_crm_log(level, "Completed %s operation for section %s: %s (rc=%d, origin=%s/%s/%s, version=%s.%s.%s)", op, section ? section : "'all'", pcmk_strerror(rc), rc, originator ? originator : "local", pcmk__s(client_name, "client"), call_id, pcmk__s(admin_epoch_s, "0"), pcmk__s(epoch_s, "0"), pcmk__s(num_updates_s, "0")); finished = time(NULL); if ((finished - now) > 3) { crm_trace("%s operation took %lds to complete", op, (long)(finished - now)); crm_write_blackbox(0, NULL); } if (op_reply == NULL && (needs_reply || local_notify)) { crm_err("Unexpected NULL reply to message"); crm_log_xml_err(request, "null reply"); needs_reply = FALSE; local_notify = FALSE; } } if (is_update) { crm_trace("Completed pre-sync update from %s/%s/%s%s", originator ? originator : "local", pcmk__s(client_name, "client"), call_id, local_notify?" with local notification":""); } else if (!needs_reply || stand_alone) { // This was a non-originating secondary update crm_trace("Completed update as secondary"); } else if ((cib_client == NULL) && !pcmk_is_set(call_options, cib_discard_reply)) { if (is_update == FALSE || result_diff == NULL) { crm_trace("Request not broadcast: R/O call"); } else if (rc != pcmk_ok) { crm_trace("Request not broadcast: call failed: %s", pcmk_strerror(rc)); } else { crm_trace("Directing reply to %s", originator); } send_peer_reply(op_reply, originator); } if (local_notify && client_id) { crm_trace("Performing local %ssync notification for %s", (pcmk_is_set(call_options, cib_sync_call)? "" : "a"), client_id); if (process == FALSE) { do_local_notify(request, client_id, pcmk_is_set(call_options, cib_sync_call), (cib_client == NULL)); } else { do_local_notify(op_reply, client_id, pcmk_is_set(call_options, cib_sync_call), (cib_client == NULL)); } } pcmk__xml_free(op_reply); pcmk__xml_free(result_diff); return rc; } /*! * \internal * \brief Get a CIB operation's input from the request XML * * \param[in] request CIB request XML * \param[in] type CIB operation type * \param[out] section Where to store CIB section name * * \return Input XML for CIB operation * * \note If not \c NULL, the return value is a non-const pointer to part of * \p request. The caller should not free it directly. */ static xmlNode * prepare_input(const xmlNode *request, enum cib__op_type type, const char **section) { xmlNode *wrapper = pcmk__xe_first_child(request, PCMK__XE_CIB_CALLDATA, NULL, NULL); xmlNode *input = pcmk__xe_first_child(wrapper, NULL, NULL, NULL); if (type == cib__op_apply_patch) { *section = NULL; } else { *section = crm_element_value(request, PCMK__XA_CIB_SECTION); } // Grab the specified section if ((*section != NULL) && pcmk__xe_is(input, PCMK_XE_CIB)) { input = pcmk_find_cib_element(input, *section); } return input; } #define XPATH_CONFIG_CHANGE \ "//" PCMK_XE_CHANGE \ "[contains(@" PCMK_XA_PATH ",'/" PCMK_XE_CRM_CONFIG "/')]" static bool contains_config_change(xmlNode *diff) { bool changed = false; if (diff) { xmlXPathObject *xpathObj = pcmk__xpath_search(diff->doc, XPATH_CONFIG_CHANGE); if (pcmk__xpath_num_results(xpathObj) > 0) { changed = true; } xmlXPathFreeObject(xpathObj); } return changed; } static int cib_process_command(xmlNode *request, const cib__operation_t *operation, cib__op_fn_t op_function, xmlNode **reply, xmlNode **cib_diff, bool privileged) { xmlNode *input = NULL; xmlNode *output = NULL; xmlNode *result_cib = NULL; uint32_t call_options = cib_none; const char *op = NULL; const char *section = NULL; const char *call_id = crm_element_value(request, PCMK__XA_CIB_CALLID); const char *client_id = crm_element_value(request, PCMK__XA_CIB_CLIENTID); const char *client_name = crm_element_value(request, PCMK__XA_CIB_CLIENTNAME); const char *originator = crm_element_value(request, PCMK__XA_SRC); int rc = pcmk_ok; bool config_changed = false; bool manage_counters = true; static mainloop_timer_t *digest_timer = NULL; pcmk__assert(cib_status == pcmk_ok); if(digest_timer == NULL) { digest_timer = mainloop_timer_add("digester", 5000, FALSE, cib_digester_cb, NULL); } *reply = NULL; *cib_diff = NULL; /* Start processing the request... */ op = crm_element_value(request, PCMK__XA_CIB_OP); rc = pcmk__xe_get_flags(request, PCMK__XA_CIB_CALLOPT, &call_options, cib_none); if (rc != pcmk_rc_ok) { crm_warn("Couldn't parse options from request: %s", pcmk_rc_str(rc)); } if (!privileged && pcmk_is_set(operation->flags, cib__op_attr_privileged)) { rc = -EACCES; crm_trace("Failed due to lack of privileges: %s", pcmk_strerror(rc)); goto done; } input = prepare_input(request, operation->type, §ion); if (!pcmk_is_set(operation->flags, cib__op_attr_modifies)) { rc = cib_perform_op(NULL, op, call_options, op_function, true, section, request, input, false, &config_changed, &the_cib, &result_cib, NULL, &output); CRM_CHECK(result_cib == NULL, pcmk__xml_free(result_cib)); goto done; } /* @COMPAT: Handle a valid write action (legacy) * * @TODO: Re-evaluate whether this is all truly legacy. The cib_force_diff * portion is. However, PCMK__XA_CIB_UPDATE may be set by a sync operation * even in non-legacy mode, and manage_counters tells xml_create_patchset() * whether to update version/epoch info. */ if (pcmk__xe_attr_is_true(request, PCMK__XA_CIB_UPDATE)) { manage_counters = false; cib__set_call_options(call_options, "call", cib_force_diff); crm_trace("Global update detected"); CRM_LOG_ASSERT(pcmk__str_any_of(op, PCMK__CIB_REQUEST_APPLY_PATCH, PCMK__CIB_REQUEST_REPLACE, NULL)); } ping_modified_since = TRUE; // result_cib must not be modified after cib_perform_op() returns rc = cib_perform_op(NULL, op, call_options, op_function, false, section, request, input, manage_counters, &config_changed, &the_cib, &result_cib, cib_diff, &output); /* Always write to disk for successful ops with the flag set. This also * negates the need to detect ordering changes. */ if ((rc == pcmk_ok) && pcmk_is_set(operation->flags, cib__op_attr_writes_through)) { config_changed = true; } if ((rc == pcmk_ok) && !pcmk_any_flags_set(call_options, cib_dryrun|cib_transaction)) { if (result_cib != the_cib) { if (pcmk_is_set(operation->flags, cib__op_attr_writes_through)) { config_changed = true; } crm_trace("Activating %s->%s%s", crm_element_value(the_cib, PCMK_XA_NUM_UPDATES), crm_element_value(result_cib, PCMK_XA_NUM_UPDATES), (config_changed? " changed" : "")); rc = activateCibXml(result_cib, config_changed, op); if (rc != pcmk_ok) { crm_err("Failed to activate new CIB: %s", pcmk_strerror(rc)); } } if ((rc == pcmk_ok) && contains_config_change(*cib_diff)) { cib_read_config(config_hash, result_cib); } /* @COMPAT Nodes older than feature set 3.19.0 don't support * transactions. In a mixed-version cluster with nodes <3.19.0, we must * sync the updated CIB, so that the older nodes receive the changes. * Any node that has already applied the transaction will ignore the * synced CIB. * * To ensure the updated CIB is synced from only one node, we sync it * from the originator. */ if ((operation->type == cib__op_commit_transact) && pcmk__str_eq(originator, OUR_NODENAME, pcmk__str_casei) && compare_version(crm_element_value(the_cib, PCMK_XA_CRM_FEATURE_SET), "3.19.0") < 0) { sync_our_cib(request, TRUE); } mainloop_timer_stop(digest_timer); mainloop_timer_start(digest_timer); } else if (rc == -pcmk_err_schema_validation) { pcmk__assert(result_cib != the_cib); if (output != NULL) { crm_log_xml_info(output, "cib:output"); pcmk__xml_free(output); } output = result_cib; } else { crm_trace("Not activating %d %d %s", rc, pcmk_is_set(call_options, cib_dryrun), crm_element_value(result_cib, PCMK_XA_NUM_UPDATES)); if (result_cib != the_cib) { pcmk__xml_free(result_cib); } } if (!pcmk_any_flags_set(call_options, cib_dryrun|cib_inhibit_notify|cib_transaction)) { crm_trace("Sending notifications %d", pcmk_is_set(call_options, cib_dryrun)); cib_diff_notify(op, rc, call_id, client_id, client_name, originator, input, *cib_diff); } pcmk__log_xml_patchset(LOG_TRACE, *cib_diff); done: if (!pcmk_is_set(call_options, cib_discard_reply)) { *reply = create_cib_reply(op, call_id, client_id, call_options, rc, output); } if (output != the_cib) { pcmk__xml_free(output); } crm_trace("done"); return rc; } void cib_peer_callback(xmlNode * msg, void *private_data) { const char *reason = NULL; const char *originator = crm_element_value(msg, PCMK__XA_SRC); if (pcmk__peer_cache == NULL) { reason = "membership not established"; goto bail; } if (crm_element_value(msg, PCMK__XA_CIB_CLIENTNAME) == NULL) { crm_xml_add(msg, PCMK__XA_CIB_CLIENTNAME, originator); } /* crm_log_xml_trace(msg, "Peer[inbound]"); */ cib_process_request(msg, TRUE, NULL); return; bail: if (reason) { const char *op = crm_element_value(msg, PCMK__XA_CIB_OP); crm_warn("Discarding %s message from %s: %s", op, originator, reason); } } static gboolean cib_force_exit(gpointer data) { crm_notice("Exiting immediately after %s without shutdown acknowledgment", pcmk__readable_interval(EXIT_ESCALATION_MS)); terminate_cib(CRM_EX_ERROR); return FALSE; } static void disconnect_remote_client(gpointer key, gpointer value, gpointer user_data) { pcmk__client_t *a_client = value; crm_err("Can't disconnect client %s: Not implemented", pcmk__client_name(a_client)); } static void initiate_exit(void) { int active = 0; xmlNode *leaving = NULL; active = pcmk__cluster_num_active_nodes(); if (active < 2) { // This is the last active node crm_info("Exiting without sending shutdown request (no active peers)"); terminate_cib(CRM_EX_OK); return; } crm_info("Sending shutdown request to %d peers", active); leaving = pcmk__xe_create(NULL, PCMK__XE_EXIT_NOTIFICATION); crm_xml_add(leaving, PCMK__XA_T, PCMK__VALUE_CIB); crm_xml_add(leaving, PCMK__XA_CIB_OP, PCMK__CIB_REQUEST_SHUTDOWN); pcmk__cluster_send_message(NULL, pcmk_ipc_based, leaving); pcmk__xml_free(leaving); pcmk__create_timer(EXIT_ESCALATION_MS, cib_force_exit, NULL); } void cib_shutdown(int nsig) { struct qb_ipcs_stats srv_stats; if (cib_shutdown_flag == FALSE) { int disconnects = 0; qb_ipcs_connection_t *c = NULL; cib_shutdown_flag = TRUE; c = qb_ipcs_connection_first_get(ipcs_rw); while (c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(ipcs_rw, last); crm_debug("Disconnecting r/w client %p...", last); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); disconnects++; } c = qb_ipcs_connection_first_get(ipcs_ro); while (c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(ipcs_ro, last); crm_debug("Disconnecting r/o client %p...", last); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); disconnects++; } c = qb_ipcs_connection_first_get(ipcs_shm); while (c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(ipcs_shm, last); crm_debug("Disconnecting non-blocking r/w client %p...", last); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); disconnects++; } disconnects += pcmk__ipc_client_count(); crm_debug("Disconnecting %d remote clients", pcmk__ipc_client_count()); pcmk__foreach_ipc_client(disconnect_remote_client, NULL); crm_info("Disconnected %d clients", disconnects); } qb_ipcs_stats_get(ipcs_rw, &srv_stats, QB_FALSE); if (pcmk__ipc_client_count() == 0) { crm_info("All clients disconnected (%d)", srv_stats.active_connections); initiate_exit(); } else { crm_info("Waiting on %d clients to disconnect (%d)", pcmk__ipc_client_count(), srv_stats.active_connections); } } extern int remote_fd; extern int remote_tls_fd; /*! * \internal * \brief Close remote sockets, free the global CIB and quit * * \param[in] exit_status What exit status to use (if -1, use CRM_EX_OK, but * skip disconnecting from the cluster layer) */ void terminate_cib(int exit_status) { if (remote_fd > 0) { close(remote_fd); remote_fd = 0; } if (remote_tls_fd > 0) { close(remote_tls_fd); remote_tls_fd = 0; } uninitializeCib(); // Exit immediately on error if (exit_status > CRM_EX_OK) { pcmk__stop_based_ipc(ipcs_ro, ipcs_rw, ipcs_shm); crm_exit(exit_status); return; } if ((mainloop != NULL) && g_main_loop_is_running(mainloop)) { /* Quit via returning from the main loop. If exit_status has the special * value -1, we skip the disconnect here, and it will be done when the * main loop returns (this allows the peer status callback to avoid * messing with the peer caches). */ if (exit_status == CRM_EX_OK) { pcmk_cluster_disconnect(crm_cluster); } g_main_loop_quit(mainloop); return; } /* Exit cleanly. Even the peer status callback can disconnect here, because * we're not returning control to the caller. */ pcmk_cluster_disconnect(crm_cluster); pcmk__stop_based_ipc(ipcs_ro, ipcs_rw, ipcs_shm); crm_exit(CRM_EX_OK); } diff --git a/daemons/controld/controld_control.c b/daemons/controld/controld_control.c index 220f830566..414a62755f 100644 --- a/daemons/controld/controld_control.c +++ b/daemons/controld/controld_control.c @@ -1,725 +1,725 @@ /* * Copyright 2004-2025 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU General Public License version 2 * or later (GPLv2+) WITHOUT ANY WARRANTY. */ #include #include #include #include #include #include #include #include #include #include static qb_ipcs_service_t *ipcs = NULL; static crm_trigger_t *config_read_trigger = NULL; #if SUPPORT_COROSYNC extern gboolean crm_connect_corosync(pcmk_cluster_t *cluster); #endif static void crm_shutdown(int nsig); static gboolean crm_read_options(gpointer user_data); /* A_HA_CONNECT */ void do_ha_control(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { gboolean registered = FALSE; if (controld_globals.cluster == NULL) { controld_globals.cluster = pcmk_cluster_new(); } if (action & A_HA_DISCONNECT) { pcmk_cluster_disconnect(controld_globals.cluster); crm_info("Disconnected from the cluster"); controld_set_fsa_input_flags(R_HA_DISCONNECTED); } if (action & A_HA_CONNECT) { pcmk__cluster_set_status_callback(&peer_update_callback); pcmk__cluster_set_autoreap(false); #if SUPPORT_COROSYNC if (pcmk_get_cluster_layer() == pcmk_cluster_layer_corosync) { registered = crm_connect_corosync(controld_globals.cluster); } #endif // SUPPORT_COROSYNC if (registered) { pcmk__node_status_t *node = controld_get_local_node_status(); controld_election_init(); free(controld_globals.our_uuid); controld_globals.our_uuid = pcmk__str_copy(pcmk__cluster_get_xml_id(node)); if (controld_globals.our_uuid == NULL) { crm_err("Could not obtain local uuid"); registered = FALSE; } } if (!registered) { controld_set_fsa_input_flags(R_HA_DISCONNECTED); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); return; } populate_cib_nodes(controld_node_update_none, __func__); controld_clear_fsa_input_flags(R_HA_DISCONNECTED); crm_info("Connected to the cluster"); } if (action & ~(A_HA_CONNECT | A_HA_DISCONNECT)) { crm_err("Unexpected action %s in %s", fsa_action2string(action), __func__); } } /* A_SHUTDOWN */ void do_shutdown(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { /* just in case */ controld_set_fsa_input_flags(R_SHUTDOWN); controld_disconnect_fencer(FALSE); } /* A_SHUTDOWN_REQ */ void do_shutdown_req(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { xmlNode *msg = NULL; controld_set_fsa_input_flags(R_SHUTDOWN); //controld_set_fsa_input_flags(R_STAYDOWN); crm_info("Sending shutdown request to all peers (DC is %s)", pcmk__s(controld_globals.dc_name, "not set")); msg = pcmk__new_request(pcmk_ipc_controld, CRM_SYSTEM_CRMD, NULL, CRM_SYSTEM_CRMD, CRM_OP_SHUTDOWN_REQ, NULL); if (!pcmk__cluster_send_message(NULL, pcmk_ipc_controld, msg)) { register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); } pcmk__xml_free(msg); } void crmd_fast_exit(crm_exit_t exit_code) { if (pcmk_is_set(controld_globals.fsa_input_register, R_STAYDOWN)) { crm_warn("Inhibiting respawn " QB_XS " remapping exit code %d to %d", exit_code, CRM_EX_FATAL); exit_code = CRM_EX_FATAL; } else if ((exit_code == CRM_EX_OK) && pcmk_is_set(controld_globals.fsa_input_register, R_IN_RECOVERY)) { crm_err("Could not recover from internal error"); exit_code = CRM_EX_ERROR; } if (controld_globals.logger_out != NULL) { controld_globals.logger_out->finish(controld_globals.logger_out, exit_code, true, NULL); pcmk__output_free(controld_globals.logger_out); controld_globals.logger_out = NULL; } crm_exit(exit_code); } crm_exit_t crmd_exit(crm_exit_t exit_code) { GMainLoop *mloop = controld_globals.mainloop; static bool in_progress = FALSE; if (in_progress && (exit_code == CRM_EX_OK)) { crm_debug("Exit is already in progress"); return exit_code; } else if(in_progress) { crm_notice("Error during shutdown process, exiting now with status %d (%s)", exit_code, crm_exit_str(exit_code)); crm_write_blackbox(SIGTRAP, NULL); crmd_fast_exit(exit_code); } in_progress = TRUE; crm_trace("Preparing to exit with status %d (%s)", exit_code, crm_exit_str(exit_code)); /* Suppress secondary errors resulting from us disconnecting everything */ controld_set_fsa_input_flags(R_HA_DISCONNECTED); /* Close all IPC servers and clients to ensure any and all shared memory files are cleaned up */ if(ipcs) { crm_trace("Closing IPC server"); mainloop_del_ipc_server(ipcs); ipcs = NULL; } controld_close_attrd_ipc(); controld_shutdown_schedulerd_ipc(); controld_disconnect_fencer(TRUE); if ((exit_code == CRM_EX_OK) && (controld_globals.mainloop == NULL)) { crm_debug("No mainloop detected"); exit_code = CRM_EX_ERROR; } /* On an error, just get out. * * Otherwise, make the effort to have mainloop exit gracefully so * that it (mostly) cleans up after itself and valgrind has less * to report on - allowing real errors stand out */ if (exit_code != CRM_EX_OK) { crm_notice("Forcing immediate exit with status %d (%s)", exit_code, crm_exit_str(exit_code)); crm_write_blackbox(SIGTRAP, NULL); crmd_fast_exit(exit_code); } /* Clean up as much memory as possible for valgrind */ for (GList *iter = controld_globals.fsa_message_queue; iter != NULL; iter = iter->next) { fsa_data_t *fsa_data = (fsa_data_t *) iter->data; crm_info("Dropping %s: [ state=%s cause=%s origin=%s ]", fsa_input2string(fsa_data->fsa_input), fsa_state2string(controld_globals.fsa_state), fsa_cause2string(fsa_data->fsa_cause), fsa_data->origin); delete_fsa_input(fsa_data); } controld_clear_fsa_input_flags(R_MEMBERSHIP); g_list_free(controld_globals.fsa_message_queue); controld_globals.fsa_message_queue = NULL; controld_free_node_pending_timers(); election_reset(controld_globals.cluster); // Stop any election timer /* Tear down the CIB manager connection, but don't free it yet -- it could * be used when we drain the mainloop later. */ controld_disconnect_cib_manager(); verify_stopped(controld_globals.fsa_state, LOG_WARNING); controld_clear_fsa_input_flags(R_LRM_CONNECTED); lrm_state_destroy_all(); mainloop_destroy_trigger(config_read_trigger); config_read_trigger = NULL; controld_destroy_fsa_trigger(); controld_destroy_transition_trigger(); pcmk__client_cleanup(); pcmk__cluster_destroy_node_caches(); controld_free_fsa_timers(); te_cleanup_stonith_history_sync(NULL, TRUE); controld_free_sched_timer(); free(controld_globals.our_uuid); controld_globals.our_uuid = NULL; free(controld_globals.dc_name); controld_globals.dc_name = NULL; free(controld_globals.dc_version); controld_globals.dc_version = NULL; free(controld_globals.cluster_name); controld_globals.cluster_name = NULL; free(controld_globals.te_uuid); controld_globals.te_uuid = NULL; free_max_generation(); controld_destroy_failed_sync_table(); controld_destroy_outside_events_table(); mainloop_destroy_signal(SIGPIPE); mainloop_destroy_signal(SIGUSR1); mainloop_destroy_signal(SIGTERM); mainloop_destroy_signal(SIGTRAP); /* leave SIGCHLD engaged as we might still want to drain some service-actions */ if (mloop) { GMainContext *ctx = g_main_loop_get_context(controld_globals.mainloop); /* Don't re-enter this block */ controld_globals.mainloop = NULL; /* no signals on final draining anymore */ mainloop_destroy_signal(SIGCHLD); crm_trace("Draining mainloop %d %d", g_main_loop_is_running(mloop), g_main_context_pending(ctx)); { int lpc = 0; while((g_main_context_pending(ctx) && lpc < 10)) { lpc++; crm_trace("Iteration %d", lpc); g_main_context_dispatch(ctx); } } crm_trace("Closing mainloop %d %d", g_main_loop_is_running(mloop), g_main_context_pending(ctx)); g_main_loop_quit(mloop); /* Won't do anything yet, since we're inside it now */ g_main_loop_unref(mloop); } else { mainloop_destroy_signal(SIGCHLD); } cib_delete(controld_globals.cib_conn); controld_globals.cib_conn = NULL; throttle_fini(); pcmk_cluster_free(controld_globals.cluster); controld_globals.cluster = NULL; /* Graceful */ crm_trace("Done preparing for exit with status %d (%s)", exit_code, crm_exit_str(exit_code)); return exit_code; } /* A_EXIT_0, A_EXIT_1 */ void do_exit(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { crm_exit_t exit_code = CRM_EX_OK; if (pcmk_is_set(action, A_EXIT_1)) { exit_code = CRM_EX_ERROR; crm_err("Exiting now due to errors"); } verify_stopped(cur_state, LOG_ERR); crmd_exit(exit_code); } static void sigpipe_ignore(int nsig) { return; } /* A_STARTUP */ void do_startup(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { crm_debug("Registering Signal Handlers"); mainloop_add_signal(SIGTERM, crm_shutdown); mainloop_add_signal(SIGPIPE, sigpipe_ignore); config_read_trigger = mainloop_add_trigger(G_PRIORITY_HIGH, crm_read_options, NULL); controld_init_fsa_trigger(); controld_init_transition_trigger(); crm_debug("Creating CIB manager and executor objects"); controld_globals.cib_conn = cib_new(); lrm_state_init_local(); if (controld_init_fsa_timers() == FALSE) { register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); } } // \return libqb error code (0 on success, -errno on error) static int32_t accept_controller_client(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) { crm_trace("Accepting new IPC client connection"); if (pcmk__new_client(c, uid, gid) == NULL) { return -ENOMEM; } return 0; } // \return libqb error code (0 on success, -errno on error) static int32_t dispatch_controller_ipc(qb_ipcs_connection_t * c, void *data, size_t size) { int rc = pcmk_rc_ok; uint32_t id = 0; uint32_t flags = 0; pcmk__client_t *client = pcmk__find_client(c); xmlNode *msg = NULL; rc = pcmk__ipc_msg_append(&client->buffer, data); if (rc == pcmk_rc_ipc_more) { /* We haven't read the complete message yet, so just return. */ return 0; } else if (rc == pcmk_rc_ok) { /* We've read the complete message and there's already a header on * the front. Pass it off for processing. */ msg = pcmk__client_data2xml(client, &id, &flags); g_byte_array_free(client->buffer, TRUE); client->buffer = NULL; } else { /* Some sort of error occurred reassembling the message. All we can * do is clean up, log an error and return. */ crm_err("Error when reading IPC message: %s", pcmk_rc_str(rc)); if (client->buffer != NULL) { g_byte_array_free(client->buffer, TRUE); client->buffer = NULL; } return 0; } if (msg == NULL) { - pcmk__ipc_send_ack(client, id, flags, PCMK__XE_ACK, NULL, + pcmk__ipc_send_ack(client, id, flags, PCMK__XE_NACK, NULL, CRM_EX_PROTOCOL); return 0; } pcmk__ipc_send_ack(client, id, flags, PCMK__XE_ACK, NULL, CRM_EX_INDETERMINATE); pcmk__assert(client->user != NULL); pcmk__update_acl_user(msg, PCMK__XA_CRM_USER, client->user); crm_xml_add(msg, PCMK__XA_CRM_SYS_FROM, client->id); if (controld_authorize_ipc_message(msg, client, NULL)) { crm_trace("Processing IPC message from client %s", pcmk__client_name(client)); route_message(C_IPC_MESSAGE, msg); } controld_trigger_fsa(); pcmk__xml_free(msg); return 0; } static int32_t ipc_client_disconnected(qb_ipcs_connection_t *c) { pcmk__client_t *client = pcmk__find_client(c); if (client) { crm_trace("Disconnecting %sregistered client %s (%p/%p)", (client->userdata? "" : "un"), pcmk__client_name(client), c, client); free(client->userdata); pcmk__free_client(client); controld_trigger_fsa(); } return 0; } static void ipc_connection_destroyed(qb_ipcs_connection_t *c) { crm_trace("Connection %p", c); ipc_client_disconnected(c); } /* A_STOP */ void do_stop(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { crm_trace("Closing IPC server"); mainloop_del_ipc_server(ipcs); ipcs = NULL; register_fsa_input(C_FSA_INTERNAL, I_TERMINATE, NULL); } /* A_STARTED */ void do_started(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { static struct qb_ipcs_service_handlers crmd_callbacks = { .connection_accept = accept_controller_client, .connection_created = NULL, .msg_process = dispatch_controller_ipc, .connection_closed = ipc_client_disconnected, .connection_destroyed = ipc_connection_destroyed }; if (cur_state != S_STARTING) { crm_err("Start cancelled... %s", fsa_state2string(cur_state)); return; } else if (!pcmk_is_set(controld_globals.fsa_input_register, R_MEMBERSHIP)) { crm_info("Delaying start, no membership data (%.16llx)", R_MEMBERSHIP); crmd_fsa_stall(TRUE); return; } else if (!pcmk_is_set(controld_globals.fsa_input_register, R_LRM_CONNECTED)) { crm_info("Delaying start, not connected to executor (%.16llx)", R_LRM_CONNECTED); crmd_fsa_stall(TRUE); return; } else if (!pcmk_is_set(controld_globals.fsa_input_register, R_CIB_CONNECTED)) { crm_info("Delaying start, CIB not connected (%.16llx)", R_CIB_CONNECTED); crmd_fsa_stall(TRUE); return; } else if (!pcmk_is_set(controld_globals.fsa_input_register, R_READ_CONFIG)) { crm_info("Delaying start, Config not read (%.16llx)", R_READ_CONFIG); crmd_fsa_stall(TRUE); return; } else if (!pcmk_is_set(controld_globals.fsa_input_register, R_PEER_DATA)) { crm_info("Delaying start, No peer data (%.16llx)", R_PEER_DATA); crmd_fsa_stall(TRUE); return; } crm_debug("Init server comms"); ipcs = pcmk__serve_controld_ipc(&crmd_callbacks); if (ipcs == NULL) { crm_err("Failed to create IPC server: shutting down and inhibiting respawn"); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); } else { crm_notice("Pacemaker controller successfully started and accepting connections"); } controld_set_fsa_input_flags(R_ST_REQUIRED); controld_timer_fencer_connect(GINT_TO_POINTER(TRUE)); controld_clear_fsa_input_flags(R_STARTING); register_fsa_input(msg_data->fsa_cause, I_PENDING, NULL); } /* A_RECOVER */ void do_recover(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { controld_set_fsa_input_flags(R_IN_RECOVERY); crm_warn("Fast-tracking shutdown in response to errors"); register_fsa_input(C_FSA_INTERNAL, I_TERMINATE, NULL); } static void config_query_callback(xmlNode * msg, int call_id, int rc, xmlNode * output, void *user_data) { const char *value = NULL; GHashTable *config_hash = NULL; crm_time_t *now = crm_time_new(NULL); xmlNode *crmconfig = NULL; xmlNode *alerts = NULL; pcmk_rule_input_t rule_input = { .now = now, }; if (rc != pcmk_ok) { fsa_data_t *msg_data = NULL; crm_err("Local CIB query resulted in an error: %s", pcmk_strerror(rc)); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); if (rc == -EACCES || rc == -pcmk_err_schema_validation) { crm_err("The cluster is mis-configured - shutting down and staying down"); controld_set_fsa_input_flags(R_STAYDOWN); } goto bail; } crmconfig = output; if ((crmconfig != NULL) && !pcmk__xe_is(crmconfig, PCMK_XE_CRM_CONFIG)) { crmconfig = pcmk__xe_first_child(crmconfig, PCMK_XE_CRM_CONFIG, NULL, NULL); } if (!crmconfig) { fsa_data_t *msg_data = NULL; crm_err("Local CIB query for " PCMK_XE_CRM_CONFIG " section failed"); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); goto bail; } crm_debug("Call %d : Parsing CIB options", call_id); config_hash = pcmk__strkey_table(free, free); pcmk_unpack_nvpair_blocks(crmconfig, PCMK_XE_CLUSTER_PROPERTY_SET, PCMK_VALUE_CIB_BOOTSTRAP_OPTIONS, &rule_input, config_hash, NULL); // Validate all options, and use defaults if not already present in hash pcmk__validate_cluster_options(config_hash); /* Validate the watchdog timeout in the context of the local node * environment. If invalid, the controller will exit with a fatal error. * * We do this via a wrapper in the controller, so that we call * pcmk__valid_stonith_watchdog_timeout() only if watchdog fencing is * enabled for the local node. Otherwise, we may exit unnecessarily. * * A validator function in libcrmcommon can't act as such a wrapper, because * it doesn't have a stonith API connection or the local node name. */ value = g_hash_table_lookup(config_hash, PCMK_OPT_STONITH_WATCHDOG_TIMEOUT); controld_verify_stonith_watchdog_timeout(value); value = g_hash_table_lookup(config_hash, PCMK_OPT_NO_QUORUM_POLICY); if (pcmk__strcase_any_of(value, PCMK_VALUE_FENCE, PCMK_VALUE_FENCE_LEGACY, NULL) && (pcmk__locate_sbd() != 0)) { controld_set_global_flags(controld_no_quorum_panic); } value = g_hash_table_lookup(config_hash, PCMK_OPT_SHUTDOWN_LOCK); if (crm_is_true(value)) { controld_set_global_flags(controld_shutdown_lock_enabled); } else { controld_clear_global_flags(controld_shutdown_lock_enabled); } value = g_hash_table_lookup(config_hash, PCMK_OPT_SHUTDOWN_LOCK_LIMIT); pcmk_parse_interval_spec(value, &controld_globals.shutdown_lock_limit); controld_globals.shutdown_lock_limit /= 1000; value = g_hash_table_lookup(config_hash, PCMK_OPT_NODE_PENDING_TIMEOUT); pcmk_parse_interval_spec(value, &controld_globals.node_pending_timeout); controld_globals.node_pending_timeout /= 1000; value = g_hash_table_lookup(config_hash, PCMK_OPT_CLUSTER_NAME); pcmk__str_update(&(controld_globals.cluster_name), value); // Let subcomponents initialize their own static variables controld_configure_election(config_hash); controld_configure_fencing(config_hash); controld_configure_fsa_timers(config_hash); controld_configure_throttle(config_hash); alerts = pcmk__xe_first_child(output, PCMK_XE_ALERTS, NULL, NULL); crmd_unpack_alerts(alerts); controld_set_fsa_input_flags(R_READ_CONFIG); controld_trigger_fsa(); g_hash_table_destroy(config_hash); bail: crm_time_free(now); } /*! * \internal * \brief Trigger read and processing of the configuration * * \param[in] fn Calling function name * \param[in] line Line number where call occurred */ void controld_trigger_config_as(const char *fn, int line) { if (config_read_trigger != NULL) { crm_trace("%s:%d - Triggered config processing", fn, line); mainloop_set_trigger(config_read_trigger); } } gboolean crm_read_options(gpointer user_data) { cib_t *cib_conn = controld_globals.cib_conn; int call_id = cib_conn->cmds->query(cib_conn, "//" PCMK_XE_CRM_CONFIG " | //" PCMK_XE_ALERTS, NULL, cib_xpath); fsa_register_cib_callback(call_id, NULL, config_query_callback); crm_trace("Querying the CIB... call %d", call_id); return TRUE; } /* A_READCONFIG */ void do_read_config(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { throttle_init(); controld_trigger_config(); } static void crm_shutdown(int nsig) { const char *value = NULL; guint default_period_ms = 0; if ((controld_globals.mainloop == NULL) || !g_main_loop_is_running(controld_globals.mainloop)) { crmd_exit(CRM_EX_OK); return; } if (pcmk_is_set(controld_globals.fsa_input_register, R_SHUTDOWN)) { crm_err("Escalating shutdown"); register_fsa_input_before(C_SHUTDOWN, I_ERROR, NULL); return; } controld_set_fsa_input_flags(R_SHUTDOWN); register_fsa_input(C_SHUTDOWN, I_SHUTDOWN, NULL); /* If shutdown timer doesn't have a period set, use the default * * @TODO: Evaluate whether this is still necessary. As long as * config_query_callback() has been run at least once, it doesn't look like * anything could have changed the timer period since then. */ value = pcmk__cluster_option(NULL, PCMK_OPT_SHUTDOWN_ESCALATION); pcmk_parse_interval_spec(value, &default_period_ms); controld_shutdown_start_countdown(default_period_ms); } diff --git a/daemons/pacemakerd/pcmkd_messages.c b/daemons/pacemakerd/pcmkd_messages.c index 21ebb7d8b3..341ed5c06c 100644 --- a/daemons/pacemakerd/pcmkd_messages.c +++ b/daemons/pacemakerd/pcmkd_messages.c @@ -1,309 +1,309 @@ /* * Copyright 2010-2025 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU General Public License version 2 * or later (GPLv2+) WITHOUT ANY WARRANTY. */ #include #include "pacemakerd.h" #include #include #include #include #include #include #include #include static GHashTable *pcmkd_handlers = NULL; static xmlNode * handle_node_cache_request(pcmk__request_t *request) { crm_trace("Ignoring request from client %s to purge node " "because peer cache is not used", pcmk__client_name(request->ipc_client)); pcmk__ipc_send_ack(request->ipc_client, request->ipc_id, request->ipc_flags, PCMK__XE_ACK, NULL, CRM_EX_OK); return NULL; } static xmlNode * handle_ping_request(pcmk__request_t *request) { xmlNode *msg = request->xml; const char *value = NULL; xmlNode *ping = NULL; xmlNode *reply = NULL; const char *from = crm_element_value(msg, PCMK__XA_CRM_SYS_FROM); /* Pinged for status */ crm_trace("Pinged from " PCMK__XA_CRM_SYS_FROM "='%s' " PCMK_XA_ORIGIN "='%s'", pcmk__s(from, ""), pcmk__s(crm_element_value(msg, PCMK_XA_ORIGIN), "")); pcmk__ipc_send_ack(request->ipc_client, request->ipc_id, request->ipc_flags, PCMK__XE_ACK, NULL, CRM_EX_INDETERMINATE); ping = pcmk__xe_create(NULL, PCMK__XE_PING_RESPONSE); value = crm_element_value(msg, PCMK__XA_CRM_SYS_TO); crm_xml_add(ping, PCMK__XA_CRM_SUBSYSTEM, value); crm_xml_add(ping, PCMK__XA_PACEMAKERD_STATE, pacemakerd_state); crm_xml_add_ll(ping, PCMK_XA_CRM_TIMESTAMP, (long long) subdaemon_check_progress); crm_xml_add(ping, PCMK_XA_RESULT, "ok"); reply = pcmk__new_reply(msg, ping); pcmk__xml_free(ping); if (reply == NULL) { pcmk__format_result(&request->result, CRM_EX_ERROR, PCMK_EXEC_ERROR, "Failed building ping reply for client %s", pcmk__client_name(request->ipc_client)); } else { pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL); } /* just proceed state on sbd pinging us */ if (from && strstr(from, "sbd")) { if (pcmk__str_eq(pacemakerd_state, PCMK__VALUE_SHUTDOWN_COMPLETE, pcmk__str_none)) { if (pcmk__get_sbd_sync_resource_startup()) { crm_notice("Shutdown-complete-state passed to SBD."); } shutdown_complete_state_reported_to = request->ipc_client->pid; } else if (pcmk__str_eq(pacemakerd_state, PCMK__VALUE_WAIT_FOR_PING, pcmk__str_none)) { crm_notice("Received startup-trigger from SBD."); pacemakerd_state = PCMK__VALUE_STARTING_DAEMONS; mainloop_set_trigger(startup_trigger); } } return reply; } static xmlNode * handle_shutdown_request(pcmk__request_t *request) { xmlNode *msg = request->xml; xmlNode *shutdown = NULL; xmlNode *reply = NULL; /* Only allow privileged users (i.e. root or hacluster) to shut down * Pacemaker from the command line (or direct IPC), so that other users * are forced to go through the CIB and have ACLs applied. */ bool allowed = pcmk_is_set(request->ipc_client->flags, pcmk__client_privileged); pcmk__ipc_send_ack(request->ipc_client, request->ipc_id, request->ipc_flags, PCMK__XE_ACK, NULL, CRM_EX_INDETERMINATE); shutdown = pcmk__xe_create(NULL, PCMK__XE_SHUTDOWN); if (allowed) { crm_notice("Shutting down in response to IPC request %s from %s", crm_element_value(msg, PCMK_XA_REFERENCE), crm_element_value(msg, PCMK_XA_ORIGIN)); crm_xml_add_int(shutdown, PCMK__XA_OP_STATUS, CRM_EX_OK); } else { crm_warn("Ignoring shutdown request from unprivileged client %s", pcmk__client_name(request->ipc_client)); crm_xml_add_int(shutdown, PCMK__XA_OP_STATUS, CRM_EX_INSUFFICIENT_PRIV); } reply = pcmk__new_reply(msg, shutdown); pcmk__xml_free(shutdown); if (reply == NULL) { pcmk__format_result(&request->result, CRM_EX_ERROR, PCMK_EXEC_ERROR, "Failed building shutdown reply for client %s", pcmk__client_name(request->ipc_client)); } else { pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL); } if (allowed) { pcmk_shutdown(15); } return reply; } static xmlNode * handle_unknown_request(pcmk__request_t *request) { pcmk__ipc_send_ack(request->ipc_client, request->ipc_id, request->ipc_flags, - PCMK__XE_ACK, NULL, CRM_EX_PROTOCOL); + PCMK__XE_NACK, NULL, CRM_EX_PROTOCOL); pcmk__format_result(&request->result, CRM_EX_PROTOCOL, PCMK_EXEC_INVALID, "Unknown IPC request type '%s' (bug?)", pcmk__s(request->op, "")); return NULL; } static void pcmkd_register_handlers(void) { pcmk__server_command_t handlers[] = { { CRM_OP_RM_NODE_CACHE, handle_node_cache_request }, { CRM_OP_PING, handle_ping_request }, { CRM_OP_QUIT, handle_shutdown_request }, { NULL, handle_unknown_request }, }; pcmkd_handlers = pcmk__register_handlers(handlers); } static int32_t pcmk_ipc_accept(qb_ipcs_connection_t * c, uid_t uid, gid_t gid) { crm_trace("Connection %p", c); if (pcmk__new_client(c, uid, gid) == NULL) { return -ENOMEM; } return 0; } /* Error code means? */ static int32_t pcmk_ipc_closed(qb_ipcs_connection_t * c) { pcmk__client_t *client = pcmk__find_client(c); if (client == NULL) { return 0; } crm_trace("Connection %p", c); if (shutdown_complete_state_reported_to == client->pid) { shutdown_complete_state_reported_client_closed = TRUE; if (shutdown_trigger) { mainloop_set_trigger(shutdown_trigger); } } pcmk__free_client(client); return 0; } static void pcmk_ipc_destroy(qb_ipcs_connection_t * c) { crm_trace("Connection %p", c); pcmk_ipc_closed(c); } /* Exit code means? */ static int32_t pcmk_ipc_dispatch(qb_ipcs_connection_t * qbc, void *data, size_t size) { int rc = pcmk_rc_ok; uint32_t id = 0; uint32_t flags = 0; xmlNode *msg = NULL; pcmk__client_t *c = pcmk__find_client(qbc); CRM_CHECK(c != NULL, return 0); if (pcmkd_handlers == NULL) { pcmkd_register_handlers(); } rc = pcmk__ipc_msg_append(&c->buffer, data); if (rc == pcmk_rc_ipc_more) { /* We haven't read the complete message yet, so just return. */ return 0; } else if (rc == pcmk_rc_ok) { /* We've read the complete message and there's already a header on * the front. Pass it off for processing. */ msg = pcmk__client_data2xml(c, &id, &flags); g_byte_array_free(c->buffer, TRUE); c->buffer = NULL; } else { /* Some sort of error occurred reassembling the message. All we can * do is clean up, log an error and return. */ crm_err("Error when reading IPC message: %s", pcmk_rc_str(rc)); if (c->buffer != NULL) { g_byte_array_free(c->buffer, TRUE); c->buffer = NULL; } return 0; } if (msg == NULL) { - pcmk__ipc_send_ack(c, id, flags, PCMK__XE_ACK, NULL, CRM_EX_PROTOCOL); + pcmk__ipc_send_ack(c, id, flags, PCMK__XE_NACK, NULL, CRM_EX_PROTOCOL); return 0; } else { char *log_msg = NULL; const char *reason = NULL; xmlNode *reply = NULL; pcmk__request_t request = { .ipc_client = c, .ipc_id = id, .ipc_flags = flags, .peer = NULL, .xml = msg, .call_options = 0, .result = PCMK__UNKNOWN_RESULT, }; request.op = crm_element_value_copy(request.xml, PCMK__XA_CRM_TASK); CRM_CHECK(request.op != NULL, return 0); reply = pcmk__process_request(&request, pcmkd_handlers); if (reply != NULL) { pcmk__ipc_send_xml(c, id, reply, crm_ipc_server_event); pcmk__xml_free(reply); } reason = request.result.exit_reason; log_msg = crm_strdup_printf("Processed %s request from %s %s: %s%s%s%s", request.op, pcmk__request_origin_type(&request), pcmk__request_origin(&request), pcmk_exec_status_str(request.result.execution_status), (reason == NULL)? "" : " (", (reason == NULL)? "" : reason, (reason == NULL)? "" : ")"); if (!pcmk__result_ok(&request.result)) { crm_warn("%s", log_msg); } else { crm_debug("%s", log_msg); } free(log_msg); pcmk__reset_request(&request); } pcmk__xml_free(msg); return 0; } struct qb_ipcs_service_handlers pacemakerd_ipc_callbacks = { .connection_accept = pcmk_ipc_accept, .connection_created = NULL, .msg_process = pcmk_ipc_dispatch, .connection_closed = pcmk_ipc_closed, .connection_destroyed = pcmk_ipc_destroy }; diff --git a/daemons/schedulerd/schedulerd_messages.c b/daemons/schedulerd/schedulerd_messages.c index fd17e067fe..1d83d7d9ac 100644 --- a/daemons/schedulerd/schedulerd_messages.c +++ b/daemons/schedulerd/schedulerd_messages.c @@ -1,362 +1,362 @@ /* * Copyright 2004-2025 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU General Public License version 2 * or later (GPLv2+) WITHOUT ANY WARRANTY. */ #include #include #include #include #include #include #include #include #include "pacemaker-schedulerd.h" static GHashTable *schedulerd_handlers = NULL; static pcmk_scheduler_t * init_scheduler(void) { pcmk_scheduler_t *scheduler = pcmk_new_scheduler(); pcmk__mem_assert(scheduler); scheduler->priv->out = logger_out; return scheduler; } static xmlNode * handle_pecalc_request(pcmk__request_t *request) { static struct series_s { const char *name; const char *param; /* Maximum number of inputs of this kind to save to disk. * If -1, save all; if 0, save none. */ int wrap; } series[] = { { "pe-error", PCMK_OPT_PE_ERROR_SERIES_MAX, -1 }, { "pe-warn", PCMK_OPT_PE_WARN_SERIES_MAX, 5000 }, { "pe-input", PCMK_OPT_PE_INPUT_SERIES_MAX, 4000 }, }; xmlNode *msg = request->xml; xmlNode *wrapper = pcmk__xe_first_child(msg, PCMK__XE_CRM_XML, NULL, NULL); xmlNode *xml_data = pcmk__xe_first_child(wrapper, NULL, NULL, NULL); static char *last_digest = NULL; static char *filename = NULL; unsigned int seq = 0U; int series_id = 0; int series_wrap = 0; char *digest = NULL; const char *value = NULL; time_t execution_date = time(NULL); xmlNode *converted = NULL; xmlNode *reply = NULL; bool is_repoke = false; bool process = true; pcmk_scheduler_t *scheduler = init_scheduler(); pcmk__ipc_send_ack(request->ipc_client, request->ipc_id, request->ipc_flags, PCMK__XE_ACK, NULL, CRM_EX_INDETERMINATE); digest = pcmk__digest_xml(xml_data, false); converted = pcmk__xml_copy(NULL, xml_data); if (pcmk__update_configured_schema(&converted, true) != pcmk_rc_ok) { scheduler->priv->graph = pcmk__xe_create(NULL, PCMK__XE_TRANSITION_GRAPH); crm_xml_add_int(scheduler->priv->graph, "transition_id", 0); crm_xml_add_int(scheduler->priv->graph, PCMK_OPT_CLUSTER_DELAY, 0); process = false; free(digest); } else if (pcmk__str_eq(digest, last_digest, pcmk__str_casei)) { is_repoke = true; free(digest); } else { free(last_digest); last_digest = digest; } if (process) { scheduler->input = converted; pcmk__set_scheduler_flags(scheduler, pcmk__sched_no_counts |pcmk__sched_show_utilization); pcmk__schedule_actions(scheduler); // Don't free converted as part of scheduler scheduler->input = NULL; } // Get appropriate index into series[] array if (pcmk_is_set(scheduler->flags, pcmk__sched_processing_error) || pcmk__config_has_error) { series_id = 0; } else if (pcmk_is_set(scheduler->flags, pcmk__sched_processing_warning) || pcmk__config_has_warning) { series_id = 1; } else { series_id = 2; } value = pcmk__cluster_option(scheduler->priv->options, series[series_id].param); if ((value == NULL) || (pcmk__scan_min_int(value, &series_wrap, -1) != pcmk_rc_ok)) { series_wrap = series[series_id].wrap; } if (pcmk__read_series_sequence(PCMK_SCHEDULER_INPUT_DIR, series[series_id].name, &seq) != pcmk_rc_ok) { // @TODO maybe handle errors better ... seq = 0U; } crm_trace("Series %s: wrap=%d, seq=%u, pref=%s", series[series_id].name, series_wrap, seq, value); reply = pcmk__new_reply(msg, scheduler->priv->graph); if (reply == NULL) { pcmk__format_result(&request->result, CRM_EX_ERROR, PCMK_EXEC_ERROR, "Failed building ping reply for client %s", pcmk__client_name(request->ipc_client)); goto done; } if (series_wrap == 0) { // Don't save any inputs of this kind free(filename); filename = NULL; } else if (!is_repoke) { // Input changed, save to disk free(filename); filename = pcmk__series_filename(PCMK_SCHEDULER_INPUT_DIR, series[series_id].name, seq, true); } crm_xml_add(reply, PCMK__XA_CRM_TGRAPH_IN, filename); pcmk__log_transition_summary(scheduler, filename); if (series_wrap == 0) { crm_debug("Not saving input to disk (disabled by configuration)"); } else if (is_repoke) { crm_info("Input has not changed since last time, not saving to disk"); } else { unlink(filename); crm_xml_add_ll(xml_data, PCMK_XA_EXECUTION_DATE, (long long) execution_date); pcmk__xml_write_file(xml_data, filename, true); pcmk__write_series_sequence(PCMK_SCHEDULER_INPUT_DIR, series[series_id].name, ++seq, series_wrap); } pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL); done: pcmk__xml_free(converted); pcmk_free_scheduler(scheduler); return reply; } static xmlNode * handle_unknown_request(pcmk__request_t *request) { pcmk__ipc_send_ack(request->ipc_client, request->ipc_id, request->ipc_flags, - PCMK__XE_ACK, NULL, CRM_EX_PROTOCOL); + PCMK__XE_NACK, NULL, CRM_EX_PROTOCOL); pcmk__format_result(&request->result, CRM_EX_PROTOCOL, PCMK_EXEC_INVALID, "Unknown IPC request type '%s' (bug?)", pcmk__s(request->op, "")); return NULL; } static xmlNode * handle_hello_request(pcmk__request_t *request) { pcmk__ipc_send_ack(request->ipc_client, request->ipc_id, request->ipc_flags, PCMK__XE_ACK, NULL, CRM_EX_INDETERMINATE); crm_trace("Received IPC hello from %s", pcmk__client_name(request->ipc_client)); pcmk__set_result(&request->result, CRM_EX_OK, PCMK_EXEC_DONE, NULL); return NULL; } static void schedulerd_register_handlers(void) { pcmk__server_command_t handlers[] = { { CRM_OP_HELLO, handle_hello_request }, { CRM_OP_PECALC, handle_pecalc_request }, { NULL, handle_unknown_request }, }; schedulerd_handlers = pcmk__register_handlers(handlers); } static int32_t pe_ipc_accept(qb_ipcs_connection_t * c, uid_t uid, gid_t gid) { crm_trace("Connection %p", c); if (pcmk__new_client(c, uid, gid) == NULL) { return -ENOMEM; } return 0; } static int32_t pe_ipc_dispatch(qb_ipcs_connection_t * qbc, void *data, size_t size) { int rc = pcmk_rc_ok; uint32_t id = 0; uint32_t flags = 0; xmlNode *msg = NULL; pcmk__client_t *c = pcmk__find_client(qbc); const char *sys_to = NULL; CRM_CHECK(c != NULL, return 0); if (schedulerd_handlers == NULL) { schedulerd_register_handlers(); } rc = pcmk__ipc_msg_append(&c->buffer, data); if (rc == pcmk_rc_ipc_more) { /* We haven't read the complete message yet, so just return. */ return 0; } else if (rc == pcmk_rc_ok) { /* We've read the complete message and there's already a header on * the front. Pass it off for processing. */ msg = pcmk__client_data2xml(c, &id, &flags); g_byte_array_free(c->buffer, TRUE); c->buffer = NULL; } else { /* Some sort of error occurred reassembling the message. All we can * do is clean up, log an error and return. */ crm_err("Error when reading IPC message: %s", pcmk_rc_str(rc)); if (c->buffer != NULL) { g_byte_array_free(c->buffer, TRUE); c->buffer = NULL; } return 0; } if (msg == NULL) { - pcmk__ipc_send_ack(c, id, flags, PCMK__XE_ACK, NULL, CRM_EX_PROTOCOL); + pcmk__ipc_send_ack(c, id, flags, PCMK__XE_NACK, NULL, CRM_EX_PROTOCOL); return 0; } sys_to = crm_element_value(msg, PCMK__XA_CRM_SYS_TO); if (pcmk__str_eq(crm_element_value(msg, PCMK__XA_SUBT), PCMK__VALUE_RESPONSE, pcmk__str_none)) { pcmk__ipc_send_ack(c, id, flags, PCMK__XE_ACK, NULL, CRM_EX_INDETERMINATE); crm_info("Ignoring IPC reply from %s", pcmk__client_name(c)); } else if (!pcmk__str_eq(sys_to, CRM_SYSTEM_PENGINE, pcmk__str_none)) { pcmk__ipc_send_ack(c, id, flags, PCMK__XE_ACK, NULL, CRM_EX_INDETERMINATE); crm_info("Ignoring invalid IPC message: to '%s' not " CRM_SYSTEM_PENGINE, pcmk__s(sys_to, "")); } else { char *log_msg = NULL; const char *reason = NULL; xmlNode *reply = NULL; pcmk__request_t request = { .ipc_client = c, .ipc_id = id, .ipc_flags = flags, .peer = NULL, .xml = msg, .call_options = 0, .result = PCMK__UNKNOWN_RESULT, }; request.op = crm_element_value_copy(request.xml, PCMK__XA_CRM_TASK); CRM_CHECK(request.op != NULL, return 0); reply = pcmk__process_request(&request, schedulerd_handlers); if (reply != NULL) { pcmk__ipc_send_xml(c, id, reply, crm_ipc_server_event); pcmk__xml_free(reply); } reason = request.result.exit_reason; log_msg = crm_strdup_printf("Processed %s request from %s %s: %s%s%s%s", request.op, pcmk__request_origin_type(&request), pcmk__request_origin(&request), pcmk_exec_status_str(request.result.execution_status), (reason == NULL)? "" : " (", (reason == NULL)? "" : reason, (reason == NULL)? "" : ")"); if (!pcmk__result_ok(&request.result)) { crm_warn("%s", log_msg); } else { crm_debug("%s", log_msg); } free(log_msg); pcmk__reset_request(&request); } pcmk__xml_free(msg); return 0; } /* Error code means? */ static int32_t pe_ipc_closed(qb_ipcs_connection_t * c) { pcmk__client_t *client = pcmk__find_client(c); if (client == NULL) { return 0; } crm_trace("Connection %p", c); pcmk__free_client(client); return 0; } static void pe_ipc_destroy(qb_ipcs_connection_t * c) { crm_trace("Connection %p", c); pe_ipc_closed(c); } struct qb_ipcs_service_handlers ipc_callbacks = { .connection_accept = pe_ipc_accept, .connection_created = NULL, .msg_process = pe_ipc_dispatch, .connection_closed = pe_ipc_closed, .connection_destroyed = pe_ipc_destroy };