diff --git a/include/crm/common/mainloop.h b/include/crm/common/mainloop.h index ebdc6b092a..30bde33111 100644 --- a/include/crm/common/mainloop.h +++ b/include/crm/common/mainloop.h @@ -1,200 +1,201 @@ /* - * Copyright 2009-2024 the Pacemaker project contributors + * Copyright 2009-2025 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU Lesser General Public License * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. */ #ifndef PCMK__CRM_COMMON_MAINLOOP__H #define PCMK__CRM_COMMON_MAINLOOP__H #include // bool #include // sighandler_t #include // pid_t, ssize_t #include // gpointer, gboolean, guint, GSourceFunc, GMainLoop #include // qb_ipcs_service_t, etc. #include #ifdef __cplusplus extern "C" { #endif /** * \file * \brief Wrappers for and extensions to glib mainloop * \ingroup core */ enum mainloop_child_flags { /* don't kill pid group on timeout, only kill the pid */ mainloop_leave_pid_group = 0x01, }; // NOTE: sbd (as of at least 1.5.2) uses this typedef struct trigger_s crm_trigger_t; typedef struct mainloop_io_s mainloop_io_t; typedef struct mainloop_child_s mainloop_child_t; // NOTE: sbd (as of at least 1.5.2) uses this typedef struct mainloop_timer_s mainloop_timer_t; void mainloop_cleanup(void); // NOTE: sbd (as of at least 1.5.2) uses this crm_trigger_t *mainloop_add_trigger(int priority, int (*dispatch) (gpointer user_data), gpointer userdata); // NOTE: sbd (as of at least 1.5.2) uses this void mainloop_set_trigger(crm_trigger_t * source); void mainloop_trigger_complete(crm_trigger_t * trig); gboolean mainloop_destroy_trigger(crm_trigger_t * source); #ifndef HAVE_SIGHANDLER_T typedef void (*sighandler_t)(int); #endif sighandler_t crm_signal_handler(int sig, sighandler_t dispatch); // NOTE: sbd (as of at least 1.5.2) uses this gboolean mainloop_add_signal(int sig, void (*dispatch) (int sig)); gboolean mainloop_destroy_signal(int sig); bool mainloop_timer_running(mainloop_timer_t *t); // NOTE: sbd (as of at least 1.5.2) uses this void mainloop_timer_start(mainloop_timer_t *t); // NOTE: sbd (as of at least 1.5.2) uses this void mainloop_timer_stop(mainloop_timer_t *t); guint mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms); // NOTE: sbd (as of at least 1.5.2) uses this mainloop_timer_t *mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata); void mainloop_timer_del(mainloop_timer_t *t); struct ipc_client_callbacks { /*! * \brief Dispatch function for an IPC connection used as mainloop source * * \param[in] buffer Message read from IPC connection * \param[in] length Number of bytes in \p buffer * \param[in] userdata User data passed when creating mainloop source * * \return Negative value to remove source, anything else to keep it */ int (*dispatch) (const char *buffer, ssize_t length, gpointer userdata); /*! * \brief Destroy function for mainloop IPC connection client data * * \param[in,out] userdata User data passed when creating mainloop source */ void (*destroy) (gpointer userdata); }; qb_ipcs_service_t *mainloop_add_ipc_server(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks); /*! * \brief Start server-side API end-point, hooked into the internal event loop * * \param[in] name name of the IPC end-point ("address" for the client) * \param[in] type selects libqb's IPC back-end (or use #QB_IPC_NATIVE) * \param[in] callbacks defines libqb's IPC service-level handlers * \param[in] priority priority relative to other events handled in the * abstract handling loop, use #QB_LOOP_MED when unsure * * \return libqb's opaque handle to the created service abstraction * * \note For portability concerns, do not use this function if you keep * \p priority as #QB_LOOP_MED, stick with #mainloop_add_ipc_server * (with exactly such semantics) instead (once you link with this new * symbol employed, you can't downgrade the library freely anymore). * * \note The intended effect will only get fully reflected when run-time * linked to patched libqb: https://github.com/ClusterLabs/libqb/pull/352 */ qb_ipcs_service_t *mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks, enum qb_loop_priority prio); void mainloop_del_ipc_server(qb_ipcs_service_t * server); +// @COMPAT max_size parameter is deprecated and unused since 3.0.1 mainloop_io_t *mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks); void mainloop_del_ipc_client(mainloop_io_t * client); crm_ipc_t *mainloop_get_ipc_client(mainloop_io_t * client); struct mainloop_fd_callbacks { /*! * \brief Dispatch function for mainloop file descriptor with data ready * * \param[in,out] userdata User data passed when creating mainloop source * * \return Negative value to remove source, anything else to keep it */ int (*dispatch) (gpointer userdata); /*! * \brief Destroy function for mainloop file descriptor client data * * \param[in,out] userdata User data passed when creating mainloop source */ void (*destroy) (gpointer userdata); }; mainloop_io_t *mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks); void mainloop_del_fd(mainloop_io_t * client); /* * Create a new tracked process * To track a process group, use -pid */ void mainloop_child_add(pid_t pid, int timeout, const char *desc, void *userdata, void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode)); void mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *userdata, enum mainloop_child_flags, void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode)); void *mainloop_child_userdata(mainloop_child_t * child); int mainloop_child_timeout(mainloop_child_t * child); const char *mainloop_child_name(mainloop_child_t * child); pid_t mainloop_child_pid(mainloop_child_t * child); void mainloop_clear_child_userdata(mainloop_child_t * child); gboolean mainloop_child_kill(pid_t pid); void pcmk_quit_main_loop(GMainLoop *mloop, unsigned int n); void pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool (*check)(guint)); #define G_PRIORITY_MEDIUM (G_PRIORITY_HIGH/2) #ifdef __cplusplus } #endif #endif diff --git a/lib/cib/cib_native.c b/lib/cib/cib_native.c index 274c43b9d1..0182be4002 100644 --- a/lib/cib/cib_native.c +++ b/lib/cib/cib_native.c @@ -1,486 +1,486 @@ /* * Copyright 2004 International Business Machines - * Later changes copyright 2004-2024 the Pacemaker project contributors + * Later changes copyright 2004-2025 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU Lesser General Public License * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. */ #include #include #include #include #include #include #include #include #include #include #include #include #include typedef struct cib_native_opaque_s { char *token; crm_ipc_t *ipc; void (*dnotify_fn) (gpointer user_data); mainloop_io_t *source; } cib_native_opaque_t; static int cib_native_perform_op_delegate(cib_t *cib, const char *op, const char *host, const char *section, xmlNode *data, xmlNode **output_data, int call_options, const char *user_name) { int rc = pcmk_ok; int reply_id = 0; enum crm_ipc_flags ipc_flags = crm_ipc_flags_none; xmlNode *op_msg = NULL; xmlNode *op_reply = NULL; cib_native_opaque_t *native = cib->variant_opaque; if (cib->state == cib_disconnected) { return -ENOTCONN; } if (output_data != NULL) { *output_data = NULL; } if (op == NULL) { crm_err("No operation specified"); return -EINVAL; } if (call_options & cib_sync_call) { pcmk__set_ipc_flags(ipc_flags, "client", crm_ipc_client_response); } rc = cib__create_op(cib, op, host, section, data, call_options, user_name, NULL, &op_msg); if (rc != pcmk_ok) { return rc; } if (pcmk_is_set(call_options, cib_transaction)) { rc = cib__extend_transaction(cib, op_msg); goto done; } crm_trace("Sending %s message to the CIB manager (timeout=%ds)", op, cib->call_timeout); rc = crm_ipc_send(native->ipc, op_msg, ipc_flags, cib->call_timeout * 1000, &op_reply); if (rc < 0) { crm_err("Couldn't perform %s operation (timeout=%ds): %s (%d)", op, cib->call_timeout, pcmk_strerror(rc), rc); rc = -ECOMM; goto done; } crm_log_xml_trace(op_reply, "Reply"); if (!(call_options & cib_sync_call)) { crm_trace("Async call, returning %d", cib->call_id); CRM_CHECK(cib->call_id != 0, rc = -ENOMSG; goto done); rc = cib->call_id; goto done; } rc = pcmk_ok; crm_element_value_int(op_reply, PCMK__XA_CIB_CALLID, &reply_id); if (reply_id == cib->call_id) { xmlNode *wrapper = pcmk__xe_first_child(op_reply, PCMK__XE_CIB_CALLDATA, NULL, NULL); xmlNode *tmp = pcmk__xe_first_child(wrapper, NULL, NULL, NULL); crm_trace("Synchronous reply %d received", reply_id); if (crm_element_value_int(op_reply, PCMK__XA_CIB_RC, &rc) != 0) { rc = -EPROTO; } if (output_data == NULL || (call_options & cib_discard_reply)) { crm_trace("Discarding reply"); } else { *output_data = pcmk__xml_copy(NULL, tmp); } } else if (reply_id <= 0) { crm_err("Received bad reply: No id set"); crm_log_xml_err(op_reply, "Bad reply"); rc = -ENOMSG; goto done; } else { crm_err("Received bad reply: %d (wanted %d)", reply_id, cib->call_id); crm_log_xml_err(op_reply, "Old reply"); rc = -ENOMSG; goto done; } if (op_reply == NULL && cib->state == cib_disconnected) { rc = -ENOTCONN; } else if (rc == pcmk_ok && op_reply == NULL) { rc = -ETIME; } switch (rc) { case pcmk_ok: case -EPERM: break; /* This is an internal value that clients do not and should not care about */ case -pcmk_err_diff_resync: rc = pcmk_ok; break; /* These indicate internal problems */ case -EPROTO: case -ENOMSG: crm_err("Call failed: %s", pcmk_strerror(rc)); if (op_reply) { crm_log_xml_err(op_reply, "Invalid reply"); } break; default: if (!pcmk__str_eq(op, PCMK__CIB_REQUEST_QUERY, pcmk__str_none)) { crm_warn("Call failed: %s", pcmk_strerror(rc)); } } done: if (!crm_ipc_connected(native->ipc)) { crm_err("The CIB manager disconnected"); cib->state = cib_disconnected; } pcmk__xml_free(op_msg); pcmk__xml_free(op_reply); return rc; } static int cib_native_dispatch_internal(const char *buffer, ssize_t length, gpointer userdata) { const char *type = NULL; xmlNode *msg = NULL; cib_t *cib = userdata; crm_trace("dispatching %p", userdata); if (cib == NULL) { crm_err("No CIB!"); return 0; } msg = pcmk__xml_parse(buffer); if (msg == NULL) { crm_warn("Received a NULL message from the CIB manager"); return 0; } /* do callbacks */ type = crm_element_value(msg, PCMK__XA_T); crm_trace("Activating %s callbacks...", type); crm_log_xml_explicit(msg, "cib-reply"); if (pcmk__str_eq(type, PCMK__VALUE_CIB, pcmk__str_none)) { cib_native_callback(cib, msg, 0, 0); } else if (pcmk__str_eq(type, PCMK__VALUE_CIB_NOTIFY, pcmk__str_none)) { g_list_foreach(cib->notify_list, cib_native_notify, msg); } else { crm_err("Unknown message type: %s", type); } pcmk__xml_free(msg); return 0; } static void cib_native_destroy(void *userdata) { cib_t *cib = userdata; cib_native_opaque_t *native = cib->variant_opaque; crm_trace("destroying %p", userdata); cib->state = cib_disconnected; native->source = NULL; native->ipc = NULL; if (native->dnotify_fn) { native->dnotify_fn(userdata); } } static int cib_native_signoff(cib_t *cib) { cib_native_opaque_t *native = cib->variant_opaque; crm_debug("Disconnecting from the CIB manager"); cib_free_notify(cib); remove_cib_op_callback(0, TRUE); if (native->source != NULL) { /* Attached to mainloop */ mainloop_del_ipc_client(native->source); native->source = NULL; native->ipc = NULL; } else if (native->ipc) { /* Not attached to mainloop */ crm_ipc_t *ipc = native->ipc; native->ipc = NULL; crm_ipc_close(ipc); crm_ipc_destroy(ipc); } cib->cmds->end_transaction(cib, false, cib_none); cib->state = cib_disconnected; cib->type = cib_no_connection; return pcmk_ok; } static int cib_native_signon(cib_t *cib, const char *name, enum cib_conn_type type) { int rc = pcmk_ok; const char *channel = NULL; cib_native_opaque_t *native = cib->variant_opaque; xmlNode *hello = NULL; struct ipc_client_callbacks cib_callbacks = { .dispatch = cib_native_dispatch_internal, .destroy = cib_native_destroy }; if (name == NULL) { name = pcmk__s(crm_system_name, "client"); } cib->call_timeout = PCMK__IPC_TIMEOUT; if (type == cib_command) { cib->state = cib_connected_command; channel = PCMK__SERVER_BASED_RW; } else if (type == cib_command_nonblocking) { cib->state = cib_connected_command; channel = PCMK__SERVER_BASED_SHM; } else if (type == cib_query) { cib->state = cib_connected_query; channel = PCMK__SERVER_BASED_RO; } else { return -ENOTCONN; } crm_trace("Connecting %s channel", channel); - native->source = mainloop_add_ipc_client(channel, G_PRIORITY_HIGH, - 512 * 1024, cib, &cib_callbacks); + native->source = mainloop_add_ipc_client(channel, G_PRIORITY_HIGH, 0, cib, + &cib_callbacks); native->ipc = mainloop_get_ipc_client(native->source); if (rc != pcmk_ok || native->ipc == NULL || !crm_ipc_connected(native->ipc)) { crm_info("Could not connect to CIB manager for %s", name); rc = -ENOTCONN; } if (rc == pcmk_ok) { rc = cib__create_op(cib, CRM_OP_REGISTER, NULL, NULL, NULL, cib_sync_call, NULL, name, &hello); } if (rc == pcmk_ok) { xmlNode *reply = NULL; if (crm_ipc_send(native->ipc, hello, crm_ipc_client_response, -1, &reply) > 0) { const char *msg_type = crm_element_value(reply, PCMK__XA_CIB_OP); crm_log_xml_trace(reply, "reg-reply"); if (!pcmk__str_eq(msg_type, CRM_OP_REGISTER, pcmk__str_casei)) { crm_info("Reply to CIB registration message has unknown type " "'%s'", msg_type); rc = -EPROTO; } else { native->token = crm_element_value_copy(reply, PCMK__XA_CIB_CLIENTID); if (native->token == NULL) { rc = -EPROTO; } } pcmk__xml_free(reply); } else { rc = -ECOMM; } pcmk__xml_free(hello); } if (rc == pcmk_ok) { crm_info("Successfully connected to CIB manager for %s", name); return pcmk_ok; } crm_info("Connection to CIB manager for %s failed: %s", name, pcmk_strerror(rc)); cib_native_signoff(cib); return rc; } static int cib_native_free(cib_t *cib) { int rc = pcmk_ok; if (cib->state != cib_disconnected) { rc = cib_native_signoff(cib); } if (cib->state == cib_disconnected) { cib_native_opaque_t *native = cib->variant_opaque; free(native->token); free(cib->variant_opaque); free(cib->cmds); free(cib->user); free(cib); } return rc; } static int cib_native_register_notification(cib_t *cib, const char *callback, int enabled) { int rc = pcmk_ok; xmlNode *notify_msg = pcmk__xe_create(NULL, PCMK__XE_CIB_CALLBACK); cib_native_opaque_t *native = cib->variant_opaque; if (cib->state != cib_disconnected) { crm_xml_add(notify_msg, PCMK__XA_CIB_OP, PCMK__VALUE_CIB_NOTIFY); crm_xml_add(notify_msg, PCMK__XA_CIB_NOTIFY_TYPE, callback); crm_xml_add_int(notify_msg, PCMK__XA_CIB_NOTIFY_ACTIVATE, enabled); rc = crm_ipc_send(native->ipc, notify_msg, crm_ipc_client_response, 1000 * cib->call_timeout, NULL); if (rc <= 0) { crm_trace("Notification not registered: %d", rc); rc = -ECOMM; } } pcmk__xml_free(notify_msg); return rc; } static int cib_native_set_connection_dnotify(cib_t *cib, void (*dnotify) (gpointer user_data)) { cib_native_opaque_t *native = NULL; if (cib == NULL) { crm_err("No CIB!"); return FALSE; } native = cib->variant_opaque; native->dnotify_fn = dnotify; return pcmk_ok; } /*! * \internal * \brief Get the given CIB connection's unique client identifier * * These can be used to check whether this client requested the action that * triggered a CIB notification. * * \param[in] cib CIB connection * \param[out] async_id If not \p NULL, where to store asynchronous client ID * \param[out] sync_id If not \p NULL, where to store synchronous client ID * * \return Legacy Pacemaker return code (specifically, \p pcmk_ok) * * \note This is the \p cib_native variant implementation of * \p cib_api_operations_t:client_id(). * \note For \p cib_native objects, \p async_id and \p sync_id are the same. * \note The client ID is assigned during CIB sign-on. */ static int cib_native_client_id(const cib_t *cib, const char **async_id, const char **sync_id) { cib_native_opaque_t *native = cib->variant_opaque; if (async_id != NULL) { *async_id = native->token; } if (sync_id != NULL) { *sync_id = native->token; } return pcmk_ok; } cib_t * cib_native_new(void) { cib_native_opaque_t *native = NULL; cib_t *cib = cib_new_variant(); if (cib == NULL) { return NULL; } native = calloc(1, sizeof(cib_native_opaque_t)); if (native == NULL) { free(cib); return NULL; } cib->variant = cib_native; cib->variant_opaque = native; native->ipc = NULL; native->source = NULL; native->dnotify_fn = NULL; /* assign variant specific ops */ cib->delegate_fn = cib_native_perform_op_delegate; cib->cmds->signon = cib_native_signon; cib->cmds->signoff = cib_native_signoff; cib->cmds->free = cib_native_free; cib->cmds->register_notification = cib_native_register_notification; cib->cmds->set_connection_dnotify = cib_native_set_connection_dnotify; cib->cmds->client_id = cib_native_client_id; return cib; }