diff --git a/lib/common/ipc_client.c b/lib/common/ipc_client.c
index cb73efdeba..30cdcaa3ca 100644
--- a/lib/common/ipc_client.c
+++ b/lib/common/ipc_client.c
@@ -1,1679 +1,1626 @@
/*
 * Copyright 2004-2025 the Pacemaker project contributors
 *
 * The version control history for this file may have further details.
 *
 * This source code is licensed under the GNU Lesser General Public License
 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
 */

#include

#if defined(HAVE_UCRED) || defined(HAVE_SOCKPEERCRED)
#include
#elif defined(HAVE_GETPEERUCRED)
#include
#endif

#include
#include
#include
#include
#include                     /* indirectly: pcmk_err_generic */
#include
#include
#include
#include "crmcommon_private.h"

static int is_ipc_provider_expected(qb_ipcc_connection_t *qb_ipc, int sock,
                                    uid_t refuid, gid_t refgid, pid_t *gotpid,
                                    uid_t *gotuid, gid_t *gotgid);

/*!
 * \brief Create a new object for using Pacemaker daemon IPC
 *
 * \param[out] api     Where to store new IPC object
 * \param[in]  server  Which Pacemaker daemon the object is for
 *
 * \return Standard Pacemaker return code
 *
 * \note The caller is responsible for freeing *api using pcmk_free_ipc_api().
 * \note This is intended to supersede crm_ipc_new() but currently only
 *       supports the attribute manager, controller, pacemakerd, and
 *       schedulerd IPC APIs.
 */
int
pcmk_new_ipc_api(pcmk_ipc_api_t **api, enum pcmk_ipc_server server)
{
    if (api == NULL) {
        return EINVAL;
    }

    *api = calloc(1, sizeof(pcmk_ipc_api_t));
    if (*api == NULL) {
        return errno;
    }

    (*api)->server = server;
    if (pcmk_ipc_name(*api, false) == NULL) {
        pcmk_free_ipc_api(*api);
        *api = NULL;
        return EOPNOTSUPP;
    }

    // Set server methods
    switch (server) {
        case pcmk_ipc_attrd:
            (*api)->cmds = pcmk__attrd_api_methods();
            break;

        case pcmk_ipc_based:
            break;

        case pcmk_ipc_controld:
            (*api)->cmds = pcmk__controld_api_methods();
            break;

        case pcmk_ipc_execd:
            break;

        case pcmk_ipc_fenced:
            break;

        case pcmk_ipc_pacemakerd:
            (*api)->cmds = pcmk__pacemakerd_api_methods();
            break;

        case pcmk_ipc_schedulerd:
            (*api)->cmds = pcmk__schedulerd_api_methods();
            break;

        default: // pcmk_ipc_unknown
            pcmk_free_ipc_api(*api);
            *api = NULL;
            return EINVAL;
    }
    if ((*api)->cmds == NULL) {
        pcmk_free_ipc_api(*api);
        *api = NULL;
        return ENOMEM;
    }

    (*api)->ipc = crm_ipc_new(pcmk_ipc_name(*api, false), 0);
    if ((*api)->ipc == NULL) {
        pcmk_free_ipc_api(*api);
        *api = NULL;
        return ENOMEM;
    }

    // If daemon API has its own data to track, allocate it
    if ((*api)->cmds->new_data != NULL) {
        if ((*api)->cmds->new_data(*api) != pcmk_rc_ok) {
            pcmk_free_ipc_api(*api);
            *api = NULL;
            return ENOMEM;
        }
    }
    crm_trace("Created %s API IPC object", pcmk_ipc_name(*api, true));
    return pcmk_rc_ok;
}

static void
free_daemon_specific_data(pcmk_ipc_api_t *api)
{
    if ((api != NULL) && (api->cmds != NULL)) {
        if ((api->cmds->free_data != NULL) && (api->api_data != NULL)) {
            api->cmds->free_data(api->api_data);
            api->api_data = NULL;
        }
        free(api->cmds);
        api->cmds = NULL;
    }
}

/*!
 * \internal
 * \brief Call an IPC API event callback, if one is registered
 *
 * \param[in,out] api         IPC API connection
 * \param[in]     event_type  The type of event that occurred
 * \param[in]     status      Event status
 * \param[in,out] event_data  Event-specific data
 */
void
pcmk__call_ipc_callback(pcmk_ipc_api_t *api, enum pcmk_ipc_event event_type,
                        crm_exit_t status, void *event_data)
{
    if ((api != NULL) && (api->cb != NULL)) {
        api->cb(api, event_type, status, event_data, api->user_data);
    }
}

/*!
* \internal * \brief Clean up after an IPC disconnect * * \param[in,out] user_data IPC API connection that disconnected * * \note This function can be used as a main loop IPC destroy callback. */ static void ipc_post_disconnect(gpointer user_data) { pcmk_ipc_api_t *api = user_data; crm_info("Disconnected from %s", pcmk_ipc_name(api, true)); // Perform any daemon-specific handling needed if ((api->cmds != NULL) && (api->cmds->post_disconnect != NULL)) { api->cmds->post_disconnect(api); } // Call client's registered event callback pcmk__call_ipc_callback(api, pcmk_ipc_event_disconnect, CRM_EX_DISCONNECT, NULL); /* If this is being called from a running main loop, mainloop_gio_destroy() * will free ipc and mainloop_io immediately after calling this function. * If this is called from a stopped main loop, these will leak, so the best * practice is to close the connection before stopping the main loop. */ api->ipc = NULL; api->mainloop_io = NULL; if (api->free_on_disconnect) { /* pcmk_free_ipc_api() has already been called, but did not free api * or api->cmds because this function needed them. Do that now. */ free_daemon_specific_data(api); crm_trace("Freeing IPC API object after disconnect"); free(api); } } /*! * \brief Free the contents of an IPC API object * * \param[in,out] api IPC API object to free */ void pcmk_free_ipc_api(pcmk_ipc_api_t *api) { bool free_on_disconnect = false; if (api == NULL) { return; } crm_debug("Releasing %s IPC API", pcmk_ipc_name(api, true)); if (api->ipc != NULL) { if (api->mainloop_io != NULL) { /* We need to keep the api pointer itself around, because it is the * user data for the IPC client destroy callback. That will be * triggered by the pcmk_disconnect_ipc() call below, but it might * happen later in the main loop (if still running). * * This flag tells the destroy callback to free the object. It can't * do that unconditionally, because the application might call this * function after a disconnect that happened by other means. */ free_on_disconnect = api->free_on_disconnect = true; } pcmk_disconnect_ipc(api); // Frees api if free_on_disconnect is true } if (!free_on_disconnect) { free_daemon_specific_data(api); crm_trace("Freeing IPC API object"); free(api); } } /*! * \brief Get the IPC name used with an IPC API connection * * \param[in] api IPC API connection * \param[in] for_log If true, return human-friendly name instead of IPC name * * \return IPC API's human-friendly or connection name, or if none is available, * "Pacemaker" if for_log is true and NULL if for_log is false */ const char * pcmk_ipc_name(const pcmk_ipc_api_t *api, bool for_log) { if (api == NULL) { return for_log? "Pacemaker" : NULL; } if (for_log) { const char *name = pcmk__server_log_name(api->server); return pcmk__s(name, "Pacemaker"); } switch (api->server) { // These servers do not have pcmk_ipc_api_t implementations yet case pcmk_ipc_based: case pcmk_ipc_execd: case pcmk_ipc_fenced: return NULL; default: return pcmk__server_ipc_name(api->server); } } /*! * \brief Check whether an IPC API connection is active * * \param[in,out] api IPC API connection * * \return true if IPC is connected, false otherwise */ bool pcmk_ipc_is_connected(pcmk_ipc_api_t *api) { return (api != NULL) && crm_ipc_connected(api->ipc); } /*! * \internal * \brief Call the daemon-specific API's dispatch function * * Perform daemon-specific handling of IPC reply dispatch. It is the daemon * method's responsibility to call the client's registered event callback, as * well as allocate and free any event data. 
 *
 * \param[in,out] api      IPC API connection
 * \param[in,out] message  IPC reply XML to dispatch
 */
static bool
call_api_dispatch(pcmk_ipc_api_t *api, xmlNode *message)
{
    crm_log_xml_trace(message, "ipc-received");
    if ((api->cmds != NULL) && (api->cmds->dispatch != NULL)) {
        return api->cmds->dispatch(api, message);
    }
    return false;
}

/*!
 * \internal
 * \brief Dispatch previously read IPC data
 *
 * \param[in]     buffer  Data read from IPC
 * \param[in,out] api     IPC object
 *
 * \return Standard Pacemaker return code. In particular:
 *
 * pcmk_rc_ok: There are no more messages expected from the server. Quit
 *             reading.
 * EINPROGRESS: There are more messages expected from the server. Keep reading.
 *
 * All other values indicate an error.
 */
static int
dispatch_ipc_data(const char *buffer, pcmk_ipc_api_t *api)
{
    bool more = false;
    xmlNode *msg;

    if (buffer == NULL) {
        crm_warn("Empty message received from %s IPC",
                 pcmk_ipc_name(api, true));
        return ENOMSG;
    }

    msg = pcmk__xml_parse(buffer);
    if (msg == NULL) {
        crm_warn("Malformed message received from %s IPC",
                 pcmk_ipc_name(api, true));
        return EPROTO;
    }

    more = call_api_dispatch(api, msg);
    pcmk__xml_free(msg);

    if (more) {
        return EINPROGRESS;
    } else {
        return pcmk_rc_ok;
    }
}

/*!
 * \internal
 * \brief Dispatch data read from IPC source
 *
 * \param[in]     buffer     Data read from IPC
 * \param[in]     length     Number of bytes of data in buffer (ignored)
 * \param[in,out] user_data  IPC object
 *
 * \return Always 0 (meaning connection is still required)
 *
 * \note This function can be used as a main loop IPC dispatch callback.
 */
static int
dispatch_ipc_source_data(const char *buffer, ssize_t length, gpointer user_data)
{
    pcmk_ipc_api_t *api = user_data;

    CRM_CHECK(api != NULL, return 0);
    dispatch_ipc_data(buffer, api);
    return 0;
}

/*!
 * \brief Check whether an IPC connection has data available (without main loop)
 *
 * \param[in] api         IPC API connection
 * \param[in] timeout_ms  If less than 0, poll indefinitely; if 0, poll once
 *                        and return immediately; otherwise, poll for up to
 *                        this many milliseconds
 *
 * \return Standard Pacemaker return code
 *
 * \note Callers of pcmk_connect_ipc() using pcmk_ipc_dispatch_poll should call
 *       this function to check whether IPC data is available. Return values of
 *       interest include pcmk_rc_ok meaning data is available, and EAGAIN
 *       meaning no data is available; all other values indicate errors.
 * \todo This does not allow the caller to poll multiple file descriptors at
 *       once. If there is demand for that, we could add a wrapper for
 *       pcmk__ipc_fd(api->ipc), so the caller can call poll() themselves.
 */
int
pcmk_poll_ipc(const pcmk_ipc_api_t *api, int timeout_ms)
{
    int rc;
    struct pollfd pollfd = { 0, };

    if ((api == NULL) || (api->dispatch_type != pcmk_ipc_dispatch_poll)) {
        return EINVAL;
    }

    rc = pcmk__ipc_fd(api->ipc, &(pollfd.fd));
    if (rc != pcmk_rc_ok) {
        crm_debug("Could not obtain file descriptor for %s IPC: %s",
                  pcmk_ipc_name(api, true), pcmk_rc_str(rc));
        return rc;
    }

    pollfd.events = POLLIN;
    rc = poll(&pollfd, 1, timeout_ms);
    if (rc < 0) {
        /* Some UNIX systems return negative and set EAGAIN for failure to
         * allocate memory; standardize the return code in that case
         */
        return (errno == EAGAIN)? ENOMEM : errno;
    } else if (rc == 0) {
        return EAGAIN;
    }
    return pcmk_rc_ok;
}

/*!
 * \brief Dispatch available messages on an IPC connection (without main loop)
 *
 * \param[in,out] api  IPC API connection
 *
 * \note Callers of pcmk_connect_ipc() using pcmk_ipc_dispatch_poll should call
 *       this function when IPC data is available.
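 *
 * \par
 * A minimal poll-based client might look like this (illustrative sketch;
 * \c my_callback is an assumed application function matching
 * \c pcmk_ipc_callback_t, and error handling is abbreviated):
 * \code
 * pcmk_ipc_api_t *api = NULL;
 *
 * if (pcmk_new_ipc_api(&api, pcmk_ipc_controld) == pcmk_rc_ok) {
 *     pcmk_register_ipc_callback(api, my_callback, NULL);
 *
 *     if (pcmk_connect_ipc(api, pcmk_ipc_dispatch_poll) == pcmk_rc_ok) {
 *         while (pcmk_ipc_is_connected(api)) {
 *             int rc = pcmk_poll_ipc(api, 1000); // wait up to 1s for data
 *
 *             if (rc == pcmk_rc_ok) {
 *                 pcmk_dispatch_ipc(api); // runs my_callback for each reply
 *             } else if (rc != EAGAIN) {
 *                 break; // EAGAIN means no data yet; anything else is an error
 *             }
 *         }
 *     }
 * }
 * pcmk_free_ipc_api(api);
 * \endcode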
*/ void pcmk_dispatch_ipc(pcmk_ipc_api_t *api) { if (api == NULL) { return; } while (crm_ipc_ready(api->ipc) > 0) { if (crm_ipc_read(api->ipc) > 0) { dispatch_ipc_data(crm_ipc_buffer(api->ipc), api); } } } // \return Standard Pacemaker return code static int connect_with_main_loop(pcmk_ipc_api_t *api) { int rc; struct ipc_client_callbacks callbacks = { .dispatch = dispatch_ipc_source_data, .destroy = ipc_post_disconnect, }; rc = pcmk__add_mainloop_ipc(api->ipc, G_PRIORITY_DEFAULT, api, &callbacks, &(api->mainloop_io)); if (rc != pcmk_rc_ok) { return rc; } crm_debug("Connected to %s IPC (attached to main loop)", pcmk_ipc_name(api, true)); /* After this point, api->mainloop_io owns api->ipc, so api->ipc * should not be explicitly freed. */ return pcmk_rc_ok; } // \return Standard Pacemaker return code static int connect_without_main_loop(pcmk_ipc_api_t *api) { int rc = pcmk__connect_generic_ipc(api->ipc); if (rc != pcmk_rc_ok) { crm_ipc_close(api->ipc); } else { crm_debug("Connected to %s IPC (without main loop)", pcmk_ipc_name(api, true)); } return rc; } /*! * \internal * \brief Connect to a Pacemaker daemon via IPC (retrying after soft errors) * * \param[in,out] api IPC API instance * \param[in] dispatch_type How IPC replies should be dispatched * \param[in] attempts How many times to try (in case of soft error) * * \return Standard Pacemaker return code */ int pcmk__connect_ipc(pcmk_ipc_api_t *api, enum pcmk_ipc_dispatch dispatch_type, int attempts) { int rc = pcmk_rc_ok; if ((api == NULL) || (attempts < 1)) { return EINVAL; } if (api->ipc == NULL) { api->ipc = crm_ipc_new(pcmk_ipc_name(api, false), 0); if (api->ipc == NULL) { return ENOMEM; } } if (crm_ipc_connected(api->ipc)) { crm_trace("Already connected to %s", pcmk_ipc_name(api, true)); return pcmk_rc_ok; } api->dispatch_type = dispatch_type; crm_debug("Attempting connection to %s (up to %d time%s)", pcmk_ipc_name(api, true), attempts, pcmk__plural_s(attempts)); for (int remaining = attempts - 1; remaining >= 0; --remaining) { switch (dispatch_type) { case pcmk_ipc_dispatch_main: rc = connect_with_main_loop(api); break; case pcmk_ipc_dispatch_sync: case pcmk_ipc_dispatch_poll: rc = connect_without_main_loop(api); break; } if ((remaining == 0) || ((rc != EAGAIN) && (rc != EALREADY))) { break; // Result is final } // Retry after soft error (interrupted by signal, etc.) pcmk__sleep_ms((attempts - remaining) * 500); crm_debug("Re-attempting connection to %s (%d attempt%s remaining)", pcmk_ipc_name(api, true), remaining, pcmk__plural_s(remaining)); } if (rc != pcmk_rc_ok) { return rc; } if ((api->cmds != NULL) && (api->cmds->post_connect != NULL)) { rc = api->cmds->post_connect(api); if (rc != pcmk_rc_ok) { crm_ipc_close(api->ipc); } } return rc; } /*! * \brief Connect to a Pacemaker daemon via IPC * * \param[in,out] api IPC API instance * \param[in] dispatch_type How IPC replies should be dispatched * * \return Standard Pacemaker return code */ int pcmk_connect_ipc(pcmk_ipc_api_t *api, enum pcmk_ipc_dispatch dispatch_type) { int rc = pcmk__connect_ipc(api, dispatch_type, 2); if (rc != pcmk_rc_ok) { crm_err("Connection to %s failed: %s", pcmk_ipc_name(api, true), pcmk_rc_str(rc)); } return rc; } /*! * \brief Disconnect an IPC API instance * * \param[in,out] api IPC API connection * * \return Standard Pacemaker return code * * \note If the connection is attached to a main loop, this function should be * called before quitting the main loop, to ensure that all memory is * freed. 
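 *
 * \par
 * For example (sketch; \c mainloop is an assumed application variable):
 * \code
 * pcmk_disconnect_ipc(api);   // triggers the pcmk_ipc_event_disconnect callback
 * pcmk_free_ipc_api(api);
 * g_main_loop_quit(mainloop);
 * \endcode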
 */
void
pcmk_disconnect_ipc(pcmk_ipc_api_t *api)
{
    if ((api == NULL) || (api->ipc == NULL)) {
        return;
    }
    switch (api->dispatch_type) {
        case pcmk_ipc_dispatch_main:
            {
                mainloop_io_t *mainloop_io = api->mainloop_io;

                // Make sure no code with access to api can use these again
                api->mainloop_io = NULL;
                api->ipc = NULL;

                mainloop_del_ipc_client(mainloop_io);
                // After this point api might have already been freed
            }
            break;

        case pcmk_ipc_dispatch_poll:
        case pcmk_ipc_dispatch_sync:
            {
                crm_ipc_t *ipc = api->ipc;

                // Make sure no code with access to api can use ipc again
                api->ipc = NULL;

                // This should always be the case already, but to be safe
                api->free_on_disconnect = false;

                crm_ipc_close(ipc);
                crm_ipc_destroy(ipc);
                ipc_post_disconnect(api);
            }
            break;
    }
}

/*!
 * \brief Register a callback for IPC API events
 *
 * \param[in,out] api        IPC API connection
 * \param[in]     cb         Callback to register
 * \param[in]     user_data  Caller data to pass to callback
 *
 * \note This function may be called multiple times to update the callback
 *       and/or user data. The caller remains responsible for freeing
 *       user_data in any case (after the IPC is disconnected, if the
 *       user data is still registered with the IPC).
 */
void
pcmk_register_ipc_callback(pcmk_ipc_api_t *api, pcmk_ipc_callback_t cb,
                           void *user_data)
{
    if (api == NULL) {
        return;
    }
    api->cb = cb;
    api->user_data = user_data;
}

/*!
 * \internal
 * \brief Send an XML request across an IPC API connection
 *
 * \param[in,out] api      IPC API connection
 * \param[in]     request  XML request to send
 *
 * \return Standard Pacemaker return code
 *
 * \note Daemon-specific IPC API functions should call this function to send
 *       requests, because it handles different dispatch types appropriately.
 */
int
pcmk__send_ipc_request(pcmk_ipc_api_t *api, const xmlNode *request)
{
    int rc;
    xmlNode *reply = NULL;
    enum crm_ipc_flags flags = crm_ipc_flags_none;

    if ((api == NULL) || (api->ipc == NULL) || (request == NULL)) {
        return EINVAL;
    }
    crm_log_xml_trace(request, "ipc-sent");

    // Synchronous dispatch requires waiting for a reply
    if ((api->dispatch_type == pcmk_ipc_dispatch_sync)
        && (api->cmds != NULL)
        && (api->cmds->reply_expected != NULL)
        && (api->cmds->reply_expected(api, request))) {
        flags = crm_ipc_client_response;
    }

    /* The 0 here means a default timeout of 5 seconds
     *
     * @TODO Maybe add a timeout_ms member to pcmk_ipc_api_t and a
     * pcmk_set_ipc_timeout() setter for it, then use it here.
     */
    rc = crm_ipc_send(api->ipc, request, flags, 0, &reply);
    if (rc < 0) {
        return pcmk_legacy2rc(rc);
    } else if (rc == 0) {
        return ENODATA;
    }

    // With synchronous dispatch, we dispatch any reply now
    if (reply != NULL) {
        bool more = call_api_dispatch(api, reply);

        pcmk__xml_free(reply);

        while (more) {
            rc = crm_ipc_read(api->ipc);

            if (rc == -EAGAIN) {
                continue;
            } else if (rc == -ENOMSG || rc == pcmk_ok) {
                return pcmk_rc_ok;
            } else if (rc < 0) {
                return -rc;
            }

            rc = dispatch_ipc_data(crm_ipc_buffer(api->ipc), api);

            if (rc == pcmk_rc_ok) {
                more = false;
            } else if (rc == EINPROGRESS) {
                more = true;
            } else {
                continue;
            }
        }
    }
    return pcmk_rc_ok;
}

/*!
 * \internal
 * \brief Create the XML for an IPC request to purge a node from the peer cache
 *
 * \param[in] api        IPC API connection
 * \param[in] node_name  If not NULL, name of node to purge
 * \param[in] nodeid     If not 0, node ID of node to purge
 *
 * \return Newly allocated IPC request XML
 *
 * \note The controller, fencer, and pacemakerd use the same request syntax,
 *       but the attribute manager uses a different one. The CIB manager
 *       doesn't have any syntax for it.
The executor and scheduler don't connect to the * cluster layer and thus don't have or need any syntax for it. * * \todo Modify the attribute manager to accept the common syntax (as well * as its current one, for compatibility with older clients). Modify * the CIB manager to accept and honor the common syntax. Modify the * executor and scheduler to accept the syntax (immediately returning * success), just for consistency. Modify this function to use the * common syntax with all daemons if their version supports it. */ static xmlNode * create_purge_node_request(const pcmk_ipc_api_t *api, const char *node_name, uint32_t nodeid) { xmlNode *request = NULL; const char *client = crm_system_name? crm_system_name : "client"; switch (api->server) { case pcmk_ipc_attrd: request = pcmk__xe_create(NULL, __func__); crm_xml_add(request, PCMK__XA_T, PCMK__VALUE_ATTRD); crm_xml_add(request, PCMK__XA_SRC, crm_system_name); crm_xml_add(request, PCMK_XA_TASK, PCMK__ATTRD_CMD_PEER_REMOVE); pcmk__xe_set_bool_attr(request, PCMK__XA_REAP, true); crm_xml_add(request, PCMK__XA_ATTR_HOST, node_name); if (nodeid > 0) { crm_xml_add_int(request, PCMK__XA_ATTR_HOST_ID, nodeid); } break; case pcmk_ipc_controld: case pcmk_ipc_fenced: case pcmk_ipc_pacemakerd: request = pcmk__new_request(api->server, client, NULL, pcmk_ipc_name(api, false), CRM_OP_RM_NODE_CACHE, NULL); if (nodeid > 0) { crm_xml_add_ll(request, PCMK_XA_ID, (long long) nodeid); } crm_xml_add(request, PCMK_XA_UNAME, node_name); break; case pcmk_ipc_based: case pcmk_ipc_execd: case pcmk_ipc_schedulerd: break; default: // pcmk_ipc_unknown (shouldn't be possible) return NULL; } return request; } /*! * \brief Ask a Pacemaker daemon to purge a node from its peer cache * * \param[in,out] api IPC API connection * \param[in] node_name If not NULL, name of node to purge * \param[in] nodeid If not 0, node ID of node to purge * * \return Standard Pacemaker return code * * \note At least one of node_name or nodeid must be specified. */ int pcmk_ipc_purge_node(pcmk_ipc_api_t *api, const char *node_name, uint32_t nodeid) { int rc = 0; xmlNode *request = NULL; if (api == NULL) { return EINVAL; } if ((node_name == NULL) && (nodeid == 0)) { return EINVAL; } request = create_purge_node_request(api, node_name, nodeid); if (request == NULL) { return EOPNOTSUPP; } rc = pcmk__send_ipc_request(api, request); pcmk__xml_free(request); crm_debug("%s peer cache purge of node %s[%lu]: rc=%d", pcmk_ipc_name(api, true), node_name, (unsigned long) nodeid, rc); return rc; } /* * Generic IPC API (to eventually be deprecated as public API and made internal) */ struct crm_ipc_s { struct pollfd pfd; unsigned int max_buf_size; // maximum bytes we can send or receive over IPC unsigned int buf_size; // size of allocated buffer int msg_size; int need_reply; char *buffer; char *server_name; // server IPC name being connected to qb_ipcc_connection_t *ipc; }; /*! * \brief Create a new (legacy) object for using Pacemaker daemon IPC * * \param[in] name IPC system name to connect to * \param[in] max_size Use a maximum IPC buffer size of at least this size * * \return Newly allocated IPC object on success, NULL otherwise * * \note The caller is responsible for freeing the result using * crm_ipc_destroy(). * \note This should be considered deprecated for use with daemons supported by * pcmk_new_ipc_api(). * \note @COMPAT Since 3.0.1, \p max_size is ignored and the default given by * \c crm_ipc_default_buffer_size() will be used instead. 
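 *
 * \par
 * Typical legacy usage is a create/connect/send/destroy sequence, roughly as
 * follows (illustrative sketch; the server name is a placeholder and
 * \c request is an assumed, already-built XML message):
 * \code
 * crm_ipc_t *ipc = crm_ipc_new("ipc-server-name", 0);
 * xmlNode *reply = NULL;
 *
 * if ((ipc != NULL) && crm_ipc_connect(ipc)) {
 *     crm_ipc_send(ipc, request, crm_ipc_client_response, 0, &reply);
 *     pcmk__xml_free(reply);
 *     crm_ipc_close(ipc);
 * }
 * crm_ipc_destroy(ipc);
 * \endcode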
*/ crm_ipc_t * crm_ipc_new(const char *name, size_t max_size) { crm_ipc_t *client = NULL; client = calloc(1, sizeof(crm_ipc_t)); if (client == NULL) { crm_err("Could not create IPC connection: %s", strerror(errno)); return NULL; } client->server_name = strdup(name); if (client->server_name == NULL) { crm_err("Could not create %s IPC connection: %s", name, strerror(errno)); free(client); return NULL; } client->buf_size = crm_ipc_default_buffer_size(); client->buffer = malloc(client->buf_size); if (client->buffer == NULL) { crm_err("Could not create %s IPC connection: %s", name, strerror(errno)); free(client->server_name); free(client); return NULL; } /* Clients initiating connection pick the max buf size */ client->max_buf_size = client->buf_size; client->pfd.fd = -1; client->pfd.events = POLLIN; client->pfd.revents = 0; return client; } /*! * \internal * \brief Connect a generic (not daemon-specific) IPC object * * \param[in,out] ipc Generic IPC object to connect * * \return Standard Pacemaker return code */ int pcmk__connect_generic_ipc(crm_ipc_t *ipc) { uid_t cl_uid = 0; gid_t cl_gid = 0; pid_t found_pid = 0; uid_t found_uid = 0; gid_t found_gid = 0; int rc = pcmk_rc_ok; if (ipc == NULL) { return EINVAL; } ipc->need_reply = FALSE; ipc->ipc = qb_ipcc_connect(ipc->server_name, ipc->buf_size); if (ipc->ipc == NULL) { return errno; } rc = qb_ipcc_fd_get(ipc->ipc, &ipc->pfd.fd); if (rc < 0) { // -errno crm_ipc_close(ipc); return -rc; } rc = pcmk_daemon_user(&cl_uid, &cl_gid); rc = pcmk_legacy2rc(rc); if (rc != pcmk_rc_ok) { crm_ipc_close(ipc); return rc; } rc = is_ipc_provider_expected(ipc->ipc, ipc->pfd.fd, cl_uid, cl_gid, &found_pid, &found_uid, &found_gid); if (rc != pcmk_rc_ok) { if (rc == pcmk_rc_ipc_unauthorized) { crm_info("%s IPC provider authentication failed: process %lld has " "uid %lld (expected %lld) and gid %lld (expected %lld)", ipc->server_name, (long long) PCMK__SPECIAL_PID_AS_0(found_pid), (long long) found_uid, (long long) cl_uid, (long long) found_gid, (long long) cl_gid); } crm_ipc_close(ipc); return rc; } ipc->max_buf_size = qb_ipcc_get_buffer_size(ipc->ipc); if (ipc->max_buf_size > ipc->buf_size) { free(ipc->buffer); ipc->buffer = calloc(ipc->max_buf_size, sizeof(char)); if (ipc->buffer == NULL) { rc = errno; crm_ipc_close(ipc); return rc; } ipc->buf_size = ipc->max_buf_size; } return pcmk_rc_ok; } void crm_ipc_close(crm_ipc_t * client) { if (client) { if (client->ipc) { qb_ipcc_connection_t *ipc = client->ipc; client->ipc = NULL; qb_ipcc_disconnect(ipc); } } } void crm_ipc_destroy(crm_ipc_t * client) { if (client) { if (client->ipc && qb_ipcc_is_connected(client->ipc)) { crm_notice("Destroying active %s IPC connection", client->server_name); /* The next line is basically unsafe * * If this connection was attached to mainloop and mainloop is active, * the 'disconnected' callback will end up back here and we'll end * up free'ing the memory twice - something that can still happen * even without this if we destroy a connection and it closes before * we call exit */ /* crm_ipc_close(client); */ } else { crm_trace("Destroying inactive %s IPC connection", client->server_name); } free(client->buffer); free(client->server_name); free(client); } } /*! 
* \internal * \brief Get the file descriptor for a generic IPC object * * \param[in,out] ipc Generic IPC object to get file descriptor for * \param[out] fd Where to store file descriptor * * \return Standard Pacemaker return code */ int pcmk__ipc_fd(crm_ipc_t *ipc, int *fd) { if ((ipc == NULL) || (fd == NULL)) { return EINVAL; } if ((ipc->ipc == NULL) || (ipc->pfd.fd < 0)) { return ENOTCONN; } *fd = ipc->pfd.fd; return pcmk_rc_ok; } int crm_ipc_get_fd(crm_ipc_t * client) { int fd = -1; if (pcmk__ipc_fd(client, &fd) != pcmk_rc_ok) { crm_err("Could not obtain file descriptor for %s IPC", ((client == NULL)? "unspecified" : client->server_name)); errno = EINVAL; return -EINVAL; } return fd; } bool crm_ipc_connected(crm_ipc_t * client) { bool rc = FALSE; if (client == NULL) { crm_trace("No client"); return FALSE; } else if (client->ipc == NULL) { crm_trace("No connection"); return FALSE; } else if (client->pfd.fd < 0) { crm_trace("Bad descriptor"); return FALSE; } rc = qb_ipcc_is_connected(client->ipc); if (rc == FALSE) { client->pfd.fd = -EINVAL; } return rc; } /*! * \brief Check whether an IPC connection is ready to be read * * \param[in,out] client Connection to check * * \return Positive value if ready to be read, 0 if not ready, -errno on error */ int crm_ipc_ready(crm_ipc_t *client) { int rc; pcmk__assert(client != NULL); if (!crm_ipc_connected(client)) { return -ENOTCONN; } client->pfd.revents = 0; rc = poll(&(client->pfd), 1, 0); return (rc < 0)? -errno : rc; } -// \return Standard Pacemaker return code -static int -crm_ipc_decompress(crm_ipc_t * client) -{ - pcmk__ipc_header_t *header = (pcmk__ipc_header_t *)(void*)client->buffer; - - if (header->size_compressed) { - int rc = 0; - unsigned int size_u = 1 + header->size_uncompressed; - /* never let buf size fall below our max size required for ipc reads. 
*/ - unsigned int new_buf_size = QB_MAX((sizeof(pcmk__ipc_header_t) + size_u), client->max_buf_size); - char *uncompressed = pcmk__assert_alloc(1, new_buf_size); - - crm_trace("Decompressing message data %u bytes into %u bytes", - header->size_compressed, size_u); - - rc = BZ2_bzBuffToBuffDecompress(uncompressed + sizeof(pcmk__ipc_header_t), &size_u, - client->buffer + sizeof(pcmk__ipc_header_t), header->size_compressed, 1, 0); - rc = pcmk__bzlib2rc(rc); - - if (rc != pcmk_rc_ok) { - crm_err("Decompression failed: %s " QB_XS " rc=%d", - pcmk_rc_str(rc), rc); - free(uncompressed); - return rc; - } - - pcmk__assert(size_u == header->size_uncompressed); - - memcpy(uncompressed, client->buffer, sizeof(pcmk__ipc_header_t)); /* Preserve the header */ - header = (pcmk__ipc_header_t *)(void*)uncompressed; - - free(client->buffer); - client->buf_size = new_buf_size; - client->buffer = uncompressed; - } - - pcmk__assert(client->buffer[sizeof(pcmk__ipc_header_t) - + header->size_uncompressed - 1] == 0); - return pcmk_rc_ok; -} - long crm_ipc_read(crm_ipc_t * client) { pcmk__ipc_header_t *header = NULL; pcmk__assert((client != NULL) && (client->ipc != NULL) && (client->buffer != NULL)); client->buffer[0] = 0; client->msg_size = qb_ipcc_event_recv(client->ipc, client->buffer, client->buf_size, 0); if (client->msg_size >= 0) { - int rc = crm_ipc_decompress(client); - - if (rc != pcmk_rc_ok) { - return pcmk_rc2legacy(rc); - } - header = (pcmk__ipc_header_t *)(void*)client->buffer; if (!pcmk__valid_ipc_header(header)) { return -EBADMSG; } crm_trace("Received %s IPC event %d size=%u rc=%d text='%.100s'", client->server_name, header->qb.id, header->qb.size, client->msg_size, client->buffer + sizeof(pcmk__ipc_header_t)); } else { crm_trace("No message received from %s IPC: %s", client->server_name, pcmk_strerror(client->msg_size)); if (client->msg_size == -EAGAIN) { return -EAGAIN; } } if (!crm_ipc_connected(client) || client->msg_size == -ENOTCONN) { crm_err("Connection to %s IPC failed", client->server_name); } if (header) { /* Data excluding the header */ return header->size_uncompressed; } return -ENOMSG; } const char * crm_ipc_buffer(crm_ipc_t * client) { pcmk__assert(client != NULL); return client->buffer + sizeof(pcmk__ipc_header_t); } uint32_t crm_ipc_buffer_flags(crm_ipc_t * client) { pcmk__ipc_header_t *header = NULL; pcmk__assert(client != NULL); if (client->buffer == NULL) { return 0; } header = (pcmk__ipc_header_t *)(void*)client->buffer; return header->flags; } const char * crm_ipc_name(crm_ipc_t * client) { pcmk__assert(client != NULL); return client->server_name; } // \return Standard Pacemaker return code static int internal_ipc_get_reply(crm_ipc_t *client, int request_id, int ms_timeout, ssize_t *bytes, xmlNode **reply) { pcmk__ipc_header_t *hdr = NULL; time_t timeout = 0; int32_t qb_timeout = -1; int rc = pcmk_rc_ok; if (ms_timeout > 0) { timeout = time(NULL) + 1 + pcmk__timeout_ms2s(ms_timeout); qb_timeout = 1000; } /* get the reply */ crm_trace("Expecting reply to %s IPC message %d", client->server_name, request_id); do { xmlNode *xml = NULL; *bytes = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, qb_timeout); if (*bytes <= 0) { if (!crm_ipc_connected(client)) { crm_err("%s IPC provider disconnected while waiting for message %d", client->server_name, request_id); break; } continue; } - rc = crm_ipc_decompress(client); - if (rc != pcmk_rc_ok) { - return rc; - } - hdr = (pcmk__ipc_header_t *)(void*) client->buffer; if (hdr->qb.id == request_id) { /* Got the reply we were 
expecting. */ break; } xml = pcmk__xml_parse(crm_ipc_buffer(client)); if (hdr->qb.id < request_id) { crm_err("Discarding old reply %d (need %d)", hdr->qb.id, request_id); crm_log_xml_notice(xml, "OldIpcReply"); } else if (hdr->qb.id > request_id) { crm_err("Discarding newer reply %d (need %d)", hdr->qb.id, request_id); crm_log_xml_notice(xml, "ImpossibleReply"); pcmk__assert(hdr->qb.id <= request_id); } } while (time(NULL) < timeout || (timeout == 0 && *bytes == -EAGAIN)); if (*bytes > 0) { crm_trace("Received %zd-byte reply %" PRId32 " to %s IPC %d: %.100s", *bytes, hdr->qb.id, client->server_name, request_id, crm_ipc_buffer(client)); if (reply != NULL) { *reply = pcmk__xml_parse(crm_ipc_buffer(client)); } } else if (*bytes < 0) { rc = (int) -*bytes; // System errno crm_trace("No reply to %s IPC %d: %s " QB_XS " rc=%d", client->server_name, request_id, pcmk_rc_str(rc), rc); } /* If bytes == 0, we'll return that to crm_ipc_send which will interpret * that as pcmk_rc_ok, log that the IPC request failed (since we did not * give it a valid reply), and return that 0 to its callers. It's up to * the callers to take appropriate action after that. */ return rc; } /*! * \brief Send an IPC XML message * * \param[in,out] client Connection to IPC server * \param[in] message XML message to send * \param[in] flags Bitmask of crm_ipc_flags * \param[in] ms_timeout Give up if not sent within this much time * (5 seconds if 0, or no timeout if negative) * \param[out] reply Reply from server (or NULL if none) * * \return Negative errno on error, otherwise size of reply received in bytes * if reply was needed, otherwise number of bytes sent */ int crm_ipc_send(crm_ipc_t *client, const xmlNode *message, enum crm_ipc_flags flags, int32_t ms_timeout, xmlNode **reply) { int rc = 0; time_t timeout = 0; ssize_t qb_rc = 0; ssize_t bytes = 0; struct iovec *iov; static uint32_t id = 0; pcmk__ipc_header_t *header; if (client == NULL) { crm_notice("Can't send IPC request without connection (bug?): %.100s", message); return -ENOTCONN; } else if (!crm_ipc_connected(client)) { /* Don't even bother */ crm_notice("Can't send %s IPC requests: Connection closed", client->server_name); return -ENOTCONN; } if (ms_timeout == 0) { ms_timeout = 5000; } if (client->need_reply) { qb_rc = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, ms_timeout); if (qb_rc < 0) { crm_warn("Sending %s IPC disabled until pending reply received", client->server_name); return -EALREADY; } else { crm_notice("Sending %s IPC re-enabled after pending reply received", client->server_name); client->need_reply = FALSE; } } id++; CRM_LOG_ASSERT(id != 0); /* Crude wrap-around detection */ rc = pcmk__ipc_prepare_iov(id, message, &iov, &bytes); if (rc != pcmk_rc_ok) { crm_warn("Couldn't prepare %s IPC request: %s " QB_XS " rc=%d", client->server_name, pcmk_rc_str(rc), rc); return pcmk_rc2legacy(rc); } header = iov[0].iov_base; pcmk__set_ipc_flags(header->flags, client->server_name, flags); if (pcmk_is_set(flags, crm_ipc_proxied)) { /* Don't look for a synchronous response */ pcmk__clear_ipc_flags(flags, "client", crm_ipc_client_response); } crm_trace("Sending %s IPC request %d of %u bytes using %dms timeout", client->server_name, header->qb.id, header->qb.size, ms_timeout); /* Send the IPC request, respecting any timeout we were passed */ if (ms_timeout > 0) { timeout = time(NULL) + 1 + pcmk__timeout_ms2s(ms_timeout); } do { qb_rc = qb_ipcc_sendv(client->ipc, iov, 2); } while ((qb_rc == -EAGAIN) && ((timeout == 0) || (time(NULL) < timeout))); rc = 
(int) qb_rc; // Negative of system errno, or bytes sent if (qb_rc <= 0) { goto send_cleanup; } /* If we should not wait for a response, bail now */ if (!pcmk_is_set(flags, crm_ipc_client_response)) { crm_trace("Not waiting for reply to %s IPC request %d", client->server_name, header->qb.id); goto send_cleanup; } rc = internal_ipc_get_reply(client, header->qb.id, ms_timeout, &bytes, reply); if (rc == pcmk_rc_ok) { rc = (int) bytes; // Size of reply received } else { /* rc is either a positive system errno or a negative standard Pacemaker * return code. If it's an errno, we need to convert it back to a * negative number for comparison and return at the end of this function. */ rc = pcmk_rc2legacy(rc); if (ms_timeout > 0) { /* We didn't get the reply in time, so disable future sends for now. * The only alternative would be to close the connection since we * don't know how to detect and discard out-of-sequence replies. * * @TODO Implement out-of-sequence detection */ client->need_reply = TRUE; } } send_cleanup: if (!crm_ipc_connected(client)) { crm_notice("Couldn't send %s IPC request %d: Connection closed " QB_XS " rc=%d", client->server_name, header->qb.id, rc); } else if (rc == -ETIMEDOUT) { crm_warn("%s IPC request %d failed: %s after %dms " QB_XS " rc=%d", client->server_name, header->qb.id, pcmk_strerror(rc), ms_timeout, rc); crm_write_blackbox(0, NULL); } else if (rc <= 0) { crm_warn("%s IPC request %d failed: %s " QB_XS " rc=%d", client->server_name, header->qb.id, ((rc == 0)? "No bytes sent" : pcmk_strerror(rc)), rc); } pcmk_free_ipc_event(iov); return rc; } /*! * \brief Ensure an IPC provider has expected user or group * * \param[in] qb_ipc libqb client connection if available * \param[in] sock Connected Unix socket for IPC * \param[in] refuid Expected user ID * \param[in] refgid Expected group ID * \param[out] gotpid If not NULL, where to store provider's actual process ID * (or 1 on platforms where ID is not available) * \param[out] gotuid If not NULL, where to store provider's actual user ID * \param[out] gotgid If not NULL, where to store provider's actual group ID * * \return Standard Pacemaker return code * \note An actual user ID of 0 (root) will always be considered authorized, * regardless of the expected values provided. The caller can use the * output arguments to be stricter than this function. 
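 *
 * \par
 * For example, a caller that additionally requires the provider's group to
 * match (stricter than this function alone enforces) might do the following
 * (sketch; \c conn, \c fd, \c ref_uid, and \c ref_gid are assumed variables):
 * \code
 * uid_t uid = 0;
 * gid_t gid = 0;
 *
 * if ((is_ipc_provider_expected(conn, fd, ref_uid, ref_gid,
 *                               NULL, &uid, &gid) == pcmk_rc_ok)
 *     && ((uid == 0) || (gid == ref_gid))) {
 *     // provider is authorized even under the stricter policy
 * }
 * \endcode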
*/ static int is_ipc_provider_expected(qb_ipcc_connection_t *qb_ipc, int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid) { int rc = EOPNOTSUPP; pid_t found_pid = 0; uid_t found_uid = 0; gid_t found_gid = 0; #ifdef HAVE_QB_IPCC_AUTH_GET if (qb_ipc != NULL) { rc = qb_ipcc_auth_get(qb_ipc, &found_pid, &found_uid, &found_gid); rc = -rc; // libqb returns 0 or -errno if (rc == pcmk_rc_ok) { goto found; } } #endif #ifdef HAVE_UCRED { struct ucred ucred; socklen_t ucred_len = sizeof(ucred); if (getsockopt(sock, SOL_SOCKET, SO_PEERCRED, &ucred, &ucred_len) < 0) { rc = errno; } else if (ucred_len != sizeof(ucred)) { rc = EOPNOTSUPP; } else { found_pid = ucred.pid; found_uid = ucred.uid; found_gid = ucred.gid; goto found; } } #endif #ifdef HAVE_SOCKPEERCRED { struct sockpeercred sockpeercred; socklen_t sockpeercred_len = sizeof(sockpeercred); if (getsockopt(sock, SOL_SOCKET, SO_PEERCRED, &sockpeercred, &sockpeercred_len) < 0) { rc = errno; } else if (sockpeercred_len != sizeof(sockpeercred)) { rc = EOPNOTSUPP; } else { found_pid = sockpeercred.pid; found_uid = sockpeercred.uid; found_gid = sockpeercred.gid; goto found; } } #endif #ifdef HAVE_GETPEEREID // For example, FreeBSD if (getpeereid(sock, &found_uid, &found_gid) < 0) { rc = errno; } else { found_pid = PCMK__SPECIAL_PID; goto found; } #endif #ifdef HAVE_GETPEERUCRED { ucred_t *ucred = NULL; if (getpeerucred(sock, &ucred) < 0) { rc = errno; } else { found_pid = ucred_getpid(ucred); found_uid = ucred_geteuid(ucred); found_gid = ucred_getegid(ucred); ucred_free(ucred); goto found; } } #endif return rc; // If we get here, nothing succeeded found: if (gotpid != NULL) { *gotpid = found_pid; } if (gotuid != NULL) { *gotuid = found_uid; } if (gotgid != NULL) { *gotgid = found_gid; } if ((found_uid != 0) && (found_uid != refuid) && (found_gid != refgid)) { return pcmk_rc_ipc_unauthorized; } return pcmk_rc_ok; } int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid) { int ret = is_ipc_provider_expected(NULL, sock, refuid, refgid, gotpid, gotuid, gotgid); /* The old function had some very odd return codes*/ if (ret == 0) { return 1; } else if (ret == pcmk_rc_ipc_unauthorized) { return 0; } else { return pcmk_rc2legacy(ret); } } int pcmk__ipc_is_authentic_process_active(const char *name, uid_t refuid, gid_t refgid, pid_t *gotpid) { static char last_asked_name[PATH_MAX / 2] = ""; /* log spam prevention */ int fd; int rc = pcmk_rc_ipc_unresponsive; int auth_rc = 0; int32_t qb_rc; pid_t found_pid = 0; uid_t found_uid = 0; gid_t found_gid = 0; qb_ipcc_connection_t *c; #ifdef HAVE_QB_IPCC_CONNECT_ASYNC struct pollfd pollfd = { 0, }; int poll_rc; c = qb_ipcc_connect_async(name, 0, &(pollfd.fd)); #else c = qb_ipcc_connect(name, 0); #endif if (c == NULL) { crm_info("Could not connect to %s IPC: %s", name, strerror(errno)); rc = pcmk_rc_ipc_unresponsive; goto bail; } #ifdef HAVE_QB_IPCC_CONNECT_ASYNC pollfd.events = POLLIN; do { poll_rc = poll(&pollfd, 1, 2000); } while ((poll_rc == -1) && (errno == EINTR)); /* If poll() failed, given that disconnect function is not registered yet, * qb_ipcc_disconnect() won't clean up the socket. In any case, call * qb_ipcc_connect_continue() here so that it may fail and do the cleanup * for us. 
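     * (qb_ipcc_connect_continue() finishes the handshake begun by
     * qb_ipcc_connect_async(); if it fails, libqb frees the connection,
     * which is why c is reset to NULL below instead of being disconnected.)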
*/ if (qb_ipcc_connect_continue(c) != 0) { crm_info("Could not connect to %s IPC: %s", name, (poll_rc == 0)?"timeout":strerror(errno)); rc = pcmk_rc_ipc_unresponsive; c = NULL; // qb_ipcc_connect_continue cleaned up for us goto bail; } #endif qb_rc = qb_ipcc_fd_get(c, &fd); if (qb_rc != 0) { rc = (int) -qb_rc; // System errno crm_err("Could not get fd from %s IPC: %s " QB_XS " rc=%d", name, pcmk_rc_str(rc), rc); goto bail; } auth_rc = is_ipc_provider_expected(c, fd, refuid, refgid, &found_pid, &found_uid, &found_gid); if (auth_rc == pcmk_rc_ipc_unauthorized) { crm_err("Daemon (IPC %s) effectively blocked with unauthorized" " process %lld (uid: %lld, gid: %lld)", name, (long long) PCMK__SPECIAL_PID_AS_0(found_pid), (long long) found_uid, (long long) found_gid); rc = pcmk_rc_ipc_unauthorized; goto bail; } if (auth_rc != pcmk_rc_ok) { rc = auth_rc; crm_err("Could not get peer credentials from %s IPC: %s " QB_XS " rc=%d", name, pcmk_rc_str(rc), rc); goto bail; } if (gotpid != NULL) { *gotpid = found_pid; } rc = pcmk_rc_ok; if ((found_uid != refuid || found_gid != refgid) && strncmp(last_asked_name, name, sizeof(last_asked_name))) { if ((found_uid == 0) && (refuid != 0)) { crm_warn("Daemon (IPC %s) runs as root, whereas the expected" " credentials are %lld:%lld, hazard of violating" " the least privilege principle", name, (long long) refuid, (long long) refgid); } else { crm_notice("Daemon (IPC %s) runs as %lld:%lld, whereas the" " expected credentials are %lld:%lld, which may" " mean a different set of privileges than expected", name, (long long) found_uid, (long long) found_gid, (long long) refuid, (long long) refgid); } memccpy(last_asked_name, name, '\0', sizeof(last_asked_name)); } bail: if (c != NULL) { qb_ipcc_disconnect(c); } return rc; } // Deprecated functions kept only for backward API compatibility // LCOV_EXCL_START #include bool crm_ipc_connect(crm_ipc_t *client) { int rc = pcmk__connect_generic_ipc(client); if (rc == pcmk_rc_ok) { return true; } if ((client != NULL) && (client->ipc == NULL)) { errno = (rc > 0)? rc : ENOTCONN; crm_debug("Could not establish %s IPC connection: %s (%d)", client->server_name, pcmk_rc_str(errno), errno); } else if (rc == pcmk_rc_ipc_unauthorized) { crm_err("%s IPC provider authentication failed", (client == NULL)? "Pacemaker" : client->server_name); errno = ECONNABORTED; } else { crm_err("Could not verify authenticity of %s IPC provider", (client == NULL)? "Pacemaker" : client->server_name); errno = ENOTCONN; } return false; } // LCOV_EXCL_STOP // End deprecated API diff --git a/lib/common/ipc_server.c b/lib/common/ipc_server.c index 90a1c26c30..20b2cd2170 100644 --- a/lib/common/ipc_server.c +++ b/lib/common/ipc_server.c @@ -1,975 +1,946 @@ /* * Copyright 2004-2025 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU Lesser General Public License * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. */ #include #include #include #include #include #include #include #include #include #include #include "crmcommon_private.h" /* Evict clients whose event queue grows this large (by default) */ #define PCMK_IPC_DEFAULT_QUEUE_MAX 500 static GHashTable *client_connections = NULL; /*! * \internal * \brief Count IPC clients * * \return Number of active IPC client connections */ guint pcmk__ipc_client_count(void) { return client_connections? g_hash_table_size(client_connections) : 0; } /*! 
* \internal * \brief Execute a function for each active IPC client connection * * \param[in] func Function to call * \param[in,out] user_data Pointer to pass to function * * \note The parameters are the same as for g_hash_table_foreach(). */ void pcmk__foreach_ipc_client(GHFunc func, gpointer user_data) { if ((func != NULL) && (client_connections != NULL)) { g_hash_table_foreach(client_connections, func, user_data); } } pcmk__client_t * pcmk__find_client(const qb_ipcs_connection_t *c) { if (client_connections) { return g_hash_table_lookup(client_connections, c); } crm_trace("No client found for %p", c); return NULL; } pcmk__client_t * pcmk__find_client_by_id(const char *id) { if ((client_connections != NULL) && (id != NULL)) { gpointer key; pcmk__client_t *client = NULL; GHashTableIter iter; g_hash_table_iter_init(&iter, client_connections); while (g_hash_table_iter_next(&iter, &key, (gpointer *) & client)) { if (strcmp(client->id, id) == 0) { return client; } } } crm_trace("No client found with id='%s'", pcmk__s(id, "")); return NULL; } /*! * \internal * \brief Get a client identifier for use in log messages * * \param[in] c Client * * \return Client's name, client's ID, or a string literal, as available * \note This is intended to be used in format strings like "client %s". */ const char * pcmk__client_name(const pcmk__client_t *c) { if (c == NULL) { return "(unspecified)"; } else if (c->name != NULL) { return c->name; } else if (c->id != NULL) { return c->id; } else { return "(unidentified)"; } } void pcmk__client_cleanup(void) { if (client_connections != NULL) { int active = g_hash_table_size(client_connections); if (active > 0) { crm_warn("Exiting with %d active IPC client%s", active, pcmk__plural_s(active)); } g_hash_table_destroy(client_connections); client_connections = NULL; } } void pcmk__drop_all_clients(qb_ipcs_service_t *service) { qb_ipcs_connection_t *c = NULL; if (service == NULL) { return; } c = qb_ipcs_connection_first_get(service); while (c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(service, last); /* There really shouldn't be anyone connected at this point */ crm_notice("Disconnecting client %p, pid=%d...", last, pcmk__client_pid(last)); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); } } /*! * \internal * \brief Allocate a new pcmk__client_t object based on an IPC connection * * \param[in] c IPC connection (NULL to allocate generic client) * \param[in] key Connection table key (NULL to use sane default) * \param[in] uid_client UID corresponding to c (ignored if c is NULL) * * \return Pointer to new pcmk__client_t (guaranteed not to be \c NULL) */ static pcmk__client_t * client_from_connection(qb_ipcs_connection_t *c, void *key, uid_t uid_client) { pcmk__client_t *client = pcmk__assert_alloc(1, sizeof(pcmk__client_t)); if (c) { client->user = pcmk__uid2username(uid_client); if (client->user == NULL) { client->user = pcmk__str_copy("#unprivileged"); crm_err("Unable to enforce ACLs for user ID %d, assuming unprivileged", uid_client); } client->ipcs = c; pcmk__set_client_flags(client, pcmk__client_ipc); client->pid = pcmk__client_pid(c); if (key == NULL) { key = c; } } client->id = crm_generate_uuid(); if (key == NULL) { key = client->id; } if (client_connections == NULL) { crm_trace("Creating IPC client table"); client_connections = g_hash_table_new(g_direct_hash, g_direct_equal); } g_hash_table_insert(client_connections, key, client); return client; } /*! 
 * \brief Allocate a new pcmk__client_t object and generate its ID
 *
 * \param[in] key  What to use as connections hash table key (NULL to use ID)
 *
 * \return Pointer to new pcmk__client_t (asserts on failure)
 */
pcmk__client_t *
pcmk__new_unauth_client(void *key)
{
    return client_from_connection(NULL, key, 0);
}

pcmk__client_t *
pcmk__new_client(qb_ipcs_connection_t *c, uid_t uid_client, gid_t gid_client)
{
    uid_t uid_cluster = 0;
    gid_t gid_cluster = 0;

    pcmk__client_t *client = NULL;

    CRM_CHECK(c != NULL, return NULL);

    if (pcmk_daemon_user(&uid_cluster, &gid_cluster) < 0) {
        static bool need_log = TRUE;

        if (need_log) {
            crm_warn("Could not find user and group IDs for user %s",
                     CRM_DAEMON_USER);
            need_log = FALSE;
        }
    }

    if (uid_client != 0) {
        crm_trace("Giving group %u access to new IPC connection", gid_cluster);
        /* Passing -1 to chown(2) means don't change */
        qb_ipcs_connection_auth_set(c, -1, gid_cluster,
                                    S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
    }

    /* TODO: Do our own auth checking, return NULL if unauthorized */
    client = client_from_connection(c, NULL, uid_client);

    if ((uid_client == 0) || (uid_client == uid_cluster)) {
        /* Remember when a connection came from root or hacluster */
        pcmk__set_client_flags(client, pcmk__client_privileged);
    }

    crm_debug("New IPC client %s for PID %u with uid %d and gid %d",
              client->id, client->pid, uid_client, gid_client);
    return client;
}

static struct iovec *
pcmk__new_ipc_event(void)
{
    return (struct iovec *) pcmk__assert_alloc(2, sizeof(struct iovec));
}

/*!
 * \brief Free an I/O vector created by pcmk__ipc_prepare_iov()
 *
 * \param[in,out] event  I/O vector to free
 */
void
pcmk_free_ipc_event(struct iovec *event)
{
    if (event != NULL) {
        free(event[0].iov_base);
        free(event[1].iov_base);
        free(event);
    }
}

static void
free_event(gpointer data)
{
    pcmk_free_ipc_event((struct iovec *) data);
}

static void
add_event(pcmk__client_t *c, struct iovec *iov)
{
    if (c->event_queue == NULL) {
        c->event_queue = g_queue_new();
    }
    g_queue_push_tail(c->event_queue, iov);
}

void
pcmk__free_client(pcmk__client_t *c)
{
    if (c == NULL) {
        return;
    }

    if (client_connections) {
        if (c->ipcs) {
            crm_trace("Destroying %p/%p (%d remaining)",
                      c, c->ipcs, g_hash_table_size(client_connections) - 1);
            g_hash_table_remove(client_connections, c->ipcs);

        } else {
            crm_trace("Destroying remote connection %p (%d remaining)",
                      c, g_hash_table_size(client_connections) - 1);
            g_hash_table_remove(client_connections, c->id);
        }
    }

    if (c->event_timer) {
        g_source_remove(c->event_timer);
    }

    if (c->event_queue) {
        crm_debug("Destroying %d events", g_queue_get_length(c->event_queue));
        g_queue_free_full(c->event_queue, free_event);
    }

    free(c->id);
    free(c->name);
    free(c->user);
    if (c->remote) {
        if (c->remote->auth_timeout) {
            g_source_remove(c->remote->auth_timeout);
        }
        if (c->remote->tls_session != NULL) {
            /* @TODO Reduce duplication at callers. Put here everything
             * necessary to tear down and free tls_session.
             */
            gnutls_deinit(c->remote->tls_session);
        }
        free(c->remote->buffer);
        free(c->remote);
    }
    free(c);
}

/*!
* \internal * \brief Raise IPC eviction threshold for a client, if allowed * * \param[in,out] client Client to modify * \param[in] qmax New threshold */ void pcmk__set_client_queue_max(pcmk__client_t *client, const char *qmax) { int rc = pcmk_rc_ok; long long qmax_ll = 0LL; unsigned int orig_value = 0U; CRM_CHECK(client != NULL, return); orig_value = client->queue_max; if (pcmk_is_set(client->flags, pcmk__client_privileged)) { rc = pcmk__scan_ll(qmax, &qmax_ll, 0LL); if (rc == pcmk_rc_ok) { if ((qmax_ll <= 0LL) || (qmax_ll > UINT_MAX)) { rc = ERANGE; } else { client->queue_max = (unsigned int) qmax_ll; } } } else { rc = EACCES; } if (rc != pcmk_rc_ok) { crm_info("Could not set IPC threshold for client %s[%u] to %s: %s", pcmk__client_name(client), client->pid, pcmk__s(qmax, "default"), pcmk_rc_str(rc)); } else if (client->queue_max != orig_value) { crm_debug("IPC threshold for client %s[%u] is now %u (was %u)", pcmk__client_name(client), client->pid, client->queue_max, orig_value); } } int pcmk__client_pid(qb_ipcs_connection_t *c) { struct qb_ipcs_connection_stats stats; stats.client_pid = 0; qb_ipcs_connection_stats_get(c, &stats, 0); return stats.client_pid; } /*! * \internal * \brief Retrieve message XML from data read from client IPC * * \param[in,out] c IPC client connection * \param[in] data Data read from client connection * \param[out] id Where to store message ID from libqb header * \param[out] flags Where to store flags from libqb header * * \return Message XML on success, NULL otherwise */ xmlNode * pcmk__client_data2xml(pcmk__client_t *c, void *data, uint32_t *id, uint32_t *flags) { xmlNode *xml = NULL; - char *uncompressed = NULL; char *text = ((char *)data) + sizeof(pcmk__ipc_header_t); pcmk__ipc_header_t *header = data; if (!pcmk__valid_ipc_header(header)) { return NULL; } if (id) { *id = ((struct qb_ipc_response_header *)data)->id; } if (flags) { *flags = header->flags; } if (pcmk_is_set(header->flags, crm_ipc_proxied)) { /* Mark this client as being the endpoint of a proxy connection. * Proxy connections responses are sent on the event channel, to avoid * blocking the controller serving as proxy. */ pcmk__set_client_flags(c, pcmk__client_proxied); } - if (header->size_compressed) { - int rc = 0; - unsigned int size_u = 1 + header->size_uncompressed; - uncompressed = pcmk__assert_alloc(1, size_u); - - crm_trace("Decompressing message data %u bytes into %u bytes", - header->size_compressed, size_u); - - rc = BZ2_bzBuffToBuffDecompress(uncompressed, &size_u, text, header->size_compressed, 1, 0); - text = uncompressed; - - rc = pcmk__bzlib2rc(rc); - - if (rc != pcmk_rc_ok) { - crm_err("Decompression failed: %s " QB_XS " rc=%d", - pcmk_rc_str(rc), rc); - free(uncompressed); - return NULL; - } - } - pcmk__assert(text[header->size_uncompressed - 1] == 0); xml = pcmk__xml_parse(text); crm_log_xml_trace(xml, "[IPC received]"); - - free(uncompressed); return xml; } static int crm_ipcs_flush_events(pcmk__client_t *c); static gboolean crm_ipcs_flush_events_cb(gpointer data) { pcmk__client_t *c = data; c->event_timer = 0; crm_ipcs_flush_events(c); return FALSE; } /*! * \internal * \brief Add progressive delay before next event queue flush * * \param[in,out] c Client connection to add delay to * \param[in] queue_len Current event queue length */ static inline void delay_next_flush(pcmk__client_t *c, unsigned int queue_len) { /* Delay a maximum of 1.5 seconds */ guint delay = (queue_len < 5)? 
(1000 + 100 * queue_len) : 1500; c->event_timer = pcmk__create_timer(delay, crm_ipcs_flush_events_cb, c); } /*! * \internal * \brief Send client any messages in its queue * * \param[in,out] c Client to flush * * \return Standard Pacemaker return value */ static int crm_ipcs_flush_events(pcmk__client_t *c) { int rc = pcmk_rc_ok; ssize_t qb_rc = 0; unsigned int sent = 0; unsigned int queue_len = 0; if (c == NULL) { return rc; } else if (c->event_timer) { /* There is already a timer, wait until it goes off */ crm_trace("Timer active for %p - %d", c->ipcs, c->event_timer); return rc; } if (c->event_queue) { queue_len = g_queue_get_length(c->event_queue); } while (sent < 100) { pcmk__ipc_header_t *header = NULL; struct iovec *event = NULL; if (c->event_queue) { // We don't pop unless send is successful event = g_queue_peek_head(c->event_queue); } if (event == NULL) { // Queue is empty break; } qb_rc = qb_ipcs_event_sendv(c->ipcs, event, 2); if (qb_rc < 0) { rc = (int) -qb_rc; break; } event = g_queue_pop_head(c->event_queue); sent++; header = event[0].iov_base; - if (header->size_compressed) { - crm_trace("Event %" PRId32 " to %p[%u] (%zd compressed bytes) sent", - header->qb.id, c->ipcs, c->pid, qb_rc); - } else { - crm_trace("Event %" PRId32 " to %p[%u] (%zd bytes) sent: %.120s", - header->qb.id, c->ipcs, c->pid, qb_rc, - (char *) (event[1].iov_base)); - } + crm_trace("Event %" PRId32 " to %p[%u] (%zd bytes) sent: %.120s", + header->qb.id, c->ipcs, c->pid, qb_rc, + (char *) (event[1].iov_base)); pcmk_free_ipc_event(event); } queue_len -= sent; if (sent > 0 || queue_len) { crm_trace("Sent %u events (%u remaining) for %p[%d]: %s (%zd)", sent, queue_len, c->ipcs, c->pid, pcmk_rc_str(rc), qb_rc); } if (queue_len) { /* Allow clients to briefly fall behind on processing incoming messages, * but drop completely unresponsive clients so the connection doesn't * consume resources indefinitely. */ if (queue_len > QB_MAX(c->queue_max, PCMK_IPC_DEFAULT_QUEUE_MAX)) { if ((c->queue_backlog <= 1) || (queue_len < c->queue_backlog)) { /* Don't evict for a new or shrinking backlog */ crm_warn("Client with process ID %u has a backlog of %u messages " QB_XS " %p", c->pid, queue_len, c->ipcs); } else { crm_err("Evicting client with process ID %u due to backlog of %u messages " QB_XS " %p", c->pid, queue_len, c->ipcs); c->queue_backlog = 0; qb_ipcs_disconnect(c->ipcs); return rc; } } c->queue_backlog = queue_len; delay_next_flush(c, queue_len); } else { /* Event queue is empty, there is no backlog */ c->queue_backlog = 0; } return rc; } /*! 
* \internal * \brief Create an I/O vector for sending an IPC XML message * * \param[in] request Identifier for libqb response header * \param[in] message XML message to send * \param[out] result Where to store prepared I/O vector * \param[out] bytes Size of prepared data in bytes * * \return Standard Pacemaker return code */ int pcmk__ipc_prepare_iov(uint32_t request, const xmlNode *message, struct iovec **result, ssize_t *bytes) { struct iovec *iov; unsigned int total = 0; unsigned int max_send_size = crm_ipc_default_buffer_size(); GString *buffer = NULL; pcmk__ipc_header_t *header = NULL; int rc = pcmk_rc_ok; if ((message == NULL) || (result == NULL)) { rc = EINVAL; goto done; } header = calloc(1, sizeof(pcmk__ipc_header_t)); if (header == NULL) { rc = ENOMEM; goto done; } buffer = g_string_sized_new(1024); pcmk__xml_string(message, 0, buffer, 0); *result = NULL; iov = pcmk__new_ipc_event(); iov[0].iov_len = sizeof(pcmk__ipc_header_t); iov[0].iov_base = header; header->version = PCMK__IPC_VERSION; header->size_uncompressed = 1 + buffer->len; total = iov[0].iov_len + header->size_uncompressed; if (total >= max_send_size) { crm_log_xml_trace(message, "EMSGSIZE"); crm_err("Could not transmit message; message size %" PRIu32" bytes is " "larger than the maximum of %" PRIu32, header->size_uncompressed, max_send_size); rc = EMSGSIZE; goto done; } iov[1].iov_base = pcmk__str_copy(buffer->str); iov[1].iov_len = header->size_uncompressed; header->qb.size = iov[0].iov_len + iov[1].iov_len; header->qb.id = (int32_t)request; /* Replying to a specific request */ *result = iov; pcmk__assert(header->qb.size > 0); if (bytes != NULL) { *bytes = header->qb.size; } done: if (buffer != NULL) { g_string_free(buffer, TRUE); } return rc; } int pcmk__ipc_send_iov(pcmk__client_t *c, struct iovec *iov, uint32_t flags) { int rc = pcmk_rc_ok; static uint32_t id = 1; pcmk__ipc_header_t *header = iov[0].iov_base; if (c->flags & pcmk__client_proxied) { /* _ALL_ replies to proxied connections need to be sent as events */ if (!pcmk_is_set(flags, crm_ipc_server_event)) { /* The proxied flag lets us know this was originally meant to be a * response, even though we're sending it over the event channel. 
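             * (At the far end, crm_ipc_proxied_relay_response lets the
             * proxying daemon deliver this event to the remote client as
             * the response it originally expected.)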
             */
            pcmk__set_ipc_flags(flags, "server event",
                                crm_ipc_server_event
                                |crm_ipc_proxied_relay_response);
        }
    }

    pcmk__set_ipc_flags(header->flags, "server event", flags);
    if (flags & crm_ipc_server_event) {
        // We don't really use the ID here, but it doesn't hurt to set one
        header->qb.id = id++;

        if (flags & crm_ipc_server_free) {
            crm_trace("Sending the original to %p[%d]", c->ipcs, c->pid);
            add_event(c, iov);

        } else {
            struct iovec *iov_copy = pcmk__new_ipc_event();

            crm_trace("Sending a copy to %p[%d]", c->ipcs, c->pid);
            iov_copy[0].iov_len = iov[0].iov_len;
            iov_copy[0].iov_base = malloc(iov[0].iov_len);
            memcpy(iov_copy[0].iov_base, iov[0].iov_base, iov[0].iov_len);

            iov_copy[1].iov_len = iov[1].iov_len;
            iov_copy[1].iov_base = malloc(iov[1].iov_len);
            memcpy(iov_copy[1].iov_base, iov[1].iov_base, iov[1].iov_len);

            add_event(c, iov_copy);
        }

    } else {
        ssize_t qb_rc;

        CRM_LOG_ASSERT(header->qb.id != 0); /* Replying to a specific request */

        qb_rc = qb_ipcs_response_sendv(c->ipcs, iov, 2);
        if (qb_rc < header->qb.size) {
            if (qb_rc < 0) {
                rc = (int) -qb_rc;
            }
            crm_notice("Response %" PRId32 " to pid %u failed: %s "
                       QB_XS " bytes=%" PRId32 " rc=%zd ipcs=%p",
                       header->qb.id, c->pid, pcmk_rc_str(rc),
                       header->qb.size, qb_rc, c->ipcs);

        } else {
            crm_trace("Response %" PRId32 " sent, %zd bytes to %p[%u]",
                      header->qb.id, qb_rc, c->ipcs, c->pid);
        }

        if (flags & crm_ipc_server_free) {
            pcmk_free_ipc_event(iov);
        }
    }

    if (flags & crm_ipc_server_event) {
        rc = crm_ipcs_flush_events(c);
    } else {
        crm_ipcs_flush_events(c);
    }

    if ((rc == EPIPE) || (rc == ENOTCONN)) {
        crm_trace("Client %p disconnected", c->ipcs);
    }
    return rc;
}

int
pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message,
                   uint32_t flags)
{
    struct iovec *iov = NULL;
    int rc = pcmk_rc_ok;

    if (c == NULL) {
        return EINVAL;
    }
    rc = pcmk__ipc_prepare_iov(request, message, &iov, NULL);
    if (rc == pcmk_rc_ok) {
        pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free);
        rc = pcmk__ipc_send_iov(c, iov, flags);
    } else {
        pcmk_free_ipc_event(iov);
        crm_notice("IPC message to pid %d failed: %s " QB_XS " rc=%d",
                   c->pid, pcmk_rc_str(rc), rc);
    }
    return rc;
}

/*!
 * \internal
 * \brief Create an acknowledgement with a status code to send to a client
 *
 * \param[in] function  Calling function
 * \param[in] line      Source file line within calling function
 * \param[in] flags     IPC flags to use when sending
 * \param[in] tag       Element name to use for acknowledgement
 * \param[in] ver       IPC protocol version (can be NULL)
 * \param[in] status    Exit status code to add to ack
 *
 * \return Newly created XML for ack
 *
 * \note The caller is responsible for freeing the return value with
 *       \c pcmk__xml_free().
 */
xmlNode *
pcmk__ipc_create_ack_as(const char *function, int line, uint32_t flags,
                        const char *tag, const char *ver, crm_exit_t status)
{
    xmlNode *ack = NULL;

    if (pcmk_is_set(flags, crm_ipc_client_response)) {
        ack = pcmk__xe_create(NULL, tag);
        crm_xml_add(ack, PCMK_XA_FUNCTION, function);
        crm_xml_add_int(ack, PCMK__XA_LINE, line);
        crm_xml_add_int(ack, PCMK_XA_STATUS, (int) status);
        crm_xml_add(ack, PCMK__XA_IPC_PROTO_VERSION, ver);
    }
    return ack;
}
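/*
 * A hedged usage sketch for pcmk__ipc_create_ack_as() (illustration only;
 * callers normally go through pcmk__ipc_send_ack_as() below). The "ack"
 * element name and the client/request_id/flags variables are placeholders:
 *
 *     xmlNode *ack = pcmk__ipc_create_ack_as(__func__, __LINE__, flags,
 *                                            "ack", NULL, CRM_EX_OK);
 *
 *     if (ack != NULL) { // NULL if the client did not request a response
 *         int rc = pcmk__ipc_send_xml(client, request_id, ack, flags);
 *
 *         pcmk__xml_free(ack);
 *     }
 */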
/*!
 * \internal
 * \brief Send an acknowledgement with a status code to a client
 *
 * \param[in]     function  Calling function
 * \param[in]     line      Source file line within calling function
 * \param[in,out] c         Client to send ack to
 * \param[in]     request   Request ID being replied to
 * \param[in]     flags     IPC flags to use when sending
 * \param[in]     tag       Element name to use for acknowledgement
 * \param[in]     ver       IPC protocol version (can be NULL)
 * \param[in]     status    Status code to send with acknowledgement
 *
 * \return Standard Pacemaker return code
 */
int
pcmk__ipc_send_ack_as(const char *function, int line, pcmk__client_t *c,
                      uint32_t request, uint32_t flags, const char *tag,
                      const char *ver, crm_exit_t status)
{
    int rc = pcmk_rc_ok;
    xmlNode *ack = pcmk__ipc_create_ack_as(function, line, flags, tag, ver,
                                           status);

    if (ack != NULL) {
        crm_trace("Ack'ing IPC message from client %s as <%s status=%d>",
                  pcmk__client_name(c), tag, status);
        crm_log_xml_trace(ack, "sent-ack");
        c->request_id = 0;
        rc = pcmk__ipc_send_xml(c, request, ack, flags);
        pcmk__xml_free(ack);
    }
    return rc;
}

/*!
 * \internal
 * \brief Add an IPC server to the main loop for the CIB manager API
 *
 * \param[out] ipcs_ro   New IPC server for read-only CIB manager API
 * \param[out] ipcs_rw   New IPC server for read/write CIB manager API
 * \param[out] ipcs_shm  New IPC server for shared-memory CIB manager API
 * \param[in]  ro_cb     IPC callbacks for read-only API
 * \param[in]  rw_cb     IPC callbacks for read/write and shared-memory APIs
 *
 * \note This function exits fatally if unable to create the servers.
 * \note There is no actual difference between the three IPC endpoints other
 *       than their names.
 */
void
pcmk__serve_based_ipc(qb_ipcs_service_t **ipcs_ro, qb_ipcs_service_t **ipcs_rw,
                      qb_ipcs_service_t **ipcs_shm,
                      struct qb_ipcs_service_handlers *ro_cb,
                      struct qb_ipcs_service_handlers *rw_cb)
{
    *ipcs_ro = mainloop_add_ipc_server(PCMK__SERVER_BASED_RO, QB_IPC_NATIVE,
                                       ro_cb);
    *ipcs_rw = mainloop_add_ipc_server(PCMK__SERVER_BASED_RW, QB_IPC_NATIVE,
                                       rw_cb);
    *ipcs_shm = mainloop_add_ipc_server(PCMK__SERVER_BASED_SHM, QB_IPC_SHM,
                                        rw_cb);

    if (*ipcs_ro == NULL || *ipcs_rw == NULL || *ipcs_shm == NULL) {
        crm_err("Failed to create the CIB manager: exiting and inhibiting respawn");
        crm_warn("Verify pacemaker and pacemaker_remote are not both enabled");
        crm_exit(CRM_EX_FATAL);
    }
}

/*!
 * \internal
 * \brief Destroy IPC servers for the CIB manager API
 *
 * \param[out] ipcs_ro   IPC server for read-only CIB manager API
 * \param[out] ipcs_rw   IPC server for read/write CIB manager API
 * \param[out] ipcs_shm  IPC server for shared-memory CIB manager API
 *
 * \note This is a convenience function for calling qb_ipcs_destroy() for each
 *       argument.
 */
void
pcmk__stop_based_ipc(qb_ipcs_service_t *ipcs_ro, qb_ipcs_service_t *ipcs_rw,
                     qb_ipcs_service_t *ipcs_shm)
{
    qb_ipcs_destroy(ipcs_ro);
    qb_ipcs_destroy(ipcs_rw);
    qb_ipcs_destroy(ipcs_shm);
}

/*!
 * \internal
 * \brief Add an IPC server to the main loop for the controller API
 *
 * \param[in] cb  IPC callbacks
 *
 * \return Newly created IPC server
 */
qb_ipcs_service_t *
pcmk__serve_controld_ipc(struct qb_ipcs_service_handlers *cb)
{
    return mainloop_add_ipc_server(CRM_SYSTEM_CRMD, QB_IPC_NATIVE, cb);
}

/*!
 * \internal
 * \brief Add an IPC server to the main loop for the attribute manager API
 *
 * \param[out] ipcs  Where to store newly created IPC server
 * \param[in]  cb    IPC callbacks
 *
 * \note This function exits fatally if unable to create the server.
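 *
 * A hedged wiring sketch (the callback struct and its handlers are assumed
 * to be defined by the calling daemon):
 *
 * \code
 * static struct qb_ipcs_service_handlers attrd_callbacks; // hypothetical
 * qb_ipcs_service_t *ipcs = NULL;
 *
 * pcmk__serve_attrd_ipc(&ipcs, &attrd_callbacks); // exits fatally on failure
 * \endcode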
 */
void
pcmk__serve_attrd_ipc(qb_ipcs_service_t **ipcs,
                      struct qb_ipcs_service_handlers *cb)
{
    *ipcs = mainloop_add_ipc_server(PCMK__VALUE_ATTRD, QB_IPC_NATIVE, cb);

    if (*ipcs == NULL) {
        crm_crit("Exiting fatally because unable to serve " PCMK__SERVER_ATTRD
                 " IPC (verify pacemaker and pacemaker_remote are not both "
                 "enabled)");
        crm_exit(CRM_EX_FATAL);
    }
}

/*!
 * \internal
 * \brief Add an IPC server to the main loop for the fencer API
 *
 * \param[out] ipcs  Where to store newly created IPC server
 * \param[in]  cb    IPC callbacks
 *
 * \note This function exits fatally if unable to create the server.
 */
void
pcmk__serve_fenced_ipc(qb_ipcs_service_t **ipcs,
                       struct qb_ipcs_service_handlers *cb)
{
    *ipcs = mainloop_add_ipc_server_with_prio("stonith-ng", QB_IPC_NATIVE, cb,
                                              QB_LOOP_HIGH);

    if (*ipcs == NULL) {
        crm_err("Failed to create fencer: exiting and inhibiting respawn.");
        crm_warn("Verify pacemaker and pacemaker_remote are not both enabled.");
        crm_exit(CRM_EX_FATAL);
    }
}

/*!
 * \internal
 * \brief Add an IPC server to the main loop for the pacemakerd API
 *
 * \param[out] ipcs  Where to store newly created IPC server
 * \param[in]  cb    IPC callbacks
 *
 * \note This function exits with CRM_EX_OSERR if unable to create the server.
 */
void
pcmk__serve_pacemakerd_ipc(qb_ipcs_service_t **ipcs,
                           struct qb_ipcs_service_handlers *cb)
{
    *ipcs = mainloop_add_ipc_server(CRM_SYSTEM_MCP, QB_IPC_NATIVE, cb);

    if (*ipcs == NULL) {
        crm_err("Couldn't start pacemakerd IPC server");
        crm_warn("Verify pacemaker and pacemaker_remote are not both enabled.");
        /* Sub-daemons are observed by pacemakerd, so they exit CRM_EX_FATAL
         * when they want to keep pacemakerd from respawning them. pacemakerd
         * itself is observed by its init system (e.g. systemd), so we keep
         * the exit code that was used before this code moved here from
         * pacemakerd.c.
         */
        crm_exit(CRM_EX_OSERR);
    }
}

/*!
 * \internal
 * \brief Add an IPC server to the main loop for the scheduler API
 *
 * \param[in] cb  IPC callbacks
 *
 * \return Newly created IPC server, or NULL on failure
 */
qb_ipcs_service_t *
pcmk__serve_schedulerd_ipc(struct qb_ipcs_service_handlers *cb)
{
    return mainloop_add_ipc_server(CRM_SYSTEM_PENGINE, QB_IPC_NATIVE, cb);
}
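/*
 * Lifecycle sketch for the helpers above that return a server instead of
 * exiting (hedged illustration; "callbacks" is a placeholder, and a real
 * daemon registers handlers and runs a main loop in between):
 *
 *     qb_ipcs_service_t *ipcs = pcmk__serve_schedulerd_ipc(&callbacks);
 *
 *     if (ipcs == NULL) {
 *         crm_err("Failed to create scheduler IPC server");
 *         crm_exit(CRM_EX_FATAL);
 *     }
 *     // ... run main loop, dispatching client requests ...
 *     qb_ipcs_destroy(ipcs); // on shutdown
 */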