Page MenuHomeClusterLabs Projects

mainloop.c
No OneTemporary

mainloop.c

/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <crm_internal.h>
#ifndef _GNU_SOURCE
# define _GNU_SOURCE
#endif
#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include <sys/wait.h>
#include <crm/crm.h>
#include <crm/common/xml.h>
#include <crm/common/mainloop.h>
#include <crm/common/ipcs.h>
#include <qb/qbarray.h>
struct mainloop_child_s {
pid_t pid;
char *desc;
unsigned timerid;
unsigned watchid;
gboolean timeout;
void *privatedata;
enum mainloop_child_flags flags;
/* Called when a process dies */
void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode);
};
struct trigger_s {
GSource source;
gboolean running;
gboolean trigger;
void *user_data;
guint id;
};
static gboolean
crm_trigger_prepare(GSource * source, gint * timeout)
{
crm_trigger_t *trig = (crm_trigger_t *) source;
/* cluster-glue's FD and IPC related sources make use of
* g_source_add_poll() but do not set a timeout in their prepare
* functions
*
* This means mainloop's poll() will block until an event for one
* of these sources occurs - any /other/ type of source, such as
* this one or g_idle_*, that doesn't use g_source_add_poll() is
* S-O-L and won't be processed until there is something fd-based
* happens.
*
* Luckily the timeout we can set here affects all sources and
* puts an upper limit on how long poll() can take.
*
* So unconditionally set a small-ish timeout, not too small that
* we're in constant motion, which will act as an upper bound on
* how long the signal handling might be delayed for.
*/
*timeout = 500; /* Timeout in ms */
return trig->trigger;
}
static gboolean
crm_trigger_check(GSource * source)
{
crm_trigger_t *trig = (crm_trigger_t *) source;
return trig->trigger;
}
static gboolean
crm_trigger_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
{
int rc = TRUE;
crm_trigger_t *trig = (crm_trigger_t *) source;
if (trig->running) {
/* Wait until the existing job is complete before starting the next one */
return TRUE;
}
trig->trigger = FALSE;
if (callback) {
rc = callback(trig->user_data);
if (rc < 0) {
crm_trace("Trigger handler %p not yet complete", trig);
trig->running = TRUE;
rc = TRUE;
}
}
return rc;
}
static void
crm_trigger_finalize(GSource * source)
{
crm_trace("Trigger %p destroyed", source);
}
#if 0
struct _GSourceCopy
{
gpointer callback_data;
GSourceCallbackFuncs *callback_funcs;
const GSourceFuncs *source_funcs;
guint ref_count;
GMainContext *context;
gint priority;
guint flags;
guint source_id;
GSList *poll_fds;
GSource *prev;
GSource *next;
char *name;
void *priv;
};
static int
g_source_refcount(GSource * source)
{
/* Duplicating the contents of private header files is a necessary evil */
if (source) {
struct _GSourceCopy *evil = (struct _GSourceCopy*)source;
return evil->ref_count;
}
return 0;
}
#else
static int g_source_refcount(GSource * source)
{
return 0;
}
#endif
static GSourceFuncs crm_trigger_funcs = {
crm_trigger_prepare,
crm_trigger_check,
crm_trigger_dispatch,
crm_trigger_finalize,
};
static crm_trigger_t *
mainloop_setup_trigger(GSource * source, int priority, int (*dispatch) (gpointer user_data),
gpointer userdata)
{
crm_trigger_t *trigger = NULL;
trigger = (crm_trigger_t *) source;
trigger->id = 0;
trigger->trigger = FALSE;
trigger->user_data = userdata;
if (dispatch) {
g_source_set_callback(source, dispatch, trigger, NULL);
}
g_source_set_priority(source, priority);
g_source_set_can_recurse(source, FALSE);
crm_trace("Setup %p with ref-count=%u", source, g_source_refcount(source));
trigger->id = g_source_attach(source, NULL);
crm_trace("Attached %p with ref-count=%u", source, g_source_refcount(source));
return trigger;
}
void
mainloop_trigger_complete(crm_trigger_t * trig)
{
crm_trace("Trigger handler %p complete", trig);
trig->running = FALSE;
}
/* If dispatch returns:
* -1: Job running but not complete
* 0: Remove the trigger from mainloop
* 1: Leave the trigger in mainloop
*/
crm_trigger_t *
mainloop_add_trigger(int priority, int (*dispatch) (gpointer user_data), gpointer userdata)
{
GSource *source = NULL;
CRM_ASSERT(sizeof(crm_trigger_t) > sizeof(GSource));
source = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t));
CRM_ASSERT(source != NULL);
return mainloop_setup_trigger(source, priority, dispatch, userdata);
}
void
mainloop_set_trigger(crm_trigger_t * source)
{
if(source) {
source->trigger = TRUE;
}
}
gboolean
mainloop_destroy_trigger(crm_trigger_t * source)
{
GSource *gs = NULL;
if(source == NULL) {
return TRUE;
}
gs = (GSource *)source;
if(g_source_refcount(gs) > 2) {
crm_info("Trigger %p is still referenced %u times", gs, g_source_refcount(gs));
}
g_source_destroy(gs); /* Remove from mainloop, ref_count-- */
g_source_unref(gs); /* The caller no longer carries a reference to source
*
* At this point the source should be free'd,
* unless we're currently processing said
* source, in which case mainloop holds an
* additional reference and it will be free'd
* once our processing completes
*/
return TRUE;
}
typedef struct signal_s {
crm_trigger_t trigger; /* must be first */
void (*handler) (int sig);
int signal;
} crm_signal_t;
static crm_signal_t *crm_signals[NSIG];
static gboolean
crm_signal_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
{
crm_signal_t *sig = (crm_signal_t *) source;
if(sig->signal != SIGCHLD) {
crm_notice("Caught '%s' signal "CRM_XS" %d (%s handler)",
strsignal(sig->signal), sig->signal,
(sig->handler? "invoking" : "no"));
}
sig->trigger.trigger = FALSE;
if (sig->handler) {
sig->handler(sig->signal);
}
return TRUE;
}
static void
mainloop_signal_handler(int sig)
{
if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) {
mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]);
}
}
static GSourceFuncs crm_signal_funcs = {
crm_trigger_prepare,
crm_trigger_check,
crm_signal_dispatch,
crm_trigger_finalize,
};
gboolean
crm_signal(int sig, void (*dispatch) (int sig))
{
sigset_t mask;
struct sigaction sa;
struct sigaction old;
if (sigemptyset(&mask) < 0) {
crm_perror(LOG_ERR, "Call to sigemptyset failed");
return FALSE;
}
memset(&sa, 0, sizeof(struct sigaction));
sa.sa_handler = dispatch;
sa.sa_flags = SA_RESTART;
sa.sa_mask = mask;
if (sigaction(sig, &sa, &old) < 0) {
crm_perror(LOG_ERR, "Could not install signal handler for signal %d", sig);
return FALSE;
}
return TRUE;
}
gboolean
mainloop_add_signal(int sig, void (*dispatch) (int sig))
{
GSource *source = NULL;
int priority = G_PRIORITY_HIGH - 1;
if (sig == SIGTERM) {
/* TERM is higher priority than other signals,
* signals are higher priority than other ipc.
* Yes, minus: smaller is "higher"
*/
priority--;
}
if (sig >= NSIG || sig < 0) {
crm_err("Signal %d is out of range", sig);
return FALSE;
} else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) {
crm_trace("Signal handler for %d is already installed", sig);
return TRUE;
} else if (crm_signals[sig] != NULL) {
crm_err("Different signal handler for %d is already installed", sig);
return FALSE;
}
CRM_ASSERT(sizeof(crm_signal_t) > sizeof(GSource));
source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t));
crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL);
CRM_ASSERT(crm_signals[sig] != NULL);
crm_signals[sig]->handler = dispatch;
crm_signals[sig]->signal = sig;
if (crm_signal(sig, mainloop_signal_handler) == FALSE) {
crm_signal_t *tmp = crm_signals[sig];
crm_signals[sig] = NULL;
mainloop_destroy_trigger((crm_trigger_t *) tmp);
return FALSE;
}
#if 0
/* If we want signals to interrupt mainloop's poll(), instead of waiting for
* the timeout, then we should call siginterrupt() below
*
* For now, just enforce a low timeout
*/
if (siginterrupt(sig, 1) < 0) {
crm_perror(LOG_INFO, "Could not enable system call interruptions for signal %d", sig);
}
#endif
return TRUE;
}
gboolean
mainloop_destroy_signal(int sig)
{
crm_signal_t *tmp = NULL;
if (sig >= NSIG || sig < 0) {
crm_err("Signal %d is out of range", sig);
return FALSE;
} else if (crm_signal(sig, NULL) == FALSE) {
crm_perror(LOG_ERR, "Could not uninstall signal handler for signal %d", sig);
return FALSE;
} else if (crm_signals[sig] == NULL) {
return TRUE;
}
crm_trace("Destroying signal %d", sig);
tmp = crm_signals[sig];
crm_signals[sig] = NULL;
mainloop_destroy_trigger((crm_trigger_t *) tmp);
return TRUE;
}
static qb_array_t *gio_map = NULL;
void
mainloop_cleanup(void)
{
if(gio_map) {
qb_array_free(gio_map);
}
}
/*
* libqb...
*/
struct gio_to_qb_poll {
int32_t is_used;
guint source;
int32_t events;
void *data;
qb_ipcs_dispatch_fn_t fn;
enum qb_loop_priority p;
};
static gboolean
gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
{
struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
gint fd = g_io_channel_unix_get_fd(gio);
crm_trace("%p.%d %d", data, fd, condition);
/* if this assert get's hit, then there is a race condition between
* when we destroy a fd and when mainloop actually gives it up */
CRM_ASSERT(adaptor->is_used > 0);
return (adaptor->fn(fd, condition, adaptor->data) == 0);
}
static void
gio_poll_destroy(gpointer data)
{
struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
adaptor->is_used--;
CRM_ASSERT(adaptor->is_used >= 0);
if (adaptor->is_used == 0) {
crm_trace("Marking adaptor %p unused", adaptor);
adaptor->source = 0;
}
}
/*!
* \internal
* \brief Convert libqb's poll priority into GLib's one
*
* \param[in] prio libqb's poll priority (#QB_LOOP_MED assumed as fallback)
*
* \return best matching GLib's priority
*/
static gint
conv_prio_libqb2glib(enum qb_loop_priority prio)
{
gint ret = G_PRIORITY_DEFAULT;
switch (prio) {
case QB_LOOP_LOW:
ret = G_PRIORITY_LOW;
break;
case QB_LOOP_HIGH:
ret = G_PRIORITY_HIGH;
break;
default:
crm_trace("Invalid libqb's loop priority %d, assuming QB_LOOP_MED",
prio);
/* fall-through */
case QB_LOOP_MED:
break;
}
return ret;
}
/*!
* \internal
* \brief Convert libqb's poll priority to rate limiting spec
*
* \param[in] prio libqb's poll priority (#QB_LOOP_MED assumed as fallback)
*
* \return best matching rate limiting spec
*/
static enum qb_ipcs_rate_limit
conv_libqb_prio2ratelimit(enum qb_loop_priority prio)
{
/* this is an inversion of what libqb's qb_ipcs_request_rate_limit does */
enum qb_ipcs_rate_limit ret = QB_IPCS_RATE_NORMAL;
switch (prio) {
case QB_LOOP_LOW:
ret = QB_IPCS_RATE_SLOW;
break;
case QB_LOOP_HIGH:
ret = QB_IPCS_RATE_FAST;
break;
default:
crm_trace("Invalid libqb's loop priority %d, assuming QB_LOOP_MED",
prio);
/* fall-through */
case QB_LOOP_MED:
break;
}
return ret;
}
static int32_t
gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
void *data, qb_ipcs_dispatch_fn_t fn, int32_t add)
{
struct gio_to_qb_poll *adaptor;
GIOChannel *channel;
int32_t res = 0;
res = qb_array_index(gio_map, fd, (void **)&adaptor);
if (res < 0) {
crm_err("Array lookup failed for fd=%d: %d", fd, res);
return res;
}
crm_trace("Adding fd=%d to mainloop as adaptor %p", fd, adaptor);
if (add && adaptor->source) {
crm_err("Adaptor for descriptor %d is still in-use", fd);
return -EEXIST;
}
if (!add && !adaptor->is_used) {
crm_err("Adaptor for descriptor %d is not in-use", fd);
return -ENOENT;
}
/* channel is created with ref_count = 1 */
channel = g_io_channel_unix_new(fd);
if (!channel) {
crm_err("No memory left to add fd=%d", fd);
return -ENOMEM;
}
if (adaptor->source) {
g_source_remove(adaptor->source);
adaptor->source = 0;
}
/* Because unlike the poll() API, glib doesn't tell us about HUPs by default */
evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR);
adaptor->fn = fn;
adaptor->events = evts;
adaptor->data = data;
adaptor->p = p;
adaptor->is_used++;
adaptor->source =
g_io_add_watch_full(channel, conv_prio_libqb2glib(p), evts,
gio_read_socket, adaptor, gio_poll_destroy);
/* Now that mainloop now holds a reference to channel,
* thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
*
* This means that channel will be free'd by:
* g_main_context_dispatch()
* -> g_source_destroy_internal()
* -> g_source_callback_unref()
* shortly after gio_poll_destroy() completes
*/
g_io_channel_unref(channel);
crm_trace("Added to mainloop with gsource id=%d", adaptor->source);
if (adaptor->source > 0) {
return 0;
}
return -EINVAL;
}
static int32_t
gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
void *data, qb_ipcs_dispatch_fn_t fn)
{
return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_TRUE);
}
static int32_t
gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
void *data, qb_ipcs_dispatch_fn_t fn)
{
return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_FALSE);
}
static int32_t
gio_poll_dispatch_del(int32_t fd)
{
struct gio_to_qb_poll *adaptor;
crm_trace("Looking for fd=%d", fd);
if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
if (adaptor->source) {
g_source_remove(adaptor->source);
adaptor->source = 0;
}
}
return 0;
}
struct qb_ipcs_poll_handlers gio_poll_funcs = {
.job_add = NULL,
.dispatch_add = gio_poll_dispatch_add,
.dispatch_mod = gio_poll_dispatch_mod,
.dispatch_del = gio_poll_dispatch_del,
};
static enum qb_ipc_type
pick_ipc_type(enum qb_ipc_type requested)
{
const char *env = getenv("PCMK_ipc_type");
if (env && strcmp("shared-mem", env) == 0) {
return QB_IPC_SHM;
} else if (env && strcmp("socket", env) == 0) {
return QB_IPC_SOCKET;
} else if (env && strcmp("posix", env) == 0) {
return QB_IPC_POSIX_MQ;
} else if (env && strcmp("sysv", env) == 0) {
return QB_IPC_SYSV_MQ;
} else if (requested == QB_IPC_NATIVE) {
/* We prefer shared memory because the server never blocks on
* send. If part of a message fits into the socket, libqb
* needs to block until the remainder can be sent also.
* Otherwise the client will wait forever for the remaining
* bytes.
*/
return QB_IPC_SHM;
}
return requested;
}
qb_ipcs_service_t *
mainloop_add_ipc_server(const char *name, enum qb_ipc_type type,
struct qb_ipcs_service_handlers *callbacks)
{
return mainloop_add_ipc_server_with_prio(name, type, callbacks, QB_LOOP_MED);
}
qb_ipcs_service_t *
mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type,
struct qb_ipcs_service_handlers *callbacks,
enum qb_loop_priority prio)
{
int rc = 0;
qb_ipcs_service_t *server = NULL;
if (gio_map == NULL) {
gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1);
}
crm_client_init();
server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks);
if (server == NULL) {
crm_err("Could not create %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
return NULL;
}
if (prio != QB_LOOP_MED) {
qb_ipcs_request_rate_limit(server, conv_libqb_prio2ratelimit(prio));
}
#ifdef HAVE_IPCS_GET_BUFFER_SIZE
/* All clients should use at least ipc_buffer_max as their buffer size */
qb_ipcs_enforce_buffer_size(server, crm_ipc_default_buffer_size());
#endif
qb_ipcs_poll_handlers_set(server, &gio_poll_funcs);
rc = qb_ipcs_run(server);
if (rc < 0) {
crm_err("Could not start %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
return NULL;
}
return server;
}
void
mainloop_del_ipc_server(qb_ipcs_service_t * server)
{
if (server) {
qb_ipcs_destroy(server);
}
}
struct mainloop_io_s {
char *name;
void *userdata;
int fd;
guint source;
crm_ipc_t *ipc;
GIOChannel *channel;
int (*dispatch_fn_ipc) (const char *buffer, ssize_t length, gpointer userdata);
int (*dispatch_fn_io) (gpointer userdata);
void (*destroy_fn) (gpointer userdata);
};
static gboolean
mainloop_gio_callback(GIOChannel * gio, GIOCondition condition, gpointer data)
{
gboolean keep = TRUE;
mainloop_io_t *client = data;
CRM_ASSERT(client->fd == g_io_channel_unix_get_fd(gio));
if (condition & G_IO_IN) {
if (client->ipc) {
long rc = 0;
int max = 10;
do {
rc = crm_ipc_read(client->ipc);
if (rc <= 0) {
crm_trace("Message acquisition from %s[%p] failed: %s (%ld)",
client->name, client, pcmk_strerror(rc), rc);
} else if (client->dispatch_fn_ipc) {
const char *buffer = crm_ipc_buffer(client->ipc);
crm_trace("New message from %s[%p] = %ld (I/O condition=%d)", client->name, client, rc, condition);
if (client->dispatch_fn_ipc(buffer, rc, client->userdata) < 0) {
crm_trace("Connection to %s no longer required", client->name);
keep = FALSE;
}
}
} while (keep && rc > 0 && --max > 0);
} else {
crm_trace("New message from %s[%p] %u", client->name, client, condition);
if (client->dispatch_fn_io) {
if (client->dispatch_fn_io(client->userdata) < 0) {
crm_trace("Connection to %s no longer required", client->name);
keep = FALSE;
}
}
}
}
if (client->ipc && crm_ipc_connected(client->ipc) == FALSE) {
crm_err("Connection to %s[%p] closed (I/O condition=%d)", client->name, client, condition);
keep = FALSE;
} else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) {
crm_trace("The connection %s[%p] has been closed (I/O condition=%d)",
client->name, client, condition);
keep = FALSE;
} else if ((condition & G_IO_IN) == 0) {
/*
#define GLIB_SYSDEF_POLLIN =1
#define GLIB_SYSDEF_POLLPRI =2
#define GLIB_SYSDEF_POLLOUT =4
#define GLIB_SYSDEF_POLLERR =8
#define GLIB_SYSDEF_POLLHUP =16
#define GLIB_SYSDEF_POLLNVAL =32
typedef enum
{
G_IO_IN GLIB_SYSDEF_POLLIN,
G_IO_OUT GLIB_SYSDEF_POLLOUT,
G_IO_PRI GLIB_SYSDEF_POLLPRI,
G_IO_ERR GLIB_SYSDEF_POLLERR,
G_IO_HUP GLIB_SYSDEF_POLLHUP,
G_IO_NVAL GLIB_SYSDEF_POLLNVAL
} GIOCondition;
A bitwise combination representing a condition to watch for on an event source.
G_IO_IN There is data to read.
G_IO_OUT Data can be written (without blocking).
G_IO_PRI There is urgent data to read.
G_IO_ERR Error condition.
G_IO_HUP Hung up (the connection has been broken, usually for pipes and sockets).
G_IO_NVAL Invalid request. The file descriptor is not open.
*/
crm_err("Strange condition: %d", condition);
}
/* keep == FALSE results in mainloop_gio_destroy() being called
* just before the source is removed from mainloop
*/
return keep;
}
static void
mainloop_gio_destroy(gpointer c)
{
mainloop_io_t *client = c;
char *c_name = strdup(client->name);
/* client->source is valid but about to be destroyed (ref_count == 0) in gmain.c
* client->channel will still have ref_count > 0... should be == 1
*/
crm_trace("Destroying client %s[%p]", c_name, c);
if (client->ipc) {
crm_ipc_close(client->ipc);
}
if (client->destroy_fn) {
void (*destroy_fn) (gpointer userdata) = client->destroy_fn;
client->destroy_fn = NULL;
destroy_fn(client->userdata);
}
if (client->ipc) {
crm_ipc_t *ipc = client->ipc;
client->ipc = NULL;
crm_ipc_destroy(ipc);
}
crm_trace("Destroyed client %s[%p]", c_name, c);
free(client->name); client->name = NULL;
free(client);
free(c_name);
}
mainloop_io_t *
mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata,
struct ipc_client_callbacks *callbacks)
{
mainloop_io_t *client = NULL;
crm_ipc_t *conn = crm_ipc_new(name, max_size);
if (conn && crm_ipc_connect(conn)) {
int32_t fd = crm_ipc_get_fd(conn);
client = mainloop_add_fd(name, priority, fd, userdata, NULL);
}
if (client == NULL) {
crm_perror(LOG_TRACE, "Connection to %s failed", name);
if (conn) {
crm_ipc_close(conn);
crm_ipc_destroy(conn);
}
return NULL;
}
client->ipc = conn;
client->destroy_fn = callbacks->destroy;
client->dispatch_fn_ipc = callbacks->dispatch;
return client;
}
void
mainloop_del_ipc_client(mainloop_io_t * client)
{
mainloop_del_fd(client);
}
crm_ipc_t *
mainloop_get_ipc_client(mainloop_io_t * client)
{
if (client) {
return client->ipc;
}
return NULL;
}
mainloop_io_t *
mainloop_add_fd(const char *name, int priority, int fd, void *userdata,
struct mainloop_fd_callbacks * callbacks)
{
mainloop_io_t *client = NULL;
if (fd >= 0) {
client = calloc(1, sizeof(mainloop_io_t));
if (client == NULL) {
return NULL;
}
client->name = strdup(name);
client->userdata = userdata;
if (callbacks) {
client->destroy_fn = callbacks->destroy;
client->dispatch_fn_io = callbacks->dispatch;
}
client->fd = fd;
client->channel = g_io_channel_unix_new(fd);
client->source =
g_io_add_watch_full(client->channel, priority,
(G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback,
client, mainloop_gio_destroy);
/* Now that mainloop now holds a reference to channel,
* thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
*
* This means that channel will be free'd by:
* g_main_context_dispatch() or g_source_remove()
* -> g_source_destroy_internal()
* -> g_source_callback_unref()
* shortly after mainloop_gio_destroy() completes
*/
g_io_channel_unref(client->channel);
crm_trace("Added connection %d for %s[%p].%d", client->source, client->name, client, fd);
} else {
errno = EINVAL;
}
return client;
}
void
mainloop_del_fd(mainloop_io_t * client)
{
if (client != NULL) {
crm_trace("Removing client %s[%p]", client->name, client);
if (client->source) {
/* Results in mainloop_gio_destroy() being called just
* before the source is removed from mainloop
*/
g_source_remove(client->source);
}
}
}
static GListPtr child_list = NULL;
pid_t
mainloop_child_pid(mainloop_child_t * child)
{
return child->pid;
}
const char *
mainloop_child_name(mainloop_child_t * child)
{
return child->desc;
}
int
mainloop_child_timeout(mainloop_child_t * child)
{
return child->timeout;
}
void *
mainloop_child_userdata(mainloop_child_t * child)
{
return child->privatedata;
}
void
mainloop_clear_child_userdata(mainloop_child_t * child)
{
child->privatedata = NULL;
}
/* good function name */
static void
child_free(mainloop_child_t *child)
{
if (child->timerid != 0) {
crm_trace("Removing timer %d", child->timerid);
g_source_remove(child->timerid);
child->timerid = 0;
}
free(child->desc);
free(child);
}
/* terrible function name */
static int
child_kill_helper(mainloop_child_t *child)
{
int rc;
if (child->flags & mainloop_leave_pid_group) {
crm_debug("Kill pid %d only. leave group intact.", child->pid);
rc = kill(child->pid, SIGKILL);
} else {
crm_debug("Kill pid %d's group", child->pid);
rc = kill(-child->pid, SIGKILL);
}
if (rc < 0) {
if (errno != ESRCH) {
crm_perror(LOG_ERR, "kill(%d, KILL) failed", child->pid);
}
return -errno;
}
return 0;
}
static gboolean
child_timeout_callback(gpointer p)
{
mainloop_child_t *child = p;
int rc = 0;
child->timerid = 0;
if (child->timeout) {
crm_crit("%s process (PID %d) will not die!", child->desc, (int)child->pid);
return FALSE;
}
rc = child_kill_helper(child);
if (rc == -ESRCH) {
/* Nothing left to do. pid doesn't exist */
return FALSE;
}
child->timeout = TRUE;
crm_warn("%s process (PID %d) timed out", child->desc, (int)child->pid);
child->timerid = g_timeout_add(5000, child_timeout_callback, child);
return FALSE;
}
static gboolean
child_waitpid(mainloop_child_t *child, int flags)
{
int rc = 0;
int core = 0;
int signo = 0;
int status = 0;
int exitcode = 0;
rc = waitpid(child->pid, &status, flags);
if(rc == 0) {
crm_perror(LOG_DEBUG, "wait(%d) = %d", child->pid, rc);
return FALSE;
} else if(rc != child->pid) {
signo = SIGCHLD;
exitcode = 1;
status = 1;
crm_perror(LOG_ERR, "Call to waitpid(%d) failed", child->pid);
} else {
crm_trace("Managed process %d exited: %p", child->pid, child);
if (WIFEXITED(status)) {
exitcode = WEXITSTATUS(status);
crm_trace("Managed process %d (%s) exited with rc=%d", child->pid, child->desc, exitcode);
} else if (WIFSIGNALED(status)) {
signo = WTERMSIG(status);
crm_trace("Managed process %d (%s) exited with signal=%d", child->pid, child->desc, signo);
}
#ifdef WCOREDUMP
if (WCOREDUMP(status)) {
core = 1;
crm_err("Managed process %d (%s) dumped core", child->pid, child->desc);
}
#endif
}
if (child->callback) {
child->callback(child, child->pid, core, signo, exitcode);
}
return TRUE;
}
static void
child_death_dispatch(int signal)
{
GListPtr iter = child_list;
gboolean exited;
while(iter) {
GListPtr saved = NULL;
mainloop_child_t *child = iter->data;
exited = child_waitpid(child, WNOHANG);
saved = iter;
iter = iter->next;
if (exited == FALSE) {
continue;
}
crm_trace("Removing process entry %p for %d", child, child->pid);
child_list = g_list_remove_link(child_list, saved);
g_list_free(saved);
child_free(child);
}
}
static gboolean
child_signal_init(gpointer p)
{
crm_trace("Installed SIGCHLD handler");
/* Do NOT use g_child_watch_add() and friends, they rely on pthreads */
mainloop_add_signal(SIGCHLD, child_death_dispatch);
/* In case they terminated before the signal handler was installed */
child_death_dispatch(SIGCHLD);
return FALSE;
}
int
mainloop_child_kill(pid_t pid)
{
GListPtr iter;
mainloop_child_t *child = NULL;
mainloop_child_t *match = NULL;
/* It is impossible to block SIGKILL, this allows us to
* call waitpid without WNOHANG flag.*/
int waitflags = 0, rc = 0;
for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) {
child = iter->data;
if (pid == child->pid) {
match = child;
}
}
if (match == NULL) {
return FALSE;
}
rc = child_kill_helper(match);
if(rc == -ESRCH) {
/* It's gone, but hasn't shown up in waitpid() yet
*
* Wait until we get SIGCHLD and let child_death_dispatch()
* clean it up as normal (so we get the correct return
* code/status)
*
* The blocking alternative would be to call:
* child_waitpid(match, 0);
*/
crm_trace("Waiting for child %d to be reaped by child_death_dispatch()", match->pid);
return TRUE;
} else if(rc != 0) {
/* If KILL for some other reason set the WNOHANG flag since we
* can't be certain what happened.
*/
waitflags = WNOHANG;
}
if (child_waitpid(match, waitflags) == FALSE) {
/* not much we can do if this occurs */
return FALSE;
}
child_list = g_list_remove(child_list, match);
child_free(match);
return TRUE;
}
/* Create/Log a new tracked process
* To track a process group, use -pid
*/
void
mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags,
void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
{
static bool need_init = TRUE;
mainloop_child_t *child = g_new(mainloop_child_t, 1);
child->pid = pid;
child->timerid = 0;
child->timeout = FALSE;
child->privatedata = privatedata;
child->callback = callback;
child->flags = flags;
if(desc) {
child->desc = strdup(desc);
}
if (timeout) {
child->timerid = g_timeout_add(timeout, child_timeout_callback, child);
}
child_list = g_list_append(child_list, child);
if(need_init) {
need_init = FALSE;
/* SIGCHLD processing has to be invoked from mainloop.
* We do not want it to be possible to both add a child pid
* to mainloop, and have the pid's exit callback invoked within
* the same callstack. */
g_timeout_add(1, child_signal_init, NULL);
}
}
void
mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata,
void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
{
mainloop_child_add_with_flags(pid, timeout, desc, privatedata, 0, callback);
}
struct mainloop_timer_s {
guint id;
guint period_ms;
bool repeat;
char *name;
GSourceFunc cb;
void *userdata;
};
struct mainloop_timer_s mainloop;
static gboolean mainloop_timer_cb(gpointer user_data)
{
int id = 0;
bool repeat = FALSE;
struct mainloop_timer_s *t = user_data;
CRM_ASSERT(t != NULL);
id = t->id;
t->id = 0; /* Ensure it's unset during callbacks so that
* mainloop_timer_running() works as expected
*/
if(t->cb) {
crm_trace("Invoking callbacks for timer %s", t->name);
repeat = t->repeat;
if(t->cb(t->userdata) == FALSE) {
crm_trace("Timer %s complete", t->name);
repeat = FALSE;
}
}
if(repeat) {
/* Restore if repeating */
t->id = id;
}
return repeat;
}
bool mainloop_timer_running(mainloop_timer_t *t)
{
if(t && t->id != 0) {
return TRUE;
}
return FALSE;
}
void mainloop_timer_start(mainloop_timer_t *t)
{
mainloop_timer_stop(t);
if(t && t->period_ms > 0) {
crm_trace("Starting timer %s", t->name);
t->id = g_timeout_add(t->period_ms, mainloop_timer_cb, t);
}
}
void mainloop_timer_stop(mainloop_timer_t *t)
{
if(t && t->id != 0) {
crm_trace("Stopping timer %s", t->name);
g_source_remove(t->id);
t->id = 0;
}
}
guint mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms)
{
guint last = 0;
if(t) {
last = t->period_ms;
t->period_ms = period_ms;
}
if(t && t->id != 0 && last != t->period_ms) {
mainloop_timer_start(t);
}
return last;
}
mainloop_timer_t *
mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
{
mainloop_timer_t *t = calloc(1, sizeof(mainloop_timer_t));
if(t) {
if(name) {
t->name = crm_strdup_printf("%s-%u-%d", name, period_ms, repeat);
} else {
t->name = crm_strdup_printf("%p-%u-%d", t, period_ms, repeat);
}
t->id = 0;
t->period_ms = period_ms;
t->repeat = repeat;
t->cb = cb;
t->userdata = userdata;
crm_trace("Created timer %s with %p %p", t->name, userdata, t->userdata);
}
return t;
}
void
mainloop_timer_del(mainloop_timer_t *t)
{
if(t) {
crm_trace("Destroying timer %s", t->name);
mainloop_timer_stop(t);
free(t->name);
free(t);
}
}
/*
* Helpers to make sure certain events aren't lost at shutdown
*/
static gboolean
drain_timeout_cb(gpointer user_data)
{
bool *timeout_popped = (bool*) user_data;
*timeout_popped = TRUE;
return FALSE;
}
/*!
* \brief Process main loop events while a certain condition is met
*
* \param[in] mloop Main loop to process
* \param[in] timer_ms Don't process longer than this amount of time
* \param[in] check Function that returns TRUE if events should be processed
*
* \note This function is intended to be called at shutdown if certain important
* events should not be missed. The caller would likely quit the main loop
* or exit after calling this function. The check() function will be
* passed the remaining timeout in milliseconds.
*/
void
pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool (*check)(guint))
{
bool timeout_popped = FALSE;
guint timer = 0;
GMainContext *ctx = NULL;
CRM_CHECK(mloop && check, return);
ctx = g_main_loop_get_context(mloop);
if (ctx) {
time_t start_time = time(NULL);
timer = g_timeout_add(timer_ms, drain_timeout_cb, &timeout_popped);
while (!timeout_popped
&& check(timer_ms - (time(NULL) - start_time) * 1000)) {
g_main_context_iteration(ctx, TRUE);
}
}
if (!timeout_popped && (timer > 0)) {
g_source_remove(timer);
}
}

File Metadata

Mime Type
text/x-c
Expires
Tue, Oct 29, 7:48 PM (1 d, 19 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
942547
Default Alt Text
mainloop.c (35 KB)

Event Timeline