diff --git a/include/crm/common/mainloop.h b/include/crm/common/mainloop.h index 85da1cd661..2cfb63e84f 100644 --- a/include/crm/common/mainloop.h +++ b/include/crm/common/mainloop.h @@ -1,135 +1,159 @@ /* * Copyright 2009-2019 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU Lesser General Public License * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. */ #ifndef CRM_COMMON_MAINLOOP__H # define CRM_COMMON_MAINLOOP__H #ifdef __cplusplus extern "C" { #endif /** * \file * \brief Wrappers for and extensions to glib mainloop * \ingroup core */ # include // sighandler_t # include enum mainloop_child_flags { /* don't kill pid group on timeout, only kill the pid */ mainloop_leave_pid_group = 0x01, }; typedef struct trigger_s crm_trigger_t; typedef struct mainloop_io_s mainloop_io_t; typedef struct mainloop_child_s mainloop_child_t; typedef struct mainloop_timer_s mainloop_timer_t; void mainloop_cleanup(void); crm_trigger_t *mainloop_add_trigger(int priority, int (*dispatch) (gpointer user_data), gpointer userdata); void mainloop_set_trigger(crm_trigger_t * source); void mainloop_trigger_complete(crm_trigger_t * trig); gboolean mainloop_destroy_trigger(crm_trigger_t * source); # ifndef HAVE_SIGHANDLER_T typedef void (*sighandler_t)(int); # endif sighandler_t crm_signal_handler(int sig, sighandler_t dispatch); gboolean crm_signal(int sig, void (*dispatch) (int sig)); // deprecated gboolean mainloop_add_signal(int sig, void (*dispatch) (int sig)); gboolean mainloop_destroy_signal(int sig); bool mainloop_timer_running(mainloop_timer_t *t); void mainloop_timer_start(mainloop_timer_t *t); void mainloop_timer_stop(mainloop_timer_t *t); guint mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms); mainloop_timer_t *mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata); void mainloop_timer_del(mainloop_timer_t *t); # include # include struct ipc_client_callbacks { int (*dispatch) (const char *buffer, ssize_t length, gpointer userdata); void (*destroy) (gpointer); }; qb_ipcs_service_t *mainloop_add_ipc_server(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks); +/*! + * \brief Start server-side API end-point, hooked into the internal event loop + * + * \param[in] name name of the IPC end-point ("address" for the client) + * \param[in] type selects libqb's IPC back-end (or use #QB_IPC_NATIVE) + * \param[in] callbacks defines libqb's IPC service-level handlers + * \param[in] priority priority relative to other events handled in the + * abstract handling loop, use #QB_LOOP_MED when unsure + * + * \return libqb's opaque handle to the created service abstraction + * + * \note For portability concerns, do not use this function if you keep + * \p priority as #QB_LOOP_MED, stick with #mainloop_add_ipc_server + * (with exactly such semantics) instead (once you link with this new + * symbol employed, you can't downgrade the library freely anymore). + * + * \note The intended effect will only get fully reflected when run-time + * linked to patched libqb: https://github.com/ClusterLabs/libqb/pull/352 + */ +qb_ipcs_service_t *mainloop_add_ipc_server_with_prio(const char *name, + enum qb_ipc_type type, + struct qb_ipcs_service_handlers *callbacks, + enum qb_loop_priority prio); + void mainloop_del_ipc_server(qb_ipcs_service_t * server); mainloop_io_t *mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks); void mainloop_del_ipc_client(mainloop_io_t * client); crm_ipc_t *mainloop_get_ipc_client(mainloop_io_t * client); struct mainloop_fd_callbacks { int (*dispatch) (gpointer userdata); void (*destroy) (gpointer userdata); }; mainloop_io_t *mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks); void mainloop_del_fd(mainloop_io_t * client); /* * Create a new tracked process * To track a process group, use -pid */ void mainloop_child_add(pid_t pid, int timeout, const char *desc, void *userdata, void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode)); void mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *userdata, enum mainloop_child_flags, void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode)); void *mainloop_child_userdata(mainloop_child_t * child); int mainloop_child_timeout(mainloop_child_t * child); const char *mainloop_child_name(mainloop_child_t * child); pid_t mainloop_child_pid(mainloop_child_t * child); void mainloop_clear_child_userdata(mainloop_child_t * child); gboolean mainloop_child_kill(pid_t pid); void pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool (*check)(guint)); # define G_PRIORITY_MEDIUM (G_PRIORITY_HIGH/2) #ifdef __cplusplus } #endif #endif diff --git a/lib/common/mainloop.c b/lib/common/mainloop.c index 18f7014af1..17e69f0a87 100644 --- a/lib/common/mainloop.c +++ b/lib/common/mainloop.c @@ -1,1351 +1,1427 @@ /* * Copyright 2004-2019 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU Lesser General Public License * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. */ #include #ifndef _GNU_SOURCE # define _GNU_SOURCE #endif #include #include #include #include #include #include #include #include #include struct mainloop_child_s { pid_t pid; char *desc; unsigned timerid; gboolean timeout; void *privatedata; enum mainloop_child_flags flags; /* Called when a process dies */ void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode); }; struct trigger_s { GSource source; gboolean running; gboolean trigger; void *user_data; guint id; }; static gboolean crm_trigger_prepare(GSource * source, gint * timeout) { crm_trigger_t *trig = (crm_trigger_t *) source; /* cluster-glue's FD and IPC related sources make use of * g_source_add_poll() but do not set a timeout in their prepare * functions * * This means mainloop's poll() will block until an event for one * of these sources occurs - any /other/ type of source, such as * this one or g_idle_*, that doesn't use g_source_add_poll() is * S-O-L and won't be processed until there is something fd-based * happens. * * Luckily the timeout we can set here affects all sources and * puts an upper limit on how long poll() can take. * * So unconditionally set a small-ish timeout, not too small that * we're in constant motion, which will act as an upper bound on * how long the signal handling might be delayed for. */ *timeout = 500; /* Timeout in ms */ return trig->trigger; } static gboolean crm_trigger_check(GSource * source) { crm_trigger_t *trig = (crm_trigger_t *) source; return trig->trigger; } static gboolean crm_trigger_dispatch(GSource * source, GSourceFunc callback, gpointer userdata) { int rc = TRUE; crm_trigger_t *trig = (crm_trigger_t *) source; if (trig->running) { /* Wait until the existing job is complete before starting the next one */ return TRUE; } trig->trigger = FALSE; if (callback) { rc = callback(trig->user_data); if (rc < 0) { crm_trace("Trigger handler %p not yet complete", trig); trig->running = TRUE; rc = TRUE; } } return rc; } static void crm_trigger_finalize(GSource * source) { crm_trace("Trigger %p destroyed", source); } #if 0 struct _GSourceCopy { gpointer callback_data; GSourceCallbackFuncs *callback_funcs; const GSourceFuncs *source_funcs; guint ref_count; GMainContext *context; gint priority; guint flags; guint source_id; GSList *poll_fds; GSource *prev; GSource *next; char *name; void *priv; }; static int g_source_refcount(GSource * source) { /* Duplicating the contents of private header files is a necessary evil */ if (source) { struct _GSourceCopy *evil = (struct _GSourceCopy*)source; return evil->ref_count; } return 0; } #else static int g_source_refcount(GSource * source) { return 0; } #endif static GSourceFuncs crm_trigger_funcs = { crm_trigger_prepare, crm_trigger_check, crm_trigger_dispatch, crm_trigger_finalize, }; static crm_trigger_t * mainloop_setup_trigger(GSource * source, int priority, int (*dispatch) (gpointer user_data), gpointer userdata) { crm_trigger_t *trigger = NULL; trigger = (crm_trigger_t *) source; trigger->id = 0; trigger->trigger = FALSE; trigger->user_data = userdata; if (dispatch) { g_source_set_callback(source, dispatch, trigger, NULL); } g_source_set_priority(source, priority); g_source_set_can_recurse(source, FALSE); crm_trace("Setup %p with ref-count=%u", source, g_source_refcount(source)); trigger->id = g_source_attach(source, NULL); crm_trace("Attached %p with ref-count=%u", source, g_source_refcount(source)); return trigger; } void mainloop_trigger_complete(crm_trigger_t * trig) { crm_trace("Trigger handler %p complete", trig); trig->running = FALSE; } /* If dispatch returns: * -1: Job running but not complete * 0: Remove the trigger from mainloop * 1: Leave the trigger in mainloop */ crm_trigger_t * mainloop_add_trigger(int priority, int (*dispatch) (gpointer user_data), gpointer userdata) { GSource *source = NULL; CRM_ASSERT(sizeof(crm_trigger_t) > sizeof(GSource)); source = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t)); CRM_ASSERT(source != NULL); return mainloop_setup_trigger(source, priority, dispatch, userdata); } void mainloop_set_trigger(crm_trigger_t * source) { if(source) { source->trigger = TRUE; } } gboolean mainloop_destroy_trigger(crm_trigger_t * source) { GSource *gs = NULL; if(source == NULL) { return TRUE; } gs = (GSource *)source; if(g_source_refcount(gs) > 2) { crm_info("Trigger %p is still referenced %u times", gs, g_source_refcount(gs)); } g_source_destroy(gs); /* Remove from mainloop, ref_count-- */ g_source_unref(gs); /* The caller no longer carries a reference to source * * At this point the source should be free'd, * unless we're currently processing said * source, in which case mainloop holds an * additional reference and it will be free'd * once our processing completes */ return TRUE; } // Define a custom glib source for signal handling // Data structure for custom glib source typedef struct signal_s { crm_trigger_t trigger; // trigger that invoked source (must be first) void (*handler) (int sig); // signal handler int signal; // signal that was received } crm_signal_t; // Table to associate signal handlers with signal numbers static crm_signal_t *crm_signals[NSIG]; /*! * \internal * \brief Dispatch an event from custom glib source for signals * * Given an signal event, clear the event trigger and call any registered * signal handler. * * \param[in] source glib source that triggered this dispatch * \param[in] callback (ignored) * \param[in] userdata (ignored) */ static gboolean crm_signal_dispatch(GSource * source, GSourceFunc callback, gpointer userdata) { crm_signal_t *sig = (crm_signal_t *) source; if(sig->signal != SIGCHLD) { crm_notice("Caught '%s' signal "CRM_XS" %d (%s handler)", strsignal(sig->signal), sig->signal, (sig->handler? "invoking" : "no")); } sig->trigger.trigger = FALSE; if (sig->handler) { sig->handler(sig->signal); } return TRUE; } /*! * \internal * \brief Handle a signal by setting a trigger for signal source * * \param[in] sig Signal number that was received * * \note This is the true signal handler for the mainloop signal source, and * must be async-safe. */ static void mainloop_signal_handler(int sig) { if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) { mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]); } } // Functions implementing our custom glib source for signal handling static GSourceFuncs crm_signal_funcs = { crm_trigger_prepare, crm_trigger_check, crm_signal_dispatch, crm_trigger_finalize, }; /*! * \internal * \brief Set a true signal handler * * signal()-like interface to sigaction() * * \param[in] sig Signal number to register handler for * \param[in] dispatch Signal handler * * \return The previous value of the signal handler, or SIG_ERR on error * \note The dispatch function must be async-safe. */ sighandler_t crm_signal_handler(int sig, sighandler_t dispatch) { sigset_t mask; struct sigaction sa; struct sigaction old; if (sigemptyset(&mask) < 0) { crm_err("Could not set handler for signal %d: %s", sig, pcmk_strerror(errno)); return SIG_ERR; } memset(&sa, 0, sizeof(struct sigaction)); sa.sa_handler = dispatch; sa.sa_flags = SA_RESTART; sa.sa_mask = mask; if (sigaction(sig, &sa, &old) < 0) { crm_err("Could not set handler for signal %d: %s", sig, pcmk_strerror(errno)); return SIG_ERR; } return old.sa_handler; } /* * \brief Use crm_signal_handler() instead * \deprecated */ gboolean crm_signal(int sig, void (*dispatch) (int sig)) { return crm_signal_handler(sig, dispatch) != SIG_ERR; } static void mainloop_destroy_signal_entry(int sig) { crm_signal_t *tmp = crm_signals[sig]; crm_signals[sig] = NULL; crm_trace("Destroying signal %d", sig); mainloop_destroy_trigger((crm_trigger_t *) tmp); } /*! * \internal * \brief Add a signal handler to a mainloop * * \param[in] sig Signal number to handle * \param[in] dispatch Signal handler function * * \note The true signal handler merely sets a mainloop trigger to call this * dispatch function via the mainloop. Therefore, the dispatch function * does not need to be async-safe. */ gboolean mainloop_add_signal(int sig, void (*dispatch) (int sig)) { GSource *source = NULL; int priority = G_PRIORITY_HIGH - 1; if (sig == SIGTERM) { /* TERM is higher priority than other signals, * signals are higher priority than other ipc. * Yes, minus: smaller is "higher" */ priority--; } if (sig >= NSIG || sig < 0) { crm_err("Signal %d is out of range", sig); return FALSE; } else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) { crm_trace("Signal handler for %d is already installed", sig); return TRUE; } else if (crm_signals[sig] != NULL) { crm_err("Different signal handler for %d is already installed", sig); return FALSE; } CRM_ASSERT(sizeof(crm_signal_t) > sizeof(GSource)); source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t)); crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL); CRM_ASSERT(crm_signals[sig] != NULL); crm_signals[sig]->handler = dispatch; crm_signals[sig]->signal = sig; if (crm_signal_handler(sig, mainloop_signal_handler) == SIG_ERR) { mainloop_destroy_signal_entry(sig); return FALSE; } #if 0 /* If we want signals to interrupt mainloop's poll(), instead of waiting for * the timeout, then we should call siginterrupt() below * * For now, just enforce a low timeout */ if (siginterrupt(sig, 1) < 0) { crm_perror(LOG_INFO, "Could not enable system call interruptions for signal %d", sig); } #endif return TRUE; } gboolean mainloop_destroy_signal(int sig) { if (sig >= NSIG || sig < 0) { crm_err("Signal %d is out of range", sig); return FALSE; } else if (crm_signal_handler(sig, NULL) == SIG_ERR) { crm_perror(LOG_ERR, "Could not uninstall signal handler for signal %d", sig); return FALSE; } else if (crm_signals[sig] == NULL) { return TRUE; } mainloop_destroy_signal_entry(sig); return TRUE; } static qb_array_t *gio_map = NULL; void mainloop_cleanup(void) { if (gio_map) { qb_array_free(gio_map); } for (int sig = 0; sig < NSIG; ++sig) { mainloop_destroy_signal_entry(sig); } } /* * libqb... */ struct gio_to_qb_poll { int32_t is_used; guint source; int32_t events; void *data; qb_ipcs_dispatch_fn_t fn; enum qb_loop_priority p; }; static gboolean gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data) { struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data; gint fd = g_io_channel_unix_get_fd(gio); crm_trace("%p.%d %d", data, fd, condition); /* if this assert get's hit, then there is a race condition between * when we destroy a fd and when mainloop actually gives it up */ CRM_ASSERT(adaptor->is_used > 0); return (adaptor->fn(fd, condition, adaptor->data) == 0); } static void gio_poll_destroy(gpointer data) { struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data; adaptor->is_used--; CRM_ASSERT(adaptor->is_used >= 0); if (adaptor->is_used == 0) { crm_trace("Marking adaptor %p unused", adaptor); adaptor->source = 0; } } +/*! + * \internal + * \brief Convert libqb's poll priority into GLib's one + * + * \param[in] prio libqb's poll priority (#QB_LOOP_MED assumed as fallback) + * + * \return best matching GLib's priority + */ +static gint +conv_prio_libqb2glib(enum qb_loop_priority prio) +{ + gint ret = G_PRIORITY_DEFAULT; + switch (prio) { + case QB_LOOP_LOW: + ret = G_PRIORITY_LOW; + break; + case QB_LOOP_HIGH: + ret = G_PRIORITY_HIGH; + break; + default: + crm_trace("Invalid libqb's loop priority %d, assuming QB_LOOP_MED", + prio); + /* fall-through */ + case QB_LOOP_MED: + break; + } + return ret; +} + +/*! + * \internal + * \brief Convert libqb's poll priority to rate limiting spec + * + * \param[in] prio libqb's poll priority (#QB_LOOP_MED assumed as fallback) + * + * \return best matching rate limiting spec + */ +static enum qb_ipcs_rate_limit +conv_libqb_prio2ratelimit(enum qb_loop_priority prio) +{ + /* this is an inversion of what libqb's qb_ipcs_request_rate_limit does */ + enum qb_ipcs_rate_limit ret = QB_IPCS_RATE_NORMAL; + switch (prio) { + case QB_LOOP_LOW: + ret = QB_IPCS_RATE_SLOW; + break; + case QB_LOOP_HIGH: + ret = QB_IPCS_RATE_FAST; + break; + default: + crm_trace("Invalid libqb's loop priority %d, assuming QB_LOOP_MED", + prio); + /* fall-through */ + case QB_LOOP_MED: + break; + } + return ret; +} + static int32_t gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn, int32_t add) { struct gio_to_qb_poll *adaptor; GIOChannel *channel; int32_t res = 0; res = qb_array_index(gio_map, fd, (void **)&adaptor); if (res < 0) { crm_err("Array lookup failed for fd=%d: %d", fd, res); return res; } crm_trace("Adding fd=%d to mainloop as adaptor %p", fd, adaptor); if (add && adaptor->source) { crm_err("Adaptor for descriptor %d is still in-use", fd); return -EEXIST; } if (!add && !adaptor->is_used) { crm_err("Adaptor for descriptor %d is not in-use", fd); return -ENOENT; } /* channel is created with ref_count = 1 */ channel = g_io_channel_unix_new(fd); if (!channel) { crm_err("No memory left to add fd=%d", fd); return -ENOMEM; } if (adaptor->source) { g_source_remove(adaptor->source); adaptor->source = 0; } /* Because unlike the poll() API, glib doesn't tell us about HUPs by default */ evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR); adaptor->fn = fn; adaptor->events = evts; adaptor->data = data; adaptor->p = p; adaptor->is_used++; adaptor->source = - g_io_add_watch_full(channel, G_PRIORITY_DEFAULT, evts, gio_read_socket, adaptor, - gio_poll_destroy); + g_io_add_watch_full(channel, conv_prio_libqb2glib(p), evts, + gio_read_socket, adaptor, gio_poll_destroy); /* Now that mainloop now holds a reference to channel, * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new(). * * This means that channel will be free'd by: * g_main_context_dispatch() * -> g_source_destroy_internal() * -> g_source_callback_unref() * shortly after gio_poll_destroy() completes */ g_io_channel_unref(channel); crm_trace("Added to mainloop with gsource id=%d", adaptor->source); if (adaptor->source > 0) { return 0; } return -EINVAL; } static int32_t gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn) { return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_TRUE); } static int32_t gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn) { return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_FALSE); } static int32_t gio_poll_dispatch_del(int32_t fd) { struct gio_to_qb_poll *adaptor; crm_trace("Looking for fd=%d", fd); if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) { if (adaptor->source) { g_source_remove(adaptor->source); adaptor->source = 0; } } return 0; } struct qb_ipcs_poll_handlers gio_poll_funcs = { .job_add = NULL, .dispatch_add = gio_poll_dispatch_add, .dispatch_mod = gio_poll_dispatch_mod, .dispatch_del = gio_poll_dispatch_del, }; static enum qb_ipc_type pick_ipc_type(enum qb_ipc_type requested) { const char *env = getenv("PCMK_ipc_type"); if (env && strcmp("shared-mem", env) == 0) { return QB_IPC_SHM; } else if (env && strcmp("socket", env) == 0) { return QB_IPC_SOCKET; } else if (env && strcmp("posix", env) == 0) { return QB_IPC_POSIX_MQ; } else if (env && strcmp("sysv", env) == 0) { return QB_IPC_SYSV_MQ; } else if (requested == QB_IPC_NATIVE) { /* We prefer shared memory because the server never blocks on * send. If part of a message fits into the socket, libqb * needs to block until the remainder can be sent also. * Otherwise the client will wait forever for the remaining * bytes. */ return QB_IPC_SHM; } return requested; } qb_ipcs_service_t * mainloop_add_ipc_server(const char *name, enum qb_ipc_type type, - struct qb_ipcs_service_handlers * callbacks) + struct qb_ipcs_service_handlers *callbacks) +{ + return mainloop_add_ipc_server_with_prio(name, type, callbacks, QB_LOOP_MED); +} + +qb_ipcs_service_t * +mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type, + struct qb_ipcs_service_handlers *callbacks, + enum qb_loop_priority prio) { int rc = 0; qb_ipcs_service_t *server = NULL; if (gio_map == NULL) { gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1); } crm_client_init(); server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks); + if (server == NULL) { + crm_err("Could not create %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc); + return NULL; + } + + if (prio != QB_LOOP_MED) { + qb_ipcs_request_rate_limit(server, conv_libqb_prio2ratelimit(prio)); + } + #ifdef HAVE_IPCS_GET_BUFFER_SIZE /* All clients should use at least ipc_buffer_max as their buffer size */ qb_ipcs_enforce_buffer_size(server, crm_ipc_default_buffer_size()); #endif qb_ipcs_poll_handlers_set(server, &gio_poll_funcs); rc = qb_ipcs_run(server); if (rc < 0) { crm_err("Could not start %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc); return NULL; } return server; } void mainloop_del_ipc_server(qb_ipcs_service_t * server) { if (server) { qb_ipcs_destroy(server); } } struct mainloop_io_s { char *name; void *userdata; int fd; guint source; crm_ipc_t *ipc; GIOChannel *channel; int (*dispatch_fn_ipc) (const char *buffer, ssize_t length, gpointer userdata); int (*dispatch_fn_io) (gpointer userdata); void (*destroy_fn) (gpointer userdata); }; static gboolean mainloop_gio_callback(GIOChannel * gio, GIOCondition condition, gpointer data) { gboolean keep = TRUE; mainloop_io_t *client = data; CRM_ASSERT(client->fd == g_io_channel_unix_get_fd(gio)); if (condition & G_IO_IN) { if (client->ipc) { long rc = 0; int max = 10; do { rc = crm_ipc_read(client->ipc); if (rc <= 0) { crm_trace("Message acquisition from %s[%p] failed: %s (%ld)", client->name, client, pcmk_strerror(rc), rc); } else if (client->dispatch_fn_ipc) { const char *buffer = crm_ipc_buffer(client->ipc); crm_trace("New message from %s[%p] = %ld (I/O condition=%d)", client->name, client, rc, condition); if (client->dispatch_fn_ipc(buffer, rc, client->userdata) < 0) { crm_trace("Connection to %s no longer required", client->name); keep = FALSE; } } } while (keep && rc > 0 && --max > 0); } else { crm_trace("New message from %s[%p] %u", client->name, client, condition); if (client->dispatch_fn_io) { if (client->dispatch_fn_io(client->userdata) < 0) { crm_trace("Connection to %s no longer required", client->name); keep = FALSE; } } } } if (client->ipc && crm_ipc_connected(client->ipc) == FALSE) { crm_err("Connection to %s closed " CRM_XS "client=%p condition=%d", client->name, client, condition); keep = FALSE; } else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) { crm_trace("The connection %s[%p] has been closed (I/O condition=%d)", client->name, client, condition); keep = FALSE; } else if ((condition & G_IO_IN) == 0) { /* #define GLIB_SYSDEF_POLLIN =1 #define GLIB_SYSDEF_POLLPRI =2 #define GLIB_SYSDEF_POLLOUT =4 #define GLIB_SYSDEF_POLLERR =8 #define GLIB_SYSDEF_POLLHUP =16 #define GLIB_SYSDEF_POLLNVAL =32 typedef enum { G_IO_IN GLIB_SYSDEF_POLLIN, G_IO_OUT GLIB_SYSDEF_POLLOUT, G_IO_PRI GLIB_SYSDEF_POLLPRI, G_IO_ERR GLIB_SYSDEF_POLLERR, G_IO_HUP GLIB_SYSDEF_POLLHUP, G_IO_NVAL GLIB_SYSDEF_POLLNVAL } GIOCondition; A bitwise combination representing a condition to watch for on an event source. G_IO_IN There is data to read. G_IO_OUT Data can be written (without blocking). G_IO_PRI There is urgent data to read. G_IO_ERR Error condition. G_IO_HUP Hung up (the connection has been broken, usually for pipes and sockets). G_IO_NVAL Invalid request. The file descriptor is not open. */ crm_err("Strange condition: %d", condition); } /* keep == FALSE results in mainloop_gio_destroy() being called * just before the source is removed from mainloop */ return keep; } static void mainloop_gio_destroy(gpointer c) { mainloop_io_t *client = c; char *c_name = strdup(client->name); /* client->source is valid but about to be destroyed (ref_count == 0) in gmain.c * client->channel will still have ref_count > 0... should be == 1 */ crm_trace("Destroying client %s[%p]", c_name, c); if (client->ipc) { crm_ipc_close(client->ipc); } if (client->destroy_fn) { void (*destroy_fn) (gpointer userdata) = client->destroy_fn; client->destroy_fn = NULL; destroy_fn(client->userdata); } if (client->ipc) { crm_ipc_t *ipc = client->ipc; client->ipc = NULL; crm_ipc_destroy(ipc); } crm_trace("Destroyed client %s[%p]", c_name, c); free(client->name); client->name = NULL; free(client); free(c_name); } mainloop_io_t * mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks) { mainloop_io_t *client = NULL; crm_ipc_t *conn = crm_ipc_new(name, max_size); if (conn && crm_ipc_connect(conn)) { int32_t fd = crm_ipc_get_fd(conn); client = mainloop_add_fd(name, priority, fd, userdata, NULL); } if (client == NULL) { crm_perror(LOG_TRACE, "Connection to %s failed", name); if (conn) { crm_ipc_close(conn); crm_ipc_destroy(conn); } return NULL; } client->ipc = conn; client->destroy_fn = callbacks->destroy; client->dispatch_fn_ipc = callbacks->dispatch; return client; } void mainloop_del_ipc_client(mainloop_io_t * client) { mainloop_del_fd(client); } crm_ipc_t * mainloop_get_ipc_client(mainloop_io_t * client) { if (client) { return client->ipc; } return NULL; } mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks * callbacks) { mainloop_io_t *client = NULL; if (fd >= 0) { client = calloc(1, sizeof(mainloop_io_t)); if (client == NULL) { return NULL; } client->name = strdup(name); client->userdata = userdata; if (callbacks) { client->destroy_fn = callbacks->destroy; client->dispatch_fn_io = callbacks->dispatch; } client->fd = fd; client->channel = g_io_channel_unix_new(fd); client->source = g_io_add_watch_full(client->channel, priority, (G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback, client, mainloop_gio_destroy); /* Now that mainloop now holds a reference to channel, * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new(). * * This means that channel will be free'd by: * g_main_context_dispatch() or g_source_remove() * -> g_source_destroy_internal() * -> g_source_callback_unref() * shortly after mainloop_gio_destroy() completes */ g_io_channel_unref(client->channel); crm_trace("Added connection %d for %s[%p].%d", client->source, client->name, client, fd); } else { errno = EINVAL; } return client; } void mainloop_del_fd(mainloop_io_t * client) { if (client != NULL) { crm_trace("Removing client %s[%p]", client->name, client); if (client->source) { /* Results in mainloop_gio_destroy() being called just * before the source is removed from mainloop */ g_source_remove(client->source); } } } static GListPtr child_list = NULL; pid_t mainloop_child_pid(mainloop_child_t * child) { return child->pid; } const char * mainloop_child_name(mainloop_child_t * child) { return child->desc; } int mainloop_child_timeout(mainloop_child_t * child) { return child->timeout; } void * mainloop_child_userdata(mainloop_child_t * child) { return child->privatedata; } void mainloop_clear_child_userdata(mainloop_child_t * child) { child->privatedata = NULL; } /* good function name */ static void child_free(mainloop_child_t *child) { if (child->timerid != 0) { crm_trace("Removing timer %d", child->timerid); g_source_remove(child->timerid); child->timerid = 0; } free(child->desc); free(child); } /* terrible function name */ static int child_kill_helper(mainloop_child_t *child) { int rc; if (child->flags & mainloop_leave_pid_group) { crm_debug("Kill pid %d only. leave group intact.", child->pid); rc = kill(child->pid, SIGKILL); } else { crm_debug("Kill pid %d's group", child->pid); rc = kill(-child->pid, SIGKILL); } if (rc < 0) { if (errno != ESRCH) { crm_perror(LOG_ERR, "kill(%d, KILL) failed", child->pid); } return -errno; } return 0; } static gboolean child_timeout_callback(gpointer p) { mainloop_child_t *child = p; int rc = 0; child->timerid = 0; if (child->timeout) { crm_crit("%s process (PID %d) will not die!", child->desc, (int)child->pid); return FALSE; } rc = child_kill_helper(child); if (rc == ESRCH) { /* Nothing left to do. pid doesn't exist */ return FALSE; } child->timeout = TRUE; crm_warn("%s process (PID %d) timed out", child->desc, (int)child->pid); child->timerid = g_timeout_add(5000, child_timeout_callback, child); return FALSE; } static gboolean child_waitpid(mainloop_child_t *child, int flags) { int rc = 0; int core = 0; int signo = 0; int status = 0; int exitcode = 0; rc = waitpid(child->pid, &status, flags); if(rc == 0) { crm_perror(LOG_DEBUG, "wait(%d) = %d", child->pid, rc); return FALSE; } else if(rc != child->pid) { signo = SIGCHLD; exitcode = 1; status = 1; crm_perror(LOG_ERR, "Call to waitpid(%d) failed", child->pid); } else { crm_trace("Managed process %d exited: %p", child->pid, child); if (WIFEXITED(status)) { exitcode = WEXITSTATUS(status); crm_trace("Managed process %d (%s) exited with rc=%d", child->pid, child->desc, exitcode); } else if (WIFSIGNALED(status)) { signo = WTERMSIG(status); crm_trace("Managed process %d (%s) exited with signal=%d", child->pid, child->desc, signo); } #ifdef WCOREDUMP if (WCOREDUMP(status)) { core = 1; crm_err("Managed process %d (%s) dumped core", child->pid, child->desc); } #endif } if (child->callback) { child->callback(child, child->pid, core, signo, exitcode); } return TRUE; } static void child_death_dispatch(int signal) { GListPtr iter = child_list; gboolean exited; while(iter) { GListPtr saved = NULL; mainloop_child_t *child = iter->data; exited = child_waitpid(child, WNOHANG); saved = iter; iter = iter->next; if (exited == FALSE) { continue; } crm_trace("Removing process entry %p for %d", child, child->pid); child_list = g_list_remove_link(child_list, saved); g_list_free(saved); child_free(child); } } static gboolean child_signal_init(gpointer p) { crm_trace("Installed SIGCHLD handler"); /* Do NOT use g_child_watch_add() and friends, they rely on pthreads */ mainloop_add_signal(SIGCHLD, child_death_dispatch); /* In case they terminated before the signal handler was installed */ child_death_dispatch(SIGCHLD); return FALSE; } int mainloop_child_kill(pid_t pid) { GListPtr iter; mainloop_child_t *child = NULL; mainloop_child_t *match = NULL; /* It is impossible to block SIGKILL, this allows us to * call waitpid without WNOHANG flag.*/ int waitflags = 0, rc = 0; for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) { child = iter->data; if (pid == child->pid) { match = child; } } if (match == NULL) { return FALSE; } rc = child_kill_helper(match); if(rc == -ESRCH) { /* It's gone, but hasn't shown up in waitpid() yet * * Wait until we get SIGCHLD and let child_death_dispatch() * clean it up as normal (so we get the correct return * code/status) * * The blocking alternative would be to call: * child_waitpid(match, 0); */ crm_trace("Waiting for child %d to be reaped by child_death_dispatch()", match->pid); return TRUE; } else if(rc != 0) { /* If KILL for some other reason set the WNOHANG flag since we * can't be certain what happened. */ waitflags = WNOHANG; } if (child_waitpid(match, waitflags) == FALSE) { /* not much we can do if this occurs */ return FALSE; } child_list = g_list_remove(child_list, match); child_free(match); return TRUE; } /* Create/Log a new tracked process * To track a process group, use -pid */ void mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags, void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode)) { static bool need_init = TRUE; mainloop_child_t *child = g_new(mainloop_child_t, 1); child->pid = pid; child->timerid = 0; child->timeout = FALSE; child->privatedata = privatedata; child->callback = callback; child->flags = flags; if(desc) { child->desc = strdup(desc); } if (timeout) { child->timerid = g_timeout_add(timeout, child_timeout_callback, child); } child_list = g_list_append(child_list, child); if(need_init) { need_init = FALSE; /* SIGCHLD processing has to be invoked from mainloop. * We do not want it to be possible to both add a child pid * to mainloop, and have the pid's exit callback invoked within * the same callstack. */ g_timeout_add(1, child_signal_init, NULL); } } void mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata, void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode)) { mainloop_child_add_with_flags(pid, timeout, desc, privatedata, 0, callback); } struct mainloop_timer_s { guint id; guint period_ms; bool repeat; char *name; GSourceFunc cb; void *userdata; }; static gboolean mainloop_timer_cb(gpointer user_data) { int id = 0; bool repeat = FALSE; struct mainloop_timer_s *t = user_data; CRM_ASSERT(t != NULL); id = t->id; t->id = 0; /* Ensure it's unset during callbacks so that * mainloop_timer_running() works as expected */ if(t->cb) { crm_trace("Invoking callbacks for timer %s", t->name); repeat = t->repeat; if(t->cb(t->userdata) == FALSE) { crm_trace("Timer %s complete", t->name); repeat = FALSE; } } if(repeat) { /* Restore if repeating */ t->id = id; } return repeat; } bool mainloop_timer_running(mainloop_timer_t *t) { if(t && t->id != 0) { return TRUE; } return FALSE; } void mainloop_timer_start(mainloop_timer_t *t) { mainloop_timer_stop(t); if(t && t->period_ms > 0) { crm_trace("Starting timer %s", t->name); t->id = g_timeout_add(t->period_ms, mainloop_timer_cb, t); } } void mainloop_timer_stop(mainloop_timer_t *t) { if(t && t->id != 0) { crm_trace("Stopping timer %s", t->name); g_source_remove(t->id); t->id = 0; } } guint mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms) { guint last = 0; if(t) { last = t->period_ms; t->period_ms = period_ms; } if(t && t->id != 0 && last != t->period_ms) { mainloop_timer_start(t); } return last; } mainloop_timer_t * mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata) { mainloop_timer_t *t = calloc(1, sizeof(mainloop_timer_t)); if(t) { if(name) { t->name = crm_strdup_printf("%s-%u-%d", name, period_ms, repeat); } else { t->name = crm_strdup_printf("%p-%u-%d", t, period_ms, repeat); } t->id = 0; t->period_ms = period_ms; t->repeat = repeat; t->cb = cb; t->userdata = userdata; crm_trace("Created timer %s with %p %p", t->name, userdata, t->userdata); } return t; } void mainloop_timer_del(mainloop_timer_t *t) { if(t) { crm_trace("Destroying timer %s", t->name); mainloop_timer_stop(t); free(t->name); free(t); } } /* * Helpers to make sure certain events aren't lost at shutdown */ static gboolean drain_timeout_cb(gpointer user_data) { bool *timeout_popped = (bool*) user_data; *timeout_popped = TRUE; return FALSE; } /*! * \brief Process main loop events while a certain condition is met * * \param[in] mloop Main loop to process * \param[in] timer_ms Don't process longer than this amount of time * \param[in] check Function that returns TRUE if events should be processed * * \note This function is intended to be called at shutdown if certain important * events should not be missed. The caller would likely quit the main loop * or exit after calling this function. The check() function will be * passed the remaining timeout in milliseconds. */ void pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool (*check)(guint)) { bool timeout_popped = FALSE; guint timer = 0; GMainContext *ctx = NULL; CRM_CHECK(mloop && check, return); ctx = g_main_loop_get_context(mloop); if (ctx) { time_t start_time = time(NULL); timer = g_timeout_add(timer_ms, drain_timeout_cb, &timeout_popped); while (!timeout_popped && check(timer_ms - (time(NULL) - start_time) * 1000)) { g_main_context_iteration(ctx, TRUE); } } if (!timeout_popped && (timer > 0)) { g_source_remove(timer); } }