Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/include/crm/common/internal.h b/include/crm/common/internal.h
index 28b20b4380..13c29ea18d 100644
--- a/include/crm/common/internal.h
+++ b/include/crm/common/internal.h
@@ -1,296 +1,304 @@
/*
* Copyright 2015-2020 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU Lesser General Public License
* version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
*/
#ifndef CRM_COMMON_INTERNAL__H
#define CRM_COMMON_INTERNAL__H
#include <unistd.h> // getpid()
#include <stdbool.h> // bool
#include <string.h> // strcmp()
#include <sys/types.h> // uid_t, gid_t, pid_t
#include <glib.h> // guint, GList, GHashTable
#include <libxml/tree.h> // xmlNode
#include <crm/common/util.h> // crm_strdup_printf()
+#include <crm/common/mainloop.h> // mainloop_io_t, struct ipc_client_callbacks
// Internal ACL-related utilities (from acl.c)
char *pcmk__uid2username(uid_t uid);
const char *pcmk__update_acl_user(xmlNode *request, const char *field,
const char *peer_user);
#if ENABLE_ACL
# include <string.h>
static inline bool
pcmk__is_privileged(const char *user)
{
return user && (!strcmp(user, CRM_DAEMON_USER) || !strcmp(user, "root"));
}
#endif
#if SUPPORT_CIBSECRETS
// Internal CIB utilities (from cib_secrets.c) */
int pcmk__substitute_secrets(const char *rsc_id, GHashTable *params);
#endif
/* internal digest-related utilities (from digest.c) */
bool pcmk__verify_digest(xmlNode *input, const char *expected);
/* internal I/O utilities (from io.c) */
int pcmk__real_path(const char *path, char **resolved_path);
char *pcmk__series_filename(const char *directory, const char *series,
int sequence, bool bzip);
int pcmk__read_series_sequence(const char *directory, const char *series,
unsigned int *seq);
void pcmk__write_series_sequence(const char *directory, const char *series,
unsigned int sequence, int max);
int pcmk__chown_series_sequence(const char *directory, const char *series,
uid_t uid, gid_t gid);
int pcmk__build_path(const char *path_c, mode_t mode);
bool pcmk__daemon_can_write(const char *dir, const char *file);
void pcmk__sync_directory(const char *name);
int pcmk__file_contents(const char *filename, char **contents);
int pcmk__write_sync(int fd, const char *contents);
int pcmk__set_nonblocking(int fd);
const char *pcmk__get_tmpdir(void);
void pcmk__close_fds_in_child(bool);
/*!
* \internal
* \brief Open /dev/null to consume next available file descriptor
*
* Open /dev/null, disregarding the result. This is intended when daemonizing to
* be able to null stdin, stdout, and stderr.
*
* \param[in] flags O_RDONLY (stdin) or O_WRONLY (stdout and stderr)
*/
static inline void
pcmk__open_devnull(int flags)
{
// Static analysis clutter
// cppcheck-suppress leakReturnValNotUsed
(void) open("/dev/null", flags);
}
/* internal logging utilities */
# define pcmk__config_err(fmt...) do { \
crm_config_error = TRUE; \
crm_err(fmt); \
} while (0)
# define pcmk__config_warn(fmt...) do { \
crm_config_warning = TRUE; \
crm_warn(fmt); \
} while (0)
/*!
* \internal
* \brief Execute code depending on whether message would be logged
*
* This is similar to do_crm_log_unlikely() except instead of logging, it either
* continues past this statement or executes else_action depending on whether a
* message of the given severity would be logged or not. This allows whole
* blocks of code to be skipped if tracing or debugging is turned off.
*
* \param[in] level Severity at which to continue past this statement
* \param[in] else_action Code block to execute if severity would not be logged
*
* \note else_action must not contain a break or continue statement
*/
# define pcmk__log_else(level, else_action) do { \
static struct qb_log_callsite *trace_cs = NULL; \
\
if (trace_cs == NULL) { \
trace_cs = qb_log_callsite_get(__func__, __FILE__, "log_else", \
level, __LINE__, 0); \
} \
if (!crm_is_callsite_active(trace_cs, level, 0)) { \
else_action; \
} \
} while(0)
+/* internal main loop utilities (from mainloop.c) */
+
+int pcmk__add_mainloop_ipc(crm_ipc_t *ipc, int priority, void *userdata,
+ struct ipc_client_callbacks *callbacks,
+ mainloop_io_t **source);
+
+
/* internal procfs utilities (from procfs.c) */
pid_t pcmk__procfs_pid_of(const char *name);
unsigned int pcmk__procfs_num_cores(void);
/* internal XML schema functions (from xml.c) */
void crm_schema_init(void);
void crm_schema_cleanup(void);
/* internal functions related to process IDs (from pid.c) */
/*!
* \internal
* \brief Check whether process exists (by PID and optionally executable path)
*
* \param[in] pid PID of process to check
* \param[in] daemon If not NULL, path component to match with procfs entry
*
* \return Standard Pacemaker return code
* \note Particular return codes of interest include pcmk_rc_ok for alive,
* ESRCH for process is not alive (verified by kill and/or executable path
* match), EACCES for caller unable or not allowed to check. A result of
* "alive" is less reliable when \p daemon is not provided or procfs is
* not available, since there is no guarantee that the PID has not been
* recycled for another process.
* \note This function cannot be used to verify \e authenticity of the process.
*/
int pcmk__pid_active(pid_t pid, const char *daemon);
int pcmk__read_pidfile(const char *filename, pid_t *pid);
int pcmk__pidfile_matches(const char *filename, pid_t expected_pid,
const char *expected_name, pid_t *pid);
int pcmk__lock_pidfile(const char *filename, const char *name);
/* interal functions related to resource operations (from operations.c) */
// printf-style format to create operation ID from resource, action, interval
#define PCMK__OP_FMT "%s_%s_%u"
char *pcmk__op_key(const char *rsc_id, const char *op_type, guint interval_ms);
char *pcmk__notify_key(const char *rsc_id, const char *notify_type,
const char *op_type);
char *pcmk__transition_key(int transition_id, int action_id, int target_rc,
const char *node);
void pcmk__filter_op_for_digest(xmlNode *param_set);
// miscellaneous utilities (from utils.c)
const char *pcmk_message_name(const char *name);
extern int pcmk__score_red;
extern int pcmk__score_green;
extern int pcmk__score_yellow;
/* internal generic string functions (from strings.c) */
int pcmk__guint_from_hash(GHashTable *table, const char *key, guint default_val,
guint *result);
bool pcmk__starts_with(const char *str, const char *prefix);
bool pcmk__ends_with(const char *s, const char *match);
bool pcmk__ends_with_ext(const char *s, const char *match);
char *pcmk__add_word(char *list, const char *word);
int pcmk__compress(const char *data, unsigned int length, unsigned int max,
char **result, unsigned int *result_len);
/* Correctly displaying singular or plural is complicated; consider "1 node has"
* vs. "2 nodes have". A flexible solution is to pluralize entire strings, e.g.
*
* if (a == 1) {
* crm_info("singular message"):
* } else {
* crm_info("plural message");
* }
*
* though even that's not sufficient for all languages besides English (if we
* ever desire to do translations of output and log messages). But the following
* convenience macros are "good enough" and more concise for many cases.
*/
/* Example:
* crm_info("Found %d %s", nentries,
* pcmk__plural_alt(nentries, "entry", "entries"));
*/
#define pcmk__plural_alt(i, s1, s2) (((i) == 1)? (s1) : (s2))
// Example: crm_info("Found %d node%s", nnodes, pcmk__plural_s(nnodes));
#define pcmk__plural_s(i) pcmk__plural_alt(i, "", "s")
static inline int
pcmk__str_empty(const char *s)
{
return (s == NULL) || (s[0] == '\0');
}
static inline char *
pcmk__getpid_s(void)
{
return crm_strdup_printf("%lu", (unsigned long) getpid());
}
// More efficient than g_list_length(list) == 1
static inline bool
pcmk__list_of_1(GList *list)
{
return list && (list->next == NULL);
}
// More efficient than g_list_length(list) > 1
static inline bool
pcmk__list_of_multiple(GList *list)
{
return list && (list->next != NULL);
}
/* convenience functions for failure-related node attributes */
#define PCMK__FAIL_COUNT_PREFIX "fail-count"
#define PCMK__LAST_FAILURE_PREFIX "last-failure"
/*!
* \internal
* \brief Generate a failure-related node attribute name for a resource
*
* \param[in] prefix Start of attribute name
* \param[in] rsc_id Resource name
* \param[in] op Operation name
* \param[in] interval_ms Operation interval
*
* \return Newly allocated string with attribute name
*
* \note Failure attributes are named like PREFIX-RSC#OP_INTERVAL (for example,
* "fail-count-myrsc#monitor_30000"). The '#' is used because it is not
* a valid character in a resource ID, to reliably distinguish where the
* operation name begins. The '_' is used simply to be more comparable to
* action labels like "myrsc_monitor_30000".
*/
static inline char *
pcmk__fail_attr_name(const char *prefix, const char *rsc_id, const char *op,
guint interval_ms)
{
CRM_CHECK(prefix && rsc_id && op, return NULL);
return crm_strdup_printf("%s-%s#%s_%u", prefix, rsc_id, op, interval_ms);
}
static inline char *
pcmk__failcount_name(const char *rsc_id, const char *op, guint interval_ms)
{
return pcmk__fail_attr_name(PCMK__FAIL_COUNT_PREFIX, rsc_id, op,
interval_ms);
}
static inline char *
pcmk__lastfailure_name(const char *rsc_id, const char *op, guint interval_ms)
{
return pcmk__fail_attr_name(PCMK__LAST_FAILURE_PREFIX, rsc_id, op,
interval_ms);
}
#endif /* CRM_COMMON_INTERNAL__H */
diff --git a/include/crm/common/ipc.h b/include/crm/common/ipc.h
index a0df956524..8dee1b1f2f 100644
--- a/include/crm/common/ipc.h
+++ b/include/crm/common/ipc.h
@@ -1,137 +1,228 @@
/*
* Copyright 2004-2020 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU Lesser General Public License
* version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
*/
#ifndef CRM_COMMON_IPC__H
# define CRM_COMMON_IPC__H
#ifdef __cplusplus
extern "C" {
#endif
/**
* \file
- * \brief Wrappers for and extensions to libqb IPC
+ * \brief IPC interface to Pacemaker daemons
+ *
* \ingroup core
*/
#include <sys/uio.h>
#include <qb/qbipcc.h>
#include <crm/common/xml.h>
/*
* Message creation utilities
*
* These are used for both IPC messages and cluster layer messages. However,
* since this is public API, they stay in this header for backward
* compatibility.
*/
#define create_reply(request, xml_response_data) \
create_reply_adv(request, xml_response_data, __FUNCTION__)
xmlNode *create_reply_adv(xmlNode *request, xmlNode *xml_response_data,
const char *origin);
#define create_request(task, xml_data, host_to, sys_to, sys_from, uuid_from) \
create_request_adv(task, xml_data, host_to, sys_to, sys_from, uuid_from, \
__FUNCTION__)
xmlNode *create_request_adv(const char *task, xmlNode *xml_data,
const char *host_to, const char *sys_to,
const char *sys_from, const char *uuid_from,
const char *origin);
+/*
+ * The library supports two methods of creating IPC connections. The older code
+ * allows connecting to any arbitrary IPC name. The newer code only allows
+ * connecting to one of the Pacemaker daemons.
+ *
+ * As daemons are converted to use the new model, the old functions should be
+ * considered deprecated for use with those daemons. Once all daemons are
+ * converted, the old functions should be officially deprecated as public API
+ * and eventually made internal API.
+ */
+
+/*
+ * Pacemaker daemon IPC
+ */
+
+//! Available IPC interfaces
+enum pcmk_ipc_server {
+ pcmk_ipc_attrd, //!< Attribute manager
+ pcmk_ipc_based, //!< CIB manager
+ pcmk_ipc_controld, //!< Controller
+ pcmk_ipc_execd, //!< Executor
+ pcmk_ipc_fenced, //!< Fencer
+ pcmk_ipc_pacemakerd, //!< Launcher
+ pcmk_ipc_schedulerd, //!< Scheduler
+};
+
+//! Possible event types that an IPC event callback can be called for
+enum pcmk_ipc_event {
+ pcmk_ipc_event_connect, //!< Result of asynchronous connection attempt
+ pcmk_ipc_event_disconnect, //!< Termination of IPC connection
+ pcmk_ipc_event_reply, //!< Daemon's reply to client IPC request
+ pcmk_ipc_event_notify, //!< Notification from daemon
+};
+
+//! How IPC replies should be dispatched
+enum pcmk_ipc_dispatch {
+ pcmk_ipc_dispatch_main, //!< Attach IPC to GMainLoop for dispatch
+ pcmk_ipc_dispatch_poll, //!< Caller will poll and dispatch IPC
+ pcmk_ipc_dispatch_sync, //!< Sending a command will wait for any reply
+};
+
+/*!
+ * \brief Client connection to Pacemaker IPC (opaque type)
+ *
+ * Create one with pcmk_new_ipc_api() and release it with pcmk_free_ipc_api().
+ */
+typedef struct pcmk_ipc_api_s pcmk_ipc_api_t;
+
+/*!
+ * \brief Callback function type for Pacemaker daemon IPC APIs
+ *
+ * \param[in] api IPC API connection
+ * \param[in] event_type The type of event that occurred
+ * \param[in] status Event status
+ * \param[in] event_data Event-specific data
+ * \param[in] user_data Caller data provided when callback was registered
+ *
+ * \note For connection and disconnection events, event_data may be NULL (for
+ * local IPC) or the name of the connected node (for remote IPC, for
+ * daemons that support that). For reply and notify events, event_data is
+ * defined by the specific daemon API.
+ */
+typedef void (*pcmk_ipc_callback_t)(pcmk_ipc_api_t *api,
+ enum pcmk_ipc_event event_type,
+ crm_exit_t status,
+ void *event_data, void *user_data);
+
+int pcmk_new_ipc_api(pcmk_ipc_api_t **api, enum pcmk_ipc_server server);
+
+void pcmk_free_ipc_api(pcmk_ipc_api_t *api);
+
+int pcmk_connect_ipc(pcmk_ipc_api_t *api, enum pcmk_ipc_dispatch dispatch_type);
+
+void pcmk_disconnect_ipc(pcmk_ipc_api_t *api);
+
+int pcmk_poll_ipc(pcmk_ipc_api_t *api, int timeout_ms);
+
+void pcmk_dispatch_ipc(pcmk_ipc_api_t *api);
+
+void pcmk_register_ipc_callback(pcmk_ipc_api_t *api, pcmk_ipc_callback_t cb,
+ void *user_data);
+
+const char *pcmk_ipc_name(pcmk_ipc_api_t *api, bool for_log);
+
+bool pcmk_ipc_is_connected(pcmk_ipc_api_t *api);
+
+int pcmk_ipc_purge_node(pcmk_ipc_api_t *api, const char *node_name,
+ uint32_t nodeid);
+
+
+/*
+ * Generic IPC API (to eventually be deprecated as public API and made internal)
+ */
+
/* *INDENT-OFF* */
enum crm_ipc_flags
{
crm_ipc_flags_none = 0x00000000,
crm_ipc_compressed = 0x00000001, /* Message has been compressed */
crm_ipc_proxied = 0x00000100, /* _ALL_ replies to proxied connections need to be sent as events */
crm_ipc_client_response = 0x00000200, /* A Response is expected in reply */
- // These are options only for pcmk__ipc_send_iov()
+ // These are options for Pacemaker's internal use only (pcmk__ipc_send_*())
crm_ipc_server_event = 0x00010000, /* Send an Event instead of a Response */
crm_ipc_server_free = 0x00020000, /* Free the iovec after sending */
crm_ipc_proxied_relay_response = 0x00040000, /* all replies to proxied connections are sent as events, this flag preserves whether the event should be treated as an actual event, or a response.*/
crm_ipc_server_info = 0x00100000, /* Log failures as LOG_INFO */
crm_ipc_server_error = 0x00200000, /* Log failures as LOG_ERR */
};
/* *INDENT-ON* */
typedef struct crm_ipc_s crm_ipc_t;
crm_ipc_t *crm_ipc_new(const char *name, size_t max_size);
bool crm_ipc_connect(crm_ipc_t * client);
void crm_ipc_close(crm_ipc_t * client);
void crm_ipc_destroy(crm_ipc_t * client);
void pcmk_free_ipc_event(struct iovec *event);
int crm_ipc_send(crm_ipc_t * client, xmlNode * message, enum crm_ipc_flags flags,
int32_t ms_timeout, xmlNode ** reply);
int crm_ipc_get_fd(crm_ipc_t * client);
bool crm_ipc_connected(crm_ipc_t * client);
int crm_ipc_ready(crm_ipc_t * client);
long crm_ipc_read(crm_ipc_t * client);
const char *crm_ipc_buffer(crm_ipc_t * client);
uint32_t crm_ipc_buffer_flags(crm_ipc_t * client);
const char *crm_ipc_name(crm_ipc_t * client);
unsigned int crm_ipc_default_buffer_size(void);
/*!
* \brief Check the authenticity of the IPC socket peer process
*
* If everything goes well, peer's authenticity is verified by the means
* of comparing against provided referential UID and GID (either satisfies),
* and the result of this check can be deduced from the return value.
* As an exception, detected UID of 0 ("root") satisfies arbitrary
* provided referential daemon's credentials.
*
* \param[in] sock IPC related, connected Unix socket to check peer of
* \param[in] refuid referential UID to check against
* \param[in] refgid referential GID to check against
* \param[out] gotpid to optionally store obtained PID of the peer
* (not available on FreeBSD, special value of 1
* used instead, and the caller is required to
* special case this value respectively)
* \param[out] gotuid to optionally store obtained UID of the peer
* \param[out] gotgid to optionally store obtained GID of the peer
*
* \return 0 if IPC related socket's peer is not authentic given the
* referential credentials (see above), 1 if it is,
* negative value on error (generally expressing -errno unless
* it was zero even on nonhappy path, -pcmk_err_generic is
* returned then; no message is directly emitted)
*
* \note While this function is tolerant on what constitutes authorized
* IPC daemon process (its effective user matches UID=0 or \p refuid,
* or at least its group matches \p refgid), either or both (in case
* of UID=0) mismatches on the expected credentials of such peer
* process \e shall be investigated at the caller when value of 1
* gets returned there, since higher-than-expected privileges in
* respect to the expected/intended credentials possibly violate
* the least privilege principle and may pose an additional risk
* (i.e. such accidental inconsistency shall be eventually fixed).
*/
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid,
pid_t *gotpid, uid_t *gotuid, gid_t *gotgid);
/* Utils */
xmlNode *create_hello_message(const char *uuid, const char *client_name,
const char *major_version, const char *minor_version);
#ifdef __cplusplus
}
#endif
#endif
diff --git a/lib/common/crmcommon_private.h b/lib/common/crmcommon_private.h
index d06fa20088..f9df27d41c 100644
--- a/lib/common/crmcommon_private.h
+++ b/lib/common/crmcommon_private.h
@@ -1,120 +1,206 @@
/*
* Copyright 2018-2020 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU Lesser General Public License
* version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
*/
#ifndef CRMCOMMON_PRIVATE__H
# define CRMCOMMON_PRIVATE__H
/* This header is for the sole use of libcrmcommon, so that functions can be
* declared with G_GNUC_INTERNAL for efficiency.
*/
#include <stdint.h> // uint8_t, uint32_t
#include <stdbool.h> // bool
#include <sys/types.h> // size_t
#include <glib.h> // GList
#include <libxml/tree.h> // xmlNode, xmlAttr
#include <qb/qbipcc.h> // struct qb_ipc_response_header
/*
* XML and ACLs
*/
enum xml_private_flags {
xpf_none = 0x0000,
xpf_dirty = 0x0001,
xpf_deleted = 0x0002,
xpf_created = 0x0004,
xpf_modified = 0x0008,
xpf_tracking = 0x0010,
xpf_processed = 0x0020,
xpf_skip = 0x0040,
xpf_moved = 0x0080,
xpf_acl_enabled = 0x0100,
xpf_acl_read = 0x0200,
xpf_acl_write = 0x0400,
xpf_acl_deny = 0x0800,
xpf_acl_create = 0x1000,
xpf_acl_denied = 0x2000,
xpf_lazy = 0x4000,
};
typedef struct xml_private_s {
long check;
uint32_t flags;
char *user;
GList *acls;
GList *deleted_objs;
} xml_private_t;
G_GNUC_INTERNAL
void pcmk__set_xml_flag(xmlNode *xml, enum xml_private_flags flag);
G_GNUC_INTERNAL
bool pcmk__tracking_xml_changes(xmlNode *xml, bool lazy);
G_GNUC_INTERNAL
int pcmk__element_xpath(const char *prefix, xmlNode *xml, char *buffer,
int offset, size_t buffer_size);
G_GNUC_INTERNAL
void pcmk__free_acls(GList *acls);
G_GNUC_INTERNAL
void pcmk__unpack_acl(xmlNode *source, xmlNode *target, const char *user);
G_GNUC_INTERNAL
bool pcmk__check_acl(xmlNode *xml, const char *name,
enum xml_private_flags mode);
G_GNUC_INTERNAL
void pcmk__apply_acl(xmlNode *xml);
G_GNUC_INTERNAL
void pcmk__apply_creation_acl(xmlNode *xml, bool check_top);
G_GNUC_INTERNAL
void pcmk__mark_xml_attr_dirty(xmlAttr *a);
static inline xmlAttr *
pcmk__first_xml_attr(const xmlNode *xml)
{
return xml? xml->properties : NULL;
}
static inline const char *
pcmk__xml_attr_value(const xmlAttr *attr)
{
return ((attr == NULL) || (attr->children == NULL))? NULL
: (const char *) attr->children->content;
}
/*
* IPC
*/
#define PCMK__IPC_VERSION 1
+// IPC behavior that varies by daemon
+typedef struct pcmk__ipc_methods_s {
+ /*!
+ * \internal
+ * \brief Allocate any private data needed by daemon IPC
+ *
+ * \param[in] api IPC API connection
+ *
+ * \return Standard Pacemaker return code
+ */
+ int (*new_data)(pcmk_ipc_api_t *api);
+
+ /*!
+ * \internal
+ * \brief Free any private data used by daemon IPC
+ *
+ * \param[in] api_data Data allocated by new_data() method
+ */
+ void (*free_data)(void *api_data);
+
+ /*!
+ * \internal
+ * \brief Perform daemon-specific handling after successful connection
+ *
+ * Some daemons require clients to register before sending any other
+ * commands. The controller requires a CRM_OP_HELLO (with no reply), and
+ * the CIB manager, executor, and fencer require a CRM_OP_REGISTER (with a
+ * reply). Ideally this would be consistent across all daemons, but for now
+ * this allows each to do its own authorization.
+ *
+ * \param[in] api IPC API connection
+ *
+ * \return Standard Pacemaker return code
+ */
+ int (*post_connect)(pcmk_ipc_api_t *api);
+
+ /*!
+ * \internal
+ * \brief Check whether an IPC request results in a reply
+ *
+ * \param[in] api IPC API connection
+ * \param[in] request IPC request XML
+ *
+ * \return true if request would result in an IPC reply, false otherwise
+ */
+ bool (*reply_expected)(pcmk_ipc_api_t *api, xmlNode *request);
+
+ /*!
+ * \internal
+ * \brief Perform daemon-specific handling of an IPC message
+ *
+ * \param[in] api IPC API connection
+ * \param[in] msg Message read from IPC connection
+ */
+ void (*dispatch)(pcmk_ipc_api_t *api, xmlNode *msg);
+
+ /*!
+ * \internal
+ * \brief Perform daemon-specific handling of an IPC disconnect
+ *
+ * \param[in] api IPC API connection
+ */
+ void (*post_disconnect)(pcmk_ipc_api_t *api);
+} pcmk__ipc_methods_t;
+
+// Implementation of pcmk_ipc_api_t
+struct pcmk_ipc_api_s {
+ enum pcmk_ipc_server server; // Daemon this IPC API instance is for
+ enum pcmk_ipc_dispatch dispatch_type; // How replies should be dispatched
+ crm_ipc_t *ipc; // IPC connection
+ mainloop_io_t *mainloop_io; // If using mainloop, I/O source for IPC
+ bool free_on_disconnect; // Whether disconnect should free object
+ pcmk_ipc_callback_t cb; // Caller-registered callback (if any)
+ void *user_data; // Caller-registered data (if any)
+ void *api_data; // For daemon-specific use
+ pcmk__ipc_methods_t *cmds; // Behavior that varies by daemon
+};
+
typedef struct pcmk__ipc_header_s {
struct qb_ipc_response_header qb;
uint32_t size_uncompressed;
uint32_t size_compressed;
uint32_t flags;
uint8_t version;
} pcmk__ipc_header_t;
+G_GNUC_INTERNAL
+int pcmk__send_ipc_request(pcmk_ipc_api_t *api, xmlNode *request);
+
+G_GNUC_INTERNAL
+void pcmk__call_ipc_callback(pcmk_ipc_api_t *api,
+ enum pcmk_ipc_event event_type,
+ crm_exit_t status, void *event_data);
+
G_GNUC_INTERNAL
unsigned int pcmk__ipc_buffer_size(unsigned int max);
G_GNUC_INTERNAL
bool pcmk__valid_ipc_header(const pcmk__ipc_header_t *header);
#endif // CRMCOMMON_PRIVATE__H
diff --git a/lib/common/ipc_client.c b/lib/common/ipc_client.c
index 773758865c..16dc9b5497 100644
--- a/lib/common/ipc_client.c
+++ b/lib/common/ipc_client.c
@@ -1,755 +1,1454 @@
/*
* Copyright 2004-2020 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU Lesser General Public License
* version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#if defined(US_AUTH_PEERCRED_UCRED) || defined(US_AUTH_PEERCRED_SOCKPEERCRED)
# ifdef US_AUTH_PEERCRED_UCRED
# ifndef _GNU_SOURCE
# define _GNU_SOURCE
# endif
# endif
# include <sys/socket.h>
#elif defined(US_AUTH_GETPEERUCRED)
# include <ucred.h>
#endif
#include <stdio.h>
#include <sys/types.h>
#include <errno.h>
#include <bzlib.h>
#include <crm/crm.h> /* indirectly: pcmk_err_generic */
#include <crm/msg_xml.h>
#include <crm/common/ipc.h>
#include <crm/common/ipc_internal.h>
#include "crmcommon_private.h"
+/*!
+ * \brief Create a new object for using Pacemaker daemon IPC
+ *
+ * \param[out] api     Where to store new IPC object
+ * \param[in]  server  Which Pacemaker daemon the object is for
+ *
+ * \return Standard Pacemaker return code
+ *
+ * \note The caller is responsible for freeing *api using pcmk_free_ipc_api().
+ * \note If an error occurs after \p api has been validated, *api is set to
+ *       NULL. An error from the daemon's new_data() method is returned as-is
+ *       rather than being reported as ENOMEM.
+ * \note This is intended to supersede crm_ipc_new() but is not yet usable.
+ */
+int
+pcmk_new_ipc_api(pcmk_ipc_api_t **api, enum pcmk_ipc_server server)
+{
+    size_t max_size = 0;
+    int rc = pcmk_rc_ok;
+
+    if (api == NULL) {
+        return EINVAL;
+    }
+
+    *api = calloc(1, sizeof(**api));
+    if (*api == NULL) {
+        return errno;
+    }
+
+    (*api)->server = server;
+    if (pcmk_ipc_name(*api, false) == NULL) {
+        rc = EOPNOTSUPP; // Unknown server, or IPC name not yet enabled
+        goto error;
+    }
+
+    // Set server methods and max_size (if not default)
+    switch (server) {
+        case pcmk_ipc_attrd:
+            break;
+
+        case pcmk_ipc_based:
+            max_size = 512 * 1024; // 512KB
+            break;
+
+        case pcmk_ipc_controld:
+            break;
+
+        case pcmk_ipc_execd:
+            break;
+
+        case pcmk_ipc_fenced:
+            break;
+
+        case pcmk_ipc_pacemakerd:
+            break;
+
+        case pcmk_ipc_schedulerd:
+            // @TODO max_size could vary by client, maybe take as argument?
+            max_size = 5 * 1024 * 1024; // 5MB
+            break;
+    }
+    if ((*api)->cmds == NULL) {
+        rc = ENOMEM; // Daemon method table could not be created
+        goto error;
+    }
+
+    (*api)->ipc = crm_ipc_new(pcmk_ipc_name(*api, false), max_size);
+    if ((*api)->ipc == NULL) {
+        rc = ENOMEM;
+        goto error;
+    }
+
+    // If daemon API has its own data to track, allocate it
+    if ((*api)->cmds->new_data != NULL) {
+        rc = (*api)->cmds->new_data(*api);
+        if (rc != pcmk_rc_ok) {
+            goto error; // Propagate the method's own return code
+        }
+    }
+    crm_trace("Created %s API IPC object", pcmk_ipc_name(*api, true));
+    return pcmk_rc_ok;
+
+error:
+    // pcmk_free_ipc_api() also releases any partially set-up members
+    pcmk_free_ipc_api(*api);
+    *api = NULL;
+    return rc;
+}
+
+// Release an IPC object's daemon-specific data and its daemon method table
+static void
+free_daemon_specific_data(pcmk_ipc_api_t *api)
+{
+    if ((api == NULL) || (api->cmds == NULL)) {
+        return; // Nothing daemon-specific to release
+    }
+
+    // api_data can only be released by the daemon's own free_data() method
+    if ((api->api_data != NULL) && (api->cmds->free_data != NULL)) {
+        api->cmds->free_data(api->api_data);
+        api->api_data = NULL;
+    }
+
+    free(api->cmds);
+    api->cmds = NULL;
+}
+
+/*!
+ * \internal
+ * \brief Invoke the event callback registered for an IPC API, if there is one
+ *
+ * \param[in] api         IPC API connection (may be NULL)
+ * \param[in] event_type  Which kind of event occurred
+ * \param[in] status      Status of the event
+ * \param[in] event_data  Data specific to the event type
+ *
+ * \note The callback also receives the user data registered alongside it.
+ */
+void
+pcmk__call_ipc_callback(pcmk_ipc_api_t *api, enum pcmk_ipc_event event_type,
+                        crm_exit_t status, void *event_data)
+{
+    if ((api == NULL) || (api->cb == NULL)) {
+        return; // Nobody registered to be notified
+    }
+    api->cb(api, event_type, status, event_data, api->user_data);
+}
+
+/*!
+ * \internal
+ * \brief Clean up after an IPC disconnect
+ *
+ * In order, this runs the daemon-specific post_disconnect() method (if any),
+ * calls the client's registered callback with pcmk_ipc_event_disconnect and
+ * CRM_EX_DISCONNECT, clears the object's IPC and main loop I/O pointers, and
+ * finally frees the object itself if pcmk_free_ipc_api() deferred that here.
+ *
+ * \param[in] user_data IPC API connection that disconnected (must not be
+ *                      NULL, since it is dereferenced without a check)
+ *
+ * \note This function can be used as a main loop IPC destroy callback.
+ * \note If the object's free_on_disconnect flag is set, the object is freed
+ *       before this function returns and must not be used afterward.
+ */
+static void
+ipc_post_disconnect(gpointer user_data)
+{
+ pcmk_ipc_api_t *api = user_data;
+
+ crm_info("Disconnected from %s IPC API", pcmk_ipc_name(api, true));
+
+ // Perform any daemon-specific handling needed
+ if ((api->cmds != NULL) && (api->cmds->post_disconnect != NULL)) {
+ api->cmds->post_disconnect(api);
+ }
+
+ // Call client's registered event callback
+ pcmk__call_ipc_callback(api, pcmk_ipc_event_disconnect, CRM_EX_DISCONNECT,
+ NULL);
+
+ /* If this is being called from a running main loop, mainloop_gio_destroy()
+ * will free ipc and mainloop_io immediately after calling this function.
+ * If this is called from a stopped main loop, these will leak, so the best
+ * practice is to close the connection before stopping the main loop.
+ */
+ api->ipc = NULL;
+ api->mainloop_io = NULL;
+
+ if (api->free_on_disconnect) {
+ /* pcmk_free_ipc_api() has already been called, but did not free api
+ * or api->cmds because this function needed them. Do that now.
+ */
+ free_daemon_specific_data(api);
+ crm_trace("Freeing IPC API object after disconnect");
+ free(api);
+ }
+}
+
+/*!
+ * \brief Free an IPC API object, disconnecting it first if needed
+ *
+ * If the object is still attached to a main loop, the actual free is deferred
+ * to the IPC destroy callback, which may run later in the main loop.
+ * Otherwise the object and its daemon-specific data are freed immediately.
+ *
+ * \param[in] api IPC API object to free (may be NULL)
+ *
+ * \note The caller must not use \p api after calling this function.
+ */
+void
+pcmk_free_ipc_api(pcmk_ipc_api_t *api)
+{
+ bool free_on_disconnect = false;
+
+ if (api == NULL) {
+ return;
+ }
+ crm_debug("Releasing %s IPC API", pcmk_ipc_name(api, true));
+
+ if (api->ipc != NULL) {
+ if (api->mainloop_io != NULL) {
+ /* We need to keep the api pointer itself around, because it is the
+ * user data for the IPC client destroy callback. That will be
+ * triggered by the pcmk_disconnect_ipc() call below, but it might
+ * happen later in the main loop (if still running).
+ *
+ * This flag tells the destroy callback to free the object. It can't
+ * do that unconditionally, because the application might call this
+ * function after a disconnect that happened by other means.
+ */
+ free_on_disconnect = api->free_on_disconnect = true;
+ }
+ pcmk_disconnect_ipc(api); // Frees api if free_on_disconnect is true
+ }
+ // Test the local copy: api itself may already have been freed above
+ if (!free_on_disconnect) {
+ free_daemon_specific_data(api);
+ crm_trace("Freeing IPC API object");
+ free(api);
+ }
+}
+
+/*!
+ * \brief Get the IPC name used with an IPC API connection
+ *
+ * \param[in] api      IPC API connection
+ * \param[in] for_log  If true, return human-friendly name instead of IPC name
+ *
+ * \return IPC API's human-friendly or connection name, or if none is available,
+ *         "Pacemaker" if for_log is true and NULL if for_log is false
+ * \note IPC connection names are not enabled yet, so this currently returns
+ *       NULL for every server when \p for_log is false.
+ */
+const char *
+pcmk_ipc_name(pcmk_ipc_api_t *api, bool for_log)
+{
+    const char *friendly_name = "Pacemaker";
+    const char *ipc_name = NULL;
+
+    if (api != NULL) {
+        switch (api->server) {
+            case pcmk_ipc_attrd:
+                friendly_name = "attribute manager";
+                ipc_name = NULL; // T_ATTRD, once enabled
+                break;
+            case pcmk_ipc_based:
+                friendly_name = "CIB manager";
+                ipc_name = NULL; // PCMK__SERVER_BASED_RW, once enabled
+                break;
+            case pcmk_ipc_controld:
+                friendly_name = "controller";
+                ipc_name = NULL; // CRM_SYSTEM_CRMD, once enabled
+                break;
+            case pcmk_ipc_execd:
+                friendly_name = "executor";
+                ipc_name = NULL; // CRM_SYSTEM_LRMD, once enabled
+                break;
+            case pcmk_ipc_fenced:
+                friendly_name = "fencer";
+                ipc_name = NULL; // "stonith-ng", once enabled
+                break;
+            case pcmk_ipc_pacemakerd:
+                friendly_name = "launcher";
+                ipc_name = NULL; // CRM_SYSTEM_MCP, once enabled
+                break;
+            case pcmk_ipc_schedulerd:
+                friendly_name = "scheduler";
+                ipc_name = NULL; // CRM_SYSTEM_PENGINE, once enabled
+                break;
+            default:
+                break; // Unknown server: keep the generic defaults
+        }
+    }
+    return for_log? friendly_name : ipc_name;
+}
+
+/*!
+ * \brief Check whether an IPC API connection is active
+ *
+ * \param[in] api  IPC API connection (may be NULL)
+ *
+ * \return true if \p api is non-NULL and its IPC is connected, otherwise false
+ */
+bool
+pcmk_ipc_is_connected(pcmk_ipc_api_t *api)
+{
+    if (api == NULL) {
+        return false;
+    }
+    return crm_ipc_connected(api->ipc);
+}
+
+/*!
+ * \internal
+ * \brief Hand a received IPC message to the daemon-specific dispatch method
+ *
+ * The daemon method, if any, is responsible for calling the client's
+ * registered event callback and for allocating and freeing any event data.
+ *
+ * \param[in] api      IPC API connection
+ * \param[in] message  XML message read from the IPC connection
+ */
+static void
+call_api_dispatch(pcmk_ipc_api_t *api, xmlNode *message)
+{
+    pcmk__ipc_methods_t *methods = NULL;
+
+    crm_log_xml_trace(message, "ipc-received");
+
+    methods = api->cmds;
+    if ((methods == NULL) || (methods->dispatch == NULL)) {
+        return; // Daemon has no dispatch handling
+    }
+    methods->dispatch(api, message);
+}
+
+/*!
+ * \internal
+ * \brief Parse one buffer of data read from an IPC source and dispatch it
+ *
+ * \param[in] buffer     Text read from IPC, parsed as XML
+ * \param[in] length     Number of bytes of data in \p buffer (ignored)
+ * \param[in] user_data  IPC API connection the data was read from
+ *
+ * \return Always 0 (meaning connection is still required)
+ *
+ * \note This function can be used as a main loop IPC dispatch callback.
+ */
+static int
+dispatch_ipc_data(const char *buffer, ssize_t length, gpointer user_data)
+{
+    pcmk_ipc_api_t *api = user_data;
+    xmlNode *parsed = NULL;
+
+    CRM_CHECK(api != NULL, return 0);
+
+    if (buffer != NULL) {
+        parsed = string2xml(buffer);
+        if (parsed != NULL) {
+            call_api_dispatch(api, parsed);
+            free_xml(parsed);
+        } else {
+            crm_warn("Malformed message received from %s IPC",
+                     pcmk_ipc_name(api, true));
+        }
+    } else {
+        crm_warn("Empty message received from %s IPC",
+                 pcmk_ipc_name(api, true));
+    }
+    return 0;
+}
+
+/*!
+ * \brief Check whether an IPC connection has data available (without main loop)
+ *
+ * \param[in] api         IPC API connection
+ * \param[in] timeout_ms  If less than 0, poll indefinitely; if 0, poll once
+ *                        and return immediately; otherwise, poll for up to
+ *                        this many milliseconds
+ *
+ * \return Standard Pacemaker return code
+ *
+ * \note Callers of pcmk_connect_ipc() using pcmk_ipc_dispatch_poll should call
+ *       this function to check whether IPC data is available. Return values of
+ *       interest include pcmk_rc_ok meaning data is available, EAGAIN
+ *       meaning no data is available, and ENOTCONN meaning the connection is
+ *       not active; all other values indicate errors.
+ * \todo This does not allow the caller to poll multiple file descriptors at
+ *       once. If there is demand for that, we could add a wrapper for
+ *       crm_ipc_get_fd(api->ipc), so the caller can call poll() themselves.
+ */
+int
+pcmk_poll_ipc(pcmk_ipc_api_t *api, int timeout_ms)
+{
+    int rc;
+    struct pollfd pollfd = { 0, };
+
+    if ((api == NULL) || (api->dispatch_type != pcmk_ipc_dispatch_poll)) {
+        return EINVAL;
+    }
+
+    /* poll() silently ignores negative descriptors, so polling a disconnected
+     * API would time out (or, with a negative timeout, block forever) instead
+     * of reporting the problem. Check the connection and descriptor first.
+     */
+    if (!crm_ipc_connected(api->ipc)) {
+        return ENOTCONN;
+    }
+    pollfd.fd = crm_ipc_get_fd(api->ipc);
+    if (pollfd.fd < 0) {
+        return -pollfd.fd; // crm_ipc_get_fd() returns -errno on failure
+    }
+    pollfd.events = POLLIN;
+
+    rc = poll(&pollfd, 1, timeout_ms);
+    if (rc < 0) {
+        return errno;
+    } else if (rc == 0) {
+        return EAGAIN;
+    }
+    return pcmk_rc_ok;
+}
+
+/*!
+ * \brief Dispatch available messages on an IPC connection (without main loop)
+ *
+ * \param[in] api  IPC API connection
+ *
+ * \note Callers of pcmk_connect_ipc() using pcmk_ipc_dispatch_poll should call
+ *       this function when IPC data is available.
+ * \note Nothing is dispatched if \p api is NULL or its connection is not
+ *       active.
+ */
+void
+pcmk_dispatch_ipc(pcmk_ipc_api_t *api)
+{
+    if (api == NULL) {
+        return;
+    }
+
+    /* crm_ipc_ready() returns a negative errno (such as -ENOTCONN) on error,
+     * so only a positive result means data is waiting; treating any nonzero
+     * value as "ready" would spin forever once the connection drops. A
+     * callback may also disconnect the API (clearing api->ipc) mid-loop, and
+     * crm_ipc_ready() asserts on a NULL client, so recheck it each time.
+     */
+    while ((api->ipc != NULL) && (crm_ipc_ready(api->ipc) > 0)) {
+        if (crm_ipc_read(api->ipc) > 0) {
+            dispatch_ipc_data(crm_ipc_buffer(api->ipc), 0, api);
+        }
+    }
+}
+
+/*!
+ * \internal
+ * \brief Attach an IPC API connection to the main loop
+ *
+ * \param[in] api  IPC API connection
+ *
+ * \return Standard Pacemaker return code
+ *
+ * \note On success, api->mainloop_io owns api->ipc, so api->ipc must not be
+ *       freed explicitly afterward.
+ */
+static int
+connect_with_main_loop(pcmk_ipc_api_t *api)
+{
+    struct ipc_client_callbacks callbacks = {
+        .dispatch = dispatch_ipc_data,
+        .destroy = ipc_post_disconnect,
+    };
+    int rc = pcmk__add_mainloop_ipc(api->ipc, G_PRIORITY_DEFAULT, api,
+                                    &callbacks, &(api->mainloop_io));
+
+    if (rc == pcmk_rc_ok) {
+        crm_debug("Connected to %s IPC (attached to main loop)",
+                  pcmk_ipc_name(api, true));
+    }
+    return rc;
+}
+
+/*!
+ * \internal
+ * \brief Connect an IPC API instance without attaching it to a main loop
+ *
+ * \param[in] api  IPC API connection
+ *
+ * \return Standard Pacemaker return code
+ */
+static int
+connect_without_main_loop(pcmk_ipc_api_t *api)
+{
+    if (!crm_ipc_connect(api->ipc)) {
+        /* Save errno before crm_ipc_close() can clobber it, and never report
+         * a failed connection as pcmk_rc_ok (0) if errno was left unset
+         */
+        int rc = (errno != 0)? errno : ENOTCONN;
+
+        crm_ipc_close(api->ipc);
+        return rc;
+    }
+    crm_debug("Connected to %s IPC (without main loop)",
+              pcmk_ipc_name(api, true));
+    return pcmk_rc_ok;
+}
+
+/*!
+ * \brief Connect to a Pacemaker daemon via IPC
+ *
+ * \param[in] api            IPC API instance
+ * \param[in] dispatch_type  How IPC replies should be dispatched
+ *
+ * \return Standard Pacemaker return code (EINVAL if \p api is uninitialized
+ *         or \p dispatch_type is not a known dispatch type)
+ */
+int
+pcmk_connect_ipc(pcmk_ipc_api_t *api, enum pcmk_ipc_dispatch dispatch_type)
+{
+    int rc = pcmk_rc_ok;
+
+    if ((api == NULL) || (api->ipc == NULL)) {
+        crm_err("Cannot connect to uninitialized API object");
+        return EINVAL;
+    }
+
+    if (crm_ipc_connected(api->ipc)) {
+        crm_trace("Already connected to %s IPC API", pcmk_ipc_name(api, true));
+        return pcmk_rc_ok;
+    }
+
+    api->dispatch_type = dispatch_type;
+    switch (dispatch_type) {
+        case pcmk_ipc_dispatch_main:
+            rc = connect_with_main_loop(api);
+            break;
+
+        case pcmk_ipc_dispatch_sync:
+        case pcmk_ipc_dispatch_poll:
+            rc = connect_without_main_loop(api);
+            break;
+
+        default:
+            /* Without this, an unknown dispatch type would skip connecting
+             * entirely yet could still report success
+             */
+            crm_err("Cannot connect to %s IPC: unknown dispatch type %d",
+                    pcmk_ipc_name(api, true), (int) dispatch_type);
+            rc = EINVAL;
+            break;
+    }
+    if (rc != pcmk_rc_ok) {
+        return rc;
+    }
+
+    // Give the daemon-specific API a chance to finish setting up
+    if ((api->cmds != NULL) && (api->cmds->post_connect != NULL)) {
+        rc = api->cmds->post_connect(api);
+        if (rc != pcmk_rc_ok) {
+            crm_ipc_close(api->ipc);
+        }
+    }
+    return rc;
+}
+
+/*!
+ * \brief Disconnect an IPC API instance
+ *
+ * \param[in] api  IPC API connection
+ *
+ * \note If the connection is attached to a main loop, this function should be
+ *       called before quitting the main loop, to ensure that all memory is
+ *       freed.
+ * \note With main loop dispatch, \p api may already have been freed when this
+ *       function returns, so the caller must not use it afterward.
+ */
+void
+pcmk_disconnect_ipc(pcmk_ipc_api_t *api)
+{
+    if ((api == NULL) || (api->ipc == NULL)) {
+        return;
+    }
+    switch (api->dispatch_type) {
+        case pcmk_ipc_dispatch_main:
+            {
+                mainloop_io_t *mainloop_io = api->mainloop_io;
+
+                // Make sure no code with access to api can use these again
+                api->mainloop_io = NULL;
+                api->ipc = NULL;
+
+                mainloop_del_ipc_client(mainloop_io);
+                // After this point api might have already been freed
+            }
+            break;
+
+        case pcmk_ipc_dispatch_poll:
+        case pcmk_ipc_dispatch_sync:
+            {
+                crm_ipc_t *ipc = api->ipc;
+
+                // Make sure no code with access to api can use ipc again
+                api->ipc = NULL;
+
+                // This should always be the case already, but to be safe
+                api->free_on_disconnect = false;
+
+                crm_ipc_destroy(ipc);
+                ipc_post_disconnect(api);
+            }
+            break;
+    }
+}
+
+/*!
+ * \brief Register a callback for IPC API events
+ *
+ * \param[in] api        IPC API connection
+ * \param[in] cb         Callback to register
+ * \param[in] user_data  Caller data to pass to callback
+ *
+ * \note This function may be called multiple times to update the callback
+ *       and/or user data. The caller remains responsible for freeing
+ *       \p user_data in any case (after the IPC is disconnected, if the
+ *       user data is still registered with the IPC).
+ * \note Nothing is done if \p api is NULL.
+ */
+void
+pcmk_register_ipc_callback(pcmk_ipc_api_t *api, pcmk_ipc_callback_t cb,
+                           void *user_data)
+{
+    if (api == NULL) {
+        return;
+    }
+    api->cb = cb;
+    api->user_data = user_data;
+}
+
+/*!
+ * \internal
+ * \brief Send an XML request across an IPC API connection
+ *
+ * \param[in] api      IPC API connection
+ * \param[in] request  XML request to send
+ *
+ * \return Standard Pacemaker return code
+ *
+ * \note Daemon-specific IPC API functions should call this function to send
+ *       requests, because it handles different dispatch types appropriately.
+ *       With synchronous dispatch, a reply (when the API expects one) is
+ *       dispatched before this function returns.
+ */
+int
+pcmk__send_ipc_request(pcmk_ipc_api_t *api, xmlNode *request)
+{
+    int rc = 0;
+    xmlNode *reply = NULL;
+    enum crm_ipc_flags send_flags = crm_ipc_flags_none;
+    bool wait_for_reply = false;
+
+    if ((api == NULL) || (api->ipc == NULL) || (request == NULL)) {
+        return EINVAL;
+    }
+    crm_log_xml_trace(request, "ipc-sent");
+
+    // Only synchronous dispatch blocks for a reply, and only if one is expected
+    if (api->dispatch_type == pcmk_ipc_dispatch_sync) {
+        wait_for_reply = (api->cmds != NULL)
+                         && (api->cmds->reply_expected != NULL)
+                         && api->cmds->reply_expected(api, request);
+    }
+    if (wait_for_reply) {
+        send_flags = crm_ipc_client_response;
+    }
+
+    // A timeout of 0 selects crm_ipc_send()'s default of 5 seconds
+    rc = crm_ipc_send(api->ipc, request, send_flags, 0, &reply);
+    if (rc < 0) {
+        return pcmk_legacy2rc(rc);
+    }
+    if (rc == 0) {
+        return ENODATA;
+    }
+
+    // A reply is only requested with synchronous dispatch; handle it now
+    if (reply != NULL) {
+        call_api_dispatch(api, reply);
+        free_xml(reply);
+    }
+    return pcmk_rc_ok;
+}
+
+/*!
+ * \internal
+ * \brief Create the XML for an IPC request to purge a node from the peer cache
+ *
+ * \param[in] api        IPC API connection
+ * \param[in] node_name  If not NULL, name of node to purge
+ * \param[in] nodeid     If not 0, node ID of node to purge
+ *
+ * \return Newly allocated IPC request XML, or NULL if the daemon behind
+ *         \p api has no syntax for this request
+ *
+ * \note The controller, fencer, and pacemakerd use the same request syntax, but
+ *       the attribute manager uses a different one. The CIB manager doesn't
+ *       have any syntax for it. The executor and scheduler don't connect to the
+ *       cluster layer and thus don't have or need any syntax for it.
+ *
+ * \todo Modify the attribute manager to accept the common syntax (as well
+ *       as its current one, for compatibility with older clients). Modify
+ *       the CIB manager to accept and honor the common syntax. Modify the
+ *       executor and scheduler to accept the syntax (immediately returning
+ *       success), just for consistency. Modify this function to use the
+ *       common syntax with all daemons if their version supports it.
+ */
+static xmlNode *
+create_purge_node_request(pcmk_ipc_api_t *api, const char *node_name,
+                          uint32_t nodeid)
+{
+    xmlNode *request = NULL;
+
+    if (api->server == pcmk_ipc_attrd) {
+        // The attribute manager has its own peer-removal syntax
+        request = create_xml_node(NULL, __FUNCTION__);
+        crm_xml_add(request, F_TYPE, T_ATTRD);
+        crm_xml_add(request, F_ORIG, crm_system_name);
+        crm_xml_add(request, PCMK__XA_TASK, PCMK__ATTRD_CMD_PEER_REMOVE);
+        crm_xml_add(request, PCMK__XA_ATTR_NODE_NAME, node_name);
+        if (nodeid > 0) {
+            crm_xml_add_int(request, PCMK__XA_ATTR_NODE_ID, (int) nodeid);
+        }
+        return request;
+    }
+
+    if ((api->server == pcmk_ipc_controld) || (api->server == pcmk_ipc_fenced)
+        || (api->server == pcmk_ipc_pacemakerd)) {
+        // These daemons share the common CRM_OP_RM_NODE_CACHE syntax
+        const char *client = (crm_system_name != NULL)? crm_system_name
+                                                      : "client";
+
+        request = create_request(CRM_OP_RM_NODE_CACHE, NULL, NULL,
+                                 pcmk_ipc_name(api, false), client, NULL);
+        if (nodeid > 0) {
+            crm_xml_set_id(request, "%lu", (unsigned long) nodeid);
+        }
+        crm_xml_add(request, XML_ATTR_UNAME, node_name);
+        return request;
+    }
+
+    // CIB manager, executor, and scheduler have no syntax for this request
+    return NULL;
+}
+
+/*!
+ * \brief Ask a Pacemaker daemon to purge a node from its peer cache
+ *
+ * \param[in] api        IPC API connection
+ * \param[in] node_name  If not NULL, name of node to purge
+ * \param[in] nodeid     If not 0, node ID of node to purge
+ *
+ * \return Standard Pacemaker return code (EINVAL if \p api is NULL or no node
+ *         is identified, EOPNOTSUPP if the daemon has no syntax for this
+ *         request)
+ *
+ * \note At least one of node_name or nodeid must be specified.
+ */
+int
+pcmk_ipc_purge_node(pcmk_ipc_api_t *api, const char *node_name, uint32_t nodeid)
+{
+    int rc = 0;
+    xmlNode *request = NULL;
+
+    if (api == NULL) {
+        return EINVAL;
+    }
+    if ((node_name == NULL) && (nodeid == 0)) {
+        return EINVAL;
+    }
+
+    request = create_purge_node_request(api, node_name, nodeid);
+    if (request == NULL) {
+        return EOPNOTSUPP;
+    }
+    rc = pcmk__send_ipc_request(api, request);
+    free_xml(request);
+
+    // node_name may legitimately be NULL here, and NULL for %s is undefined
+    crm_debug("%s peer cache purge of node %s[%lu]: rc=%d",
+              pcmk_ipc_name(api, true), ((node_name == NULL)? "" : node_name),
+              (unsigned long) nodeid, rc);
+    return rc;
+}
+
+/*
+ * Generic IPC API (to eventually be deprecated as public API and made internal)
+ */
+
/* Legacy IPC client connection state (the object crm_ipc_t refers to) */
struct crm_ipc_s {
struct pollfd pfd; // fd is -1 until connected, -EINVAL once found closed
unsigned int max_buf_size; // maximum bytes we can send or receive over IPC
unsigned int buf_size; // size of allocated buffer
int msg_size; // result of last qb_ipcc_event_recv(): bytes or negative errno
int need_reply; // TRUE while sends are blocked waiting for an overdue reply
char *buffer; // receive buffer: IPC header followed by message text
char *name; // IPC server name passed to qb_ipcc_connect()
qb_ipcc_connection_t *ipc; // libqb connection, or NULL if not connected
};
+/*!
+ * \brief Create a new (legacy) object for using Pacemaker daemon IPC
+ *
+ * \param[in] name IPC system name to connect to
+ * \param[in] max_size Use a maximum IPC buffer size of at least this size
+ *
+ * \return Newly allocated IPC object on success, NULL otherwise
+ *
+ * \note The caller is responsible for freeing the result using
+ * crm_ipc_destroy().
+ * \note This should be considered deprecated for use with daemons supported by
+ * pcmk_new_ipc_api().
+ * \note \p name must not be NULL, because it is passed directly to strdup().
+ * \note This only allocates the object; crm_ipc_connect() opens the connection.
+ */
crm_ipc_t *
crm_ipc_new(const char *name, size_t max_size)
{
crm_ipc_t *client = NULL;
client = calloc(1, sizeof(crm_ipc_t));
+ if (client == NULL) {
+ crm_err("Could not create IPC connection: %s", strerror(errno));
+ return NULL;
+ }
client->name = strdup(name);
+ if (client->name == NULL) {
+ crm_err("Could not create IPC connection: %s", strerror(errno));
+ free(client);
+ return NULL;
+ }
client->buf_size = pcmk__ipc_buffer_size(max_size);
client->buffer = malloc(client->buf_size);
+ if (client->buffer == NULL) {
+ crm_err("Could not create IPC connection: %s", strerror(errno));
+ free(client->name);
+ free(client);
+ return NULL;
+ }
/* Clients initiating connection pick the max buf size */
client->max_buf_size = client->buf_size;
client->pfd.fd = -1;
client->pfd.events = POLLIN;
client->pfd.revents = 0;
return client;
}
/*!
 * \brief Establish an IPC connection to a Pacemaker component
 *
 * \param[in,out] client  Connection instance obtained from crm_ipc_new()
 *
 * \return TRUE on success, FALSE otherwise (in which case errno will be set;
 *         specifically, in case of discovering the remote side is not
 *         authentic, its value is set to ECONNABORTED, and if a larger
 *         receive buffer cannot be allocated, to ENOMEM).
 */
bool
crm_ipc_connect(crm_ipc_t * client)
{
    uid_t cl_uid = 0;
    gid_t cl_gid = 0;
    pid_t found_pid = 0; uid_t found_uid = 0; gid_t found_gid = 0;
    int rv;

    client->need_reply = FALSE;
    client->ipc = qb_ipcc_connect(client->name, client->buf_size);

    if (client->ipc == NULL) {
        crm_debug("Could not establish %s connection: %s (%d)", client->name, pcmk_strerror(errno), errno);
        return FALSE;
    }

    client->pfd.fd = crm_ipc_get_fd(client);
    if (client->pfd.fd < 0) {
        rv = errno;
        /* message already logged by crm_ipc_get_fd() */
        crm_ipc_close(client);
        errno = rv;
        return FALSE;
    }

    rv = pcmk_daemon_user(&cl_uid, &cl_gid);
    if (rv < 0) {
        /* message already logged */
        crm_ipc_close(client);
        errno = -rv;
        return FALSE;
    }

    if (!(rv = crm_ipc_is_authentic_process(client->pfd.fd, cl_uid, cl_gid,
                                            &found_pid, &found_uid,
                                            &found_gid))) {
        crm_err("Daemon (IPC %s) is not authentic:"
                " process %lld (uid: %lld, gid: %lld)",
                client->name, (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
                (long long) found_uid, (long long) found_gid);
        crm_ipc_close(client);
        errno = ECONNABORTED;
        return FALSE;

    } else if (rv < 0) {
        errno = -rv;
        crm_perror(LOG_ERR, "Could not verify authenticity of daemon (IPC %s)",
                   client->name);
        crm_ipc_close(client);
        errno = -rv; // crm_perror() or crm_ipc_close() may have changed errno
        return FALSE;
    }

    qb_ipcc_context_set(client->ipc, client);

#ifdef HAVE_IPCS_GET_BUFFER_SIZE
    client->max_buf_size = qb_ipcc_get_buffer_size(client->ipc);
    if (client->max_buf_size > client->buf_size) {
        /* Allocate the larger buffer before releasing the old one, so a
         * failure can't leave a NULL buffer paired with the larger size
         */
        char *larger = calloc(1, client->max_buf_size);

        if (larger == NULL) {
            crm_err("Could not allocate %u-byte IPC buffer for %s",
                    client->max_buf_size, client->name);
            crm_ipc_close(client);
            errno = ENOMEM;
            return FALSE;
        }
        free(client->buffer);
        client->buffer = larger;
        client->buf_size = client->max_buf_size;
    }
#endif

    return TRUE;
}
/*!
 * \brief Close an IPC client's libqb connection, if it has one
 *
 * \param[in,out] client  IPC client object (may be NULL)
 *
 * \note This does not free \p client itself; crm_ipc_destroy() does that.
 */
void
crm_ipc_close(crm_ipc_t * client)
{
if (client) {
- crm_trace("Disconnecting %s IPC connection %p (%p)", client->name, client, client->ipc);
-
if (client->ipc) {
qb_ipcc_connection_t *ipc = client->ipc;
/* Detach the connection before disconnecting it, presumably so nothing
 * reached from qb_ipcc_disconnect() can use the stale pointer */
client->ipc = NULL;
qb_ipcc_disconnect(ipc);
}
}
}
/*!
 * \brief Free an IPC client object
 *
 * \param[in,out] client  IPC client to free (may be NULL)
 *
 * \note An active connection is deliberately not closed here (see below).
 */
void
crm_ipc_destroy(crm_ipc_t * client)
{
    if (client == NULL) {
        return;
    }

    if ((client->ipc != NULL) && qb_ipcc_is_connected(client->ipc)) {
        /* Closing the connection here is deliberately skipped. If it is
         * attached to an active main loop, the "disconnected" callback would
         * end up back here and free the memory twice -- something that can
         * still happen without a close here if a connection is destroyed and
         * closes before exit.
         */
        crm_notice("Destroying an active IPC connection to %s", client->name);
    }

    crm_trace("Destroying IPC connection to %s: %p", client->name, client);
    free(client->buffer);
    free(client->name);
    free(client);
}
/*!
 * \brief Get the file descriptor of an IPC client's connection
 *
 * \param[in] client  IPC client (may be NULL)
 *
 * \return Descriptor on success, otherwise -errno (with an error logged)
 */
int
crm_ipc_get_fd(crm_ipc_t * client)
{
    int fd = 0;
    bool have_fd = (client != NULL) && (client->ipc != NULL)
                   && (qb_ipcc_fd_get(client->ipc, &fd) == 0);

    if (have_fd) {
        return fd;
    }

    errno = EINVAL;
    crm_perror(LOG_ERR, "Could not obtain file IPC descriptor for %s",
               ((client != NULL)? client->name : "unspecified client"));
    return -errno;
}
/*!
 * \brief Check whether an IPC client object has a live connection
 *
 * \param[in,out] client  IPC client to check (may be NULL)
 *
 * \return true if \p client has a libqb connection and valid descriptor that
 *         libqb reports as connected, otherwise false
 *
 * \note When libqb reports the connection closed, the stored descriptor is
 *       invalidated so later checks fail without asking libqb again.
 */
bool
crm_ipc_connected(crm_ipc_t * client)
{
    bool connected = false;

    if (client == NULL) {
        crm_trace("No client");
    } else if (client->ipc == NULL) {
        crm_trace("No connection");
    } else if (client->pfd.fd < 0) {
        crm_trace("Bad descriptor");
    } else {
        connected = qb_ipcc_is_connected(client->ipc);
        if (!connected) {
            client->pfd.fd = -EINVAL;
        }
    }
    return connected;
}
/*!
 * \brief Check whether an IPC connection is ready to be read
 *
 * \param[in,out] client  Connection to check (must not be NULL)
 *
 * \return Positive value if ready to be read, 0 if not ready, -errno on error
 *         (-ENOTCONN if the connection is not active)
 */
int
crm_ipc_ready(crm_ipc_t *client)
{
    int ready;

    CRM_ASSERT(client != NULL);

    if (!crm_ipc_connected(client)) {
        return -ENOTCONN;
    }

    // Non-blocking check of the connection's descriptor
    client->pfd.revents = 0;
    ready = poll(&(client->pfd), 1, 0);
    if (ready < 0) {
        return -errno;
    }
    return ready;
}
/*!
 * \internal
 * \brief Decompress an IPC client's receive buffer in place, if needed
 *
 * \param[in,out] client  IPC client whose buffer holds a received message
 *
 * \return Standard Pacemaker return code (ENOMEM if the decompression buffer
 *         cannot be allocated, EILSEQ if decompression fails)
 *
 * \note If the message is compressed, client->buffer is replaced with a newly
 *       allocated buffer holding the original header plus uncompressed data.
 */
static int
crm_ipc_decompress(crm_ipc_t * client)
{
    pcmk__ipc_header_t *header = (pcmk__ipc_header_t *)(void*)client->buffer;

    if (header->size_compressed) {
        int rc = 0;
        unsigned int size_u = 1 + header->size_uncompressed;
        /* never let buf size fall below our max size required for ipc reads. */
        unsigned int new_buf_size = QB_MAX((sizeof(pcmk__ipc_header_t) + size_u), client->max_buf_size);
        char *uncompressed = calloc(1, new_buf_size);

        // Without this check, bzip2 would be asked to write through NULL
        if (uncompressed == NULL) {
            crm_err("Could not allocate %u bytes to decompress IPC message from %s",
                    new_buf_size, client->name);
            return ENOMEM;
        }

        crm_trace("Decompressing message data %u bytes into %u bytes",
                  header->size_compressed, size_u);

        rc = BZ2_bzBuffToBuffDecompress(uncompressed + sizeof(pcmk__ipc_header_t), &size_u,
                                        client->buffer + sizeof(pcmk__ipc_header_t), header->size_compressed, 1, 0);

        if (rc != BZ_OK) {
            crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
                    bz2_strerror(rc), rc);
            free(uncompressed);
            return EILSEQ;
        }

        /*
         * This assert no longer holds true. For an identical msg, some clients may
         * require compression, and others may not. If that same msg (event) is sent
         * to multiple clients, it could result in some clients receiving a compressed
         * msg even though compression was not explicitly required for them.
         *
         * CRM_ASSERT((header->size_uncompressed + sizeof(pcmk__ipc_header_t)) >= ipc_buffer_max);
         */
        CRM_ASSERT(size_u == header->size_uncompressed);

        memcpy(uncompressed, client->buffer, sizeof(pcmk__ipc_header_t)); /* Preserve the header */
        header = (pcmk__ipc_header_t *)(void*)uncompressed;

        free(client->buffer);
        client->buf_size = new_buf_size;
        client->buffer = uncompressed;
    }

    // The message text must be NUL-terminated
    CRM_ASSERT(client->buffer[sizeof(pcmk__ipc_header_t) + header->size_uncompressed - 1] == 0);
    return pcmk_rc_ok;
}
/*!
 * \brief Read the next event message from an IPC connection (non-blocking)
 *
 * \param[in,out] client  IPC client to read from (must be connected)
 *
 * \return Size of the message data (excluding the header) on success,
 *         otherwise a negative errno (-ENOMSG if nothing was read)
 *
 * \note On success, crm_ipc_buffer() gives the message text.
 */
long
crm_ipc_read(crm_ipc_t * client)
{
    pcmk__ipc_header_t *header = NULL;

    CRM_ASSERT(client != NULL);
    CRM_ASSERT(client->ipc != NULL);
    CRM_ASSERT(client->buffer != NULL);

    client->buffer[0] = 0;
    client->msg_size = qb_ipcc_event_recv(client->ipc, client->buffer,
                                          client->buf_size, 0);
    if (client->msg_size < 0) {
        crm_trace("No message from %s received: %s", client->name, pcmk_strerror(client->msg_size));
    } else {
        int rc = crm_ipc_decompress(client);

        if (rc != pcmk_rc_ok) {
            return pcmk_rc2legacy(rc);
        }

        header = (pcmk__ipc_header_t *)(void*)client->buffer;
        if (!pcmk__valid_ipc_header(header)) {
            return -EBADMSG;
        }

        crm_trace("Received %s event %d, size=%u, rc=%d, text: %.100s",
                  client->name, header->qb.id, header->qb.size, client->msg_size,
                  client->buffer + sizeof(pcmk__ipc_header_t));
    }

    if (!crm_ipc_connected(client) || (client->msg_size == -ENOTCONN)) {
        crm_err("Connection to %s failed", client->name);
    }

    // Report the size of the data after the header, if a message was read
    return (header != NULL)? (long) header->size_uncompressed : -ENOMSG;
}
/*!
 * \brief Get the message text in an IPC client's receive buffer
 *
 * \param[in] client  IPC client (must not be NULL)
 *
 * \return Pointer to the text following the IPC header in the buffer
 */
const char *
crm_ipc_buffer(crm_ipc_t * client)
{
    CRM_ASSERT(client != NULL);
    return &(client->buffer[sizeof(pcmk__ipc_header_t)]);
}
/*!
 * \brief Get the header flags of the message in an IPC client's buffer
 *
 * \param[in] client  IPC client (must not be NULL)
 *
 * \return Header flags, or 0 if the client has no buffer
 */
uint32_t
crm_ipc_buffer_flags(crm_ipc_t * client)
{
    CRM_ASSERT(client != NULL);

    if (client->buffer != NULL) {
        const pcmk__ipc_header_t *header = (pcmk__ipc_header_t *)(void*)client->buffer;

        return header->flags;
    }
    return 0;
}
/*!
 * \brief Get the IPC server name an IPC client connects to
 *
 * \param[in] client  IPC client (must not be NULL)
 *
 * \return Server name given to crm_ipc_new()
 */
const char *
crm_ipc_name(crm_ipc_t * client)
{
    const char *name = NULL;

    CRM_ASSERT(client != NULL);
    name = client->name;
    return name;
}
/*!
 * \internal
 * \brief Wait for the reply to a specific IPC request
 *
 * \param[in,out] client      IPC client the request was sent on
 * \param[in]     request_id  ID of the request whose reply is wanted
 * \param[in]     ms_timeout  How long to wait (rounded down to whole seconds,
 *                            plus one second)
 * \param[out]    bytes       Result of the last receive: size of the reply on
 *                            success, otherwise negative errno or 0
 *
 * \return Standard Pacemaker return code
 *
 * \note Replies to older requests are logged and discarded. If the wait ends
 *       without the wanted reply, ETIMEDOUT is returned (and \p bytes is set
 *       to -ETIMEDOUT), so a discarded reply is never mistaken for it.
 */
static int
internal_ipc_get_reply(crm_ipc_t *client, int request_id, int ms_timeout,
                       ssize_t *bytes)
{
    time_t timeout = time(NULL) + 1 + (ms_timeout / 1000);
    int rc = pcmk_rc_ok;
    bool got_reply = false;

    /* get the reply */
    crm_trace("client %s waiting on reply to msg id %d", client->name, request_id);
    do {

        *bytes = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, 1000);
        if (*bytes > 0) {
            pcmk__ipc_header_t *hdr = NULL;

            rc = crm_ipc_decompress(client);
            if (rc != pcmk_rc_ok) {
                return rc;
            }

            hdr = (pcmk__ipc_header_t *)(void*)client->buffer;
            if (hdr->qb.id == request_id) {
                /* Got it */
                got_reply = true;
                break;

            } else if (hdr->qb.id < request_id) {
                xmlNode *bad = string2xml(crm_ipc_buffer(client));

                crm_err("Discarding old reply %d (need %d)", hdr->qb.id, request_id);
                crm_log_xml_notice(bad, "OldIpcReply");
                free_xml(bad); // parsed only for logging

            } else {
                xmlNode *bad = string2xml(crm_ipc_buffer(client));

                crm_err("Discarding newer reply %d (need %d)", hdr->qb.id, request_id);
                crm_log_xml_notice(bad, "ImpossibleReply");
                free_xml(bad); // parsed only for logging
                CRM_ASSERT(hdr->qb.id <= request_id);
            }
        } else if (crm_ipc_connected(client) == FALSE) {
            crm_err("Server disconnected client %s while waiting for msg id %d", client->name,
                    request_id);
            break;
        }

    } while (time(NULL) < timeout);

    if (*bytes < 0) {
        rc = (int) -*bytes; // System errno

    } else if ((*bytes > 0) && !got_reply) {
        /* Time ran out right after discarding a reply to another request;
         * don't let the caller treat that buffer as the reply it wants
         */
        *bytes = -ETIMEDOUT;
        rc = ETIMEDOUT;
    }
    return rc;
}
/*!
 * \brief Send an IPC XML message
 *
 * \param[in]  client      Connection to IPC server
 * \param[in]  message     XML message to send
 * \param[in]  flags       Bitmask of crm_ipc_flags
 * \param[in]  ms_timeout  Give up if not sent within this much time
 *                         (5 seconds if 0, or no timeout if negative)
 * \param[out] reply       If not NULL, where to store the reply from the
 *                         server (set to NULL if there is none)
 *
 * \return Negative errno on error, otherwise size of reply received in bytes
 *         if reply was needed, otherwise number of bytes sent
 */
int
crm_ipc_send(crm_ipc_t * client, xmlNode * message, enum crm_ipc_flags flags, int32_t ms_timeout,
             xmlNode ** reply)
{
    int rc = 0;
    ssize_t qb_rc = 0;
    ssize_t bytes = 0;
    struct iovec *iov;
    static uint32_t id = 0;
    static int factor = 8;
    pcmk__ipc_header_t *header;

    // Honor the documented "NULL if none" on every return path
    if (reply != NULL) {
        *reply = NULL;
    }

    if (client == NULL) {
        /* message is an xmlNode, not a string, so it must not be passed to a
         * %s conversion; log its element name instead
         */
        crm_notice("Can't send IPC request without connection (bug?): %.100s",
                   (((message != NULL) && (message->name != NULL))?
                    (const char *) message->name : "(no message)"));
        return -ENOTCONN;

    } else if (crm_ipc_connected(client) == FALSE) {
        /* Don't even bother */
        crm_notice("Can't send IPC request to %s: Connection closed",
                   client->name);
        return -ENOTCONN;
    }

    if (ms_timeout == 0) {
        ms_timeout = 5000;
    }

    if (client->need_reply) {
        qb_rc = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, ms_timeout);
        if (qb_rc < 0) {
            crm_warn("Sending IPC to %s disabled until pending reply received",
                     client->name);
            return -EALREADY;

        } else {
            crm_notice("Sending IPC to %s re-enabled after pending reply received",
                       client->name);
            client->need_reply = FALSE;
        }
    }

    id++;
    CRM_LOG_ASSERT(id != 0); /* Crude wrap-around detection */
    rc = pcmk__ipc_prepare_iov(id, message, client->max_buf_size, &iov, &bytes);
    if (rc != pcmk_rc_ok) {
        crm_warn("Couldn't prepare IPC request to %s: %s " CRM_XS " rc=%d",
                 client->name, pcmk_rc_str(rc), rc);
        return pcmk_rc2legacy(rc);
    }

    header = iov[0].iov_base;
    header->flags |= flags;

    if(is_set(flags, crm_ipc_proxied)) {
        /* Don't look for a synchronous response */
        clear_bit(flags, crm_ipc_client_response);
    }

    if(header->size_compressed) {
        if(factor < 10 && (client->max_buf_size / 10) < (bytes / factor)) {
            crm_notice("Compressed message exceeds %d0%% of configured IPC "
                       "limit (%u bytes); consider setting PCMK_ipc_buffer to "
                       "%u or higher",
                       factor, client->max_buf_size, 2 * client->max_buf_size);
            factor++;
        }
    }

    crm_trace("Sending %s IPC request %d of %u bytes using %dms timeout",
              client->name, header->qb.id, header->qb.size, ms_timeout);

    if (ms_timeout > 0 || is_not_set(flags, crm_ipc_client_response)) {

        time_t timeout = time(NULL) + 1 + (ms_timeout / 1000);

        do {
            /* @TODO Is this check really needed? Won't qb_ipcc_sendv() return
             * an error if it's not connected?
             */
            if (!crm_ipc_connected(client)) {
                goto send_cleanup;
            }

            qb_rc = qb_ipcc_sendv(client->ipc, iov, 2);
        } while ((qb_rc == -EAGAIN) && (time(NULL) < timeout));

        rc = (int) qb_rc; // Negative of system errno, or bytes sent
        if (qb_rc <= 0) {
            goto send_cleanup;

        } else if (is_not_set(flags, crm_ipc_client_response)) {
            crm_trace("Not waiting for reply to %s IPC request %d",
                      client->name, header->qb.id);
            goto send_cleanup;
        }

        rc = internal_ipc_get_reply(client, header->qb.id, ms_timeout, &bytes);
        if (rc != pcmk_rc_ok) {
            /* We didn't get the reply in time, so disable future sends for now.
             * The only alternative would be to close the connection since we
             * don't know how to detect and discard out-of-sequence replies.
             *
             * @TODO Implement out-of-sequence detection
             */
            client->need_reply = TRUE;
        }
        rc = (int) bytes; // Negative system errno, or size of reply received

    } else {
        // No timeout, and client response needed
        do {
            qb_rc = qb_ipcc_sendv_recv(client->ipc, iov, 2, client->buffer,
                                       client->buf_size, -1);
        } while ((qb_rc == -EAGAIN) && crm_ipc_connected(client));
        rc = (int) qb_rc; // Negative system errno, or size of reply received
    }

    if (rc > 0) {
        pcmk__ipc_header_t *hdr = (pcmk__ipc_header_t *)(void*)client->buffer;

        crm_trace("Received %d-byte reply %d to %s IPC %d: %.100s",
                  rc, hdr->qb.id, client->name, header->qb.id,
                  crm_ipc_buffer(client));

        if (reply) {
            *reply = string2xml(crm_ipc_buffer(client));
        }

    } else {
        crm_trace("No reply to %s IPC %d: rc=%d",
                  client->name, header->qb.id, rc);
    }

  send_cleanup:
    if (crm_ipc_connected(client) == FALSE) {
        crm_notice("Couldn't send %s IPC request %d: Connection closed "
                   CRM_XS " rc=%d", client->name, header->qb.id, rc);

    } else if (rc == -ETIMEDOUT) {
        crm_warn("%s IPC request %d failed: %s after %dms " CRM_XS " rc=%d",
                 client->name, header->qb.id, pcmk_strerror(rc), ms_timeout,
                 rc);
        crm_write_blackbox(0, NULL);

    } else if (rc <= 0) {
        crm_warn("%s IPC request %d failed: %s " CRM_XS " rc=%d",
                 client->name, header->qb.id,
                 ((rc == 0)? "No bytes sent" : pcmk_strerror(rc)), rc);
    }

    pcmk_free_ipc_event(iov);
    return rc;
}
/*!
 * \brief Check whether the peer of a Unix socket has acceptable credentials
 *
 * \param[in]  sock    Connected Unix socket
 * \param[in]  refuid  Reference user ID
 * \param[in]  refgid  Reference group ID
 * \param[out] gotpid  If not NULL, where to store the peer's PID
 * \param[out] gotuid  If not NULL, where to store the peer's user ID
 * \param[out] gotgid  If not NULL, where to store the peer's group ID
 *
 * \return 1 if the peer's user ID is 0 or \p refuid, or its group ID is
 *         \p refgid; 0 if not; negative errno (or -pcmk_err_generic) if the
 *         peer credentials could not be obtained
 *
 * \note The output arguments are set only when credentials were obtained.
 */
int
crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid,
                             pid_t *gotpid, uid_t *gotuid, gid_t *gotgid) {
    int ret = 0;
    pid_t found_pid = 0; uid_t found_uid = 0; gid_t found_gid = 0;
#if defined(US_AUTH_PEERCRED_UCRED)
    struct ucred ucred;
    socklen_t ucred_len = sizeof(ucred);

    if (!getsockopt(sock, SOL_SOCKET, SO_PEERCRED,
                    &ucred, &ucred_len)
                && ucred_len == sizeof(ucred)) {
        found_pid = ucred.pid; found_uid = ucred.uid; found_gid = ucred.gid;

#elif defined(US_AUTH_PEERCRED_SOCKPEERCRED)
    struct sockpeercred sockpeercred;
    socklen_t sockpeercred_len = sizeof(sockpeercred);

    /* Verify that the full structure was returned (compare against the
     * structure's size, not the size of the length variable)
     */
    if (!getsockopt(sock, SOL_SOCKET, SO_PEERCRED,
                    &sockpeercred, &sockpeercred_len)
                && sockpeercred_len == sizeof(sockpeercred)) {
        found_pid = sockpeercred.pid;
        found_uid = sockpeercred.uid; found_gid = sockpeercred.gid;

#elif defined(US_AUTH_GETPEEREID)
    if (!getpeereid(sock, &found_uid, &found_gid)) {
        found_pid = PCMK__SPECIAL_PID;  /* cannot obtain PID (FreeBSD) */

#elif defined(US_AUTH_GETPEERUCRED)
    ucred_t *ucred;
    if (!getpeerucred(sock, &ucred)) {
        errno = 0;
        found_pid = ucred_getpid(ucred);
        found_uid = ucred_geteuid(ucred); found_gid = ucred_getegid(ucred);
        ret = -errno;
        ucred_free(ucred);
        if (ret) {
            return (ret < 0) ? ret : -pcmk_err_generic;
        }

#else
#  error "No way to authenticate a Unix socket peer"
    errno = 0;
    if (0) {
#endif
        if (gotpid != NULL) {
            *gotpid = found_pid;
        }
        if (gotuid != NULL) {
            *gotuid = found_uid;
        }
        if (gotgid != NULL) {
            *gotgid = found_gid;
        }
        ret = (found_uid == 0 || found_uid == refuid || found_gid == refgid);
    } else {
        ret = (errno > 0) ? -errno : -pcmk_err_generic;
    }

    return ret;
}
/*!
 * \internal
 * \brief Check whether an IPC server is reachable and suitably credentialed
 *
 * \param[in]  name    IPC server name to connect to
 * \param[in]  refuid  Expected user ID of the server
 * \param[in]  refgid  Expected group ID of the server
 * \param[out] gotpid  If not NULL, where to store the server's PID (set only
 *                     once its credentials have been obtained)
 *
 * \return Standard Pacemaker return code: pcmk_rc_ok if the server is
 *         authorized, pcmk_rc_ipc_unauthorized if it is not,
 *         pcmk_rc_ipc_unresponsive if no connection could be made, or another
 *         code if its credentials could not be checked
 */
int
pcmk__ipc_is_authentic_process_active(const char *name, uid_t refuid,
                                      gid_t refgid, pid_t *gotpid)
{
    // Last server warned about, so the same warning isn't logged repeatedly
    static char last_asked_name[PATH_MAX / 2] = "";

    int fd;
    int rc = pcmk_rc_ipc_unresponsive;
    int auth_rc = 0;
    int32_t qb_rc;
    pid_t found_pid = 0; uid_t found_uid = 0; gid_t found_gid = 0;
    qb_ipcc_connection_t *conn = qb_ipcc_connect(name, 0);

    if (conn == NULL) {
        crm_info("Could not connect to %s IPC: %s", name, strerror(errno));
        return pcmk_rc_ipc_unresponsive;
    }

    qb_rc = qb_ipcc_fd_get(conn, &fd);
    if (qb_rc != 0) {
        rc = (int) -qb_rc; // System errno
        crm_err("Could not get fd from %s IPC: %s " CRM_XS " rc=%d",
                name, pcmk_rc_str(rc), rc);
        goto done;
    }

    auth_rc = crm_ipc_is_authentic_process(fd, refuid, refgid, &found_pid,
                                           &found_uid, &found_gid);
    if (auth_rc < 0) {
        rc = pcmk_legacy2rc(auth_rc);
        crm_err("Could not get peer credentials from %s IPC: %s "
                CRM_XS " rc=%d", name, pcmk_rc_str(rc), rc);
        goto done;
    }

    if (gotpid != NULL) {
        *gotpid = found_pid;
    }

    if (auth_rc == 0) {
        crm_err("Daemon (IPC %s) effectively blocked with unauthorized"
                " process %lld (uid: %lld, gid: %lld)",
                name, (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
                (long long) found_uid, (long long) found_gid);
        rc = pcmk_rc_ipc_unauthorized;
        goto done;
    }

    rc = pcmk_rc_ok;

    // Authorized, but warn (once per server name) about unexpected credentials
    if (((found_uid != refuid) || (found_gid != refgid))
        && (strncmp(last_asked_name, name, sizeof(last_asked_name)) != 0)) {

        if ((found_uid == 0) && (refuid != 0)) {
            crm_warn("Daemon (IPC %s) runs as root, whereas the expected"
                     " credentials are %lld:%lld, hazard of violating"
                     " the least privilege principle",
                     name, (long long) refuid, (long long) refgid);
        } else {
            crm_notice("Daemon (IPC %s) runs as %lld:%lld, whereas the"
                       " expected credentials are %lld:%lld, which may"
                       " mean a different set of privileges than expected",
                       name, (long long) found_uid, (long long) found_gid,
                       (long long) refuid, (long long) refgid);
        }
        memccpy(last_asked_name, name, '\0', sizeof(last_asked_name));
    }

done:
    qb_ipcc_disconnect(conn);
    return rc;
}
/*!
 * \brief Create an IPC "hello" request announcing a client
 *
 * \param[in] uuid           Client UUID
 * \param[in] client_name    Client name
 * \param[in] major_version  Major version string to report
 * \param[in] minor_version  Minor version string to report
 *
 * \return Newly allocated request XML on success, otherwise NULL (including
 *         when any argument is NULL or empty)
 */
xmlNode *
create_hello_message(const char *uuid,
                     const char *client_name, const char *major_version,
                     const char *minor_version)
{
    xmlNode *hello_node = NULL;
    xmlNode *hello = NULL;

    if (pcmk__str_empty(uuid) || pcmk__str_empty(client_name)
        || pcmk__str_empty(major_version) || pcmk__str_empty(minor_version)) {
        crm_err("Could not create IPC hello message from %s (UUID %s): "
                "missing information",
                client_name? client_name : "unknown client",
                uuid? uuid : "unknown");
        return NULL;
    }

    hello_node = create_xml_node(NULL, XML_TAG_OPTIONS);
    if (hello_node == NULL) {
        crm_err("Could not create IPC hello message from %s (UUID %s): "
                "Message data creation failed", client_name, uuid);
        return NULL;
    }

    crm_xml_add(hello_node, "major_version", major_version);
    crm_xml_add(hello_node, "minor_version", minor_version);
    crm_xml_add(hello_node, "client_name", client_name);
    crm_xml_add(hello_node, "client_uuid", uuid);

    hello = create_request(CRM_OP_HELLO, hello_node, NULL, NULL, client_name, uuid);

    /* The request does not keep hello_node itself (the success path always
     * freed it), so free it before checking the result; previously it
     * leaked when request creation failed
     */
    free_xml(hello_node);

    if (hello == NULL) {
        crm_err("Could not create IPC hello message from %s (UUID %s): "
                "Request creation failed", client_name, uuid);
        return NULL;
    }

    crm_trace("Created hello message from %s (UUID %s)", client_name, uuid);
    return hello;
}
diff --git a/lib/common/mainloop.c b/lib/common/mainloop.c
index 634eead71b..e942e578c0 100644
--- a/lib/common/mainloop.c
+++ b/lib/common/mainloop.c
@@ -1,1417 +1,1451 @@
/*
* Copyright 2004-2020 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU Lesser General Public License
* version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#ifndef _GNU_SOURCE
# define _GNU_SOURCE
#endif
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include <sys/wait.h>
#include <crm/crm.h>
#include <crm/common/xml.h>
#include <crm/common/mainloop.h>
#include <crm/common/ipc_internal.h>
#include <qb/qbarray.h>
/* State for a child process tracked by the main loop
 * (NOTE(review): presumably the struct behind mainloop_child_t -- confirm) */
struct mainloop_child_s {
pid_t pid;
char *desc; // presumably a human-readable description for logs -- confirm
unsigned timerid; // NOTE(review): presumably a timeout timer's source ID -- confirm
gboolean timeout; // NOTE(review): presumably TRUE once that timer fired -- confirm
void *privatedata;
enum mainloop_child_flags flags;
/* Called when a process dies */
void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode);
};
/* Custom GLib source that runs a callback when explicitly triggered
 * (presumably the struct behind crm_trigger_t -- confirm) */
struct trigger_s {
GSource source; // must be first: GSource pointers are cast to crm_trigger_t
gboolean running; // TRUE while a handler's job is incomplete (see mainloop_trigger_complete())
gboolean trigger; // TRUE when mainloop_set_trigger() has requested dispatch
void *user_data; // passed to the dispatch callback
guint id; // ID returned by g_source_attach()
};
/* GSourceFuncs.prepare for triggers: ready if the trigger has been set */
static gboolean
crm_trigger_prepare(GSource * source, gint * timeout)
{
    crm_trigger_t *trig = (crm_trigger_t *) source;

    /* cluster-glue's FD and IPC sources register descriptors with
     * g_source_add_poll() but set no timeout in their prepare functions, so
     * the main loop's poll() blocks until one of those descriptors has an
     * event. Any other kind of source, such as this one or g_idle_*(), that
     * does not use g_source_add_poll() would then not be processed until
     * some fd-based activity occurs.
     *
     * The timeout set here affects all sources and caps how long poll() can
     * block, so always set a small-ish value (but not so small that the loop
     * is constantly busy). It bounds how long signal handling can be delayed.
     */
    *timeout = 500; // milliseconds

    return trig->trigger;
}
/* GSourceFuncs.check for triggers: ready whenever the trigger is set */
static gboolean
crm_trigger_check(GSource * source)
{
    return ((crm_trigger_t *) source)->trigger;
}
/* GSourceFuncs.dispatch for triggers: clear the trigger and run the handler.
 * A negative handler result marks an asynchronous job in progress, and the
 * source stays attached until mainloop_trigger_complete() is called.
 */
static gboolean
crm_trigger_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
{
    crm_trigger_t *trig = (crm_trigger_t *) source;
    int rc = TRUE;

    // Wait until the existing job is complete before starting the next one
    if (trig->running) {
        return TRUE;
    }

    trig->trigger = FALSE;
    if (callback == NULL) {
        return rc;
    }

    rc = callback(trig->user_data);
    if (rc < 0) {
        crm_trace("Trigger handler %p not yet complete", trig);
        trig->running = TRUE;
        rc = TRUE;
    }
    return rc;
}
/*!
 * \internal
 * \brief GSource "finalize" function for trigger sources (only logs)
 *
 * \param[in] source  Source being finalized
 */
static void
crm_trigger_finalize(GSource *source)
{
    crm_trace("Trigger %p destroyed", source);
}
static GSourceFuncs crm_trigger_funcs = {
crm_trigger_prepare,
crm_trigger_check,
crm_trigger_dispatch,
crm_trigger_finalize,
};
/*!
 * \internal
 * \brief Initialize a newly allocated trigger source and attach it
 *
 * \param[in,out] source    Source from g_source_new() sized for a trigger
 * \param[in]     priority  GLib priority for the source
 * \param[in]     dispatch  Callback to invoke when triggered (may be NULL)
 * \param[in]     userdata  Data passed to \p dispatch
 *
 * \return \p source as a trigger, attached to the default main context
 */
static crm_trigger_t *
mainloop_setup_trigger(GSource *source, int priority,
                       int (*dispatch) (gpointer user_data), gpointer userdata)
{
    crm_trigger_t *trig = (crm_trigger_t *) source;

    trig->id = 0;
    trig->trigger = FALSE;
    trig->user_data = userdata;

    if (dispatch != NULL) {
        g_source_set_callback(source, dispatch, trig, NULL);
    }
    g_source_set_priority(source, priority);
    g_source_set_can_recurse(source, FALSE);

    trig->id = g_source_attach(source, NULL);
    return trig;
}
/*!
 * \brief Mark a trigger's asynchronous job as finished
 *
 * Call this after the trigger's dispatch function returned -1, so that the
 * trigger can be dispatched again.
 *
 * \param[in,out] trig  Trigger whose job has completed
 */
void
mainloop_trigger_complete(crm_trigger_t *trig)
{
    crm_trace("Trigger handler %p complete", trig);
    trig->running = FALSE;
}
/*!
 * \brief Create a trigger source in the default main loop context
 *
 * \param[in] priority  GLib priority for the trigger
 * \param[in] dispatch  Called when the trigger is set; should return
 *                      -1 if its job is running but not complete (see
 *                      mainloop_trigger_complete()), 0 to remove the trigger
 *                      from the main loop, or 1 to keep it
 * \param[in] userdata  Data passed to \p dispatch
 *
 * \return Newly created trigger
 */
crm_trigger_t *
mainloop_add_trigger(int priority, int (*dispatch) (gpointer user_data),
                     gpointer userdata)
{
    GSource *src;

    CRM_ASSERT(sizeof(crm_trigger_t) > sizeof(GSource));
    src = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t));
    CRM_ASSERT(src != NULL);

    return mainloop_setup_trigger(src, priority, dispatch, userdata);
}
/*!
 * \brief Set a trigger so its dispatch function runs from the main loop
 *
 * \param[in,out] source  Trigger to set (NULL is ignored)
 *
 * \note This only sets a flag, which is why mainloop_signal_handler() can call
 *       it from a true signal handler.
 */
void
mainloop_set_trigger(crm_trigger_t *source)
{
    if (source == NULL) {
        return;
    }
    source->trigger = TRUE;
}
/*!
 * \brief Remove a trigger from the main loop and drop the caller's reference
 *
 * \param[in,out] source  Trigger to destroy (NULL is ignored)
 *
 * \return TRUE (always)
 *
 * \note The source is freed right away unless it is currently being
 *       dispatched. In that case, the main loop holds an extra reference and
 *       frees the source once that dispatch completes.
 */
gboolean
mainloop_destroy_trigger(crm_trigger_t *source)
{
    if (source != NULL) {
        GSource *gs = (GSource *) source;

        g_source_destroy(gs); // detach from the main loop (ref_count--)
        g_source_unref(gs);   // drop the reference the caller held
    }
    return TRUE;
}
// Define a custom glib source for signal handling
// Data structure for custom glib source; it embeds a crm_trigger_t first so a
// crm_signal_t can be passed wherever a crm_trigger_t or GSource is expected
typedef struct signal_s {
crm_trigger_t trigger; // trigger that invoked source (must be first)
void (*handler) (int sig); // main loop dispatch function (not the true signal handler)
int signal; // signal that was received
} crm_signal_t;
// Table to associate signal handlers with signal numbers; indexed by signal
// number, NULL where no main loop handler is installed
static crm_signal_t *crm_signals[NSIG];
/*!
 * \internal
 * \brief Dispatch function for the custom glib signal source
 *
 * Log the signal (except SIGCHLD), clear the source's trigger, and invoke the
 * registered main loop handler, if any.
 *
 * \param[in] source    glib source that triggered this dispatch
 * \param[in] callback  (ignored)
 * \param[in] userdata  (ignored)
 *
 * \return TRUE (keep the source)
 */
static gboolean
crm_signal_dispatch(GSource *source, GSourceFunc callback, gpointer userdata)
{
    crm_signal_t *sig = (crm_signal_t *) source;

    if (sig->signal != SIGCHLD) {
        const char *what = (sig->handler != NULL)? "invoking" : "no";

        crm_notice("Caught '%s' signal "CRM_XS" %d (%s handler)",
                   strsignal(sig->signal), sig->signal, what);
    }

    sig->trigger.trigger = FALSE;

    if (sig->handler != NULL) {
        sig->handler(sig->signal);
    }
    return TRUE;
}
/*!
 * \internal
 * \brief True signal handler for the main loop signal source
 *
 * This only sets the trigger for the signal's source. The registered handler
 * runs later, from the main loop. It must therefore remain async-safe.
 *
 * \param[in] sig  Signal number that was received
 */
static void
mainloop_signal_handler(int sig)
{
    if ((sig <= 0) || (sig >= NSIG) || (crm_signals[sig] == NULL)) {
        return;
    }
    mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]);
}
// Functions implementing our custom glib source for signal handling
static GSourceFuncs crm_signal_funcs = {
crm_trigger_prepare,
crm_trigger_check,
crm_signal_dispatch,
crm_trigger_finalize,
};
/*!
 * \internal
 * \brief Install a true signal handler (signal()-like wrapper for sigaction())
 *
 * The handler is installed with SA_RESTART and an empty signal mask.
 *
 * \param[in] sig       Signal number to register handler for
 * \param[in] dispatch  Signal handler (must be async-safe)
 *
 * \return Previous handler for \p sig, or SIG_ERR on error
 */
sighandler_t
crm_signal_handler(int sig, sighandler_t dispatch)
{
    struct sigaction new_action;
    struct sigaction old_action;
    sigset_t empty_mask;

    if (sigemptyset(&empty_mask) < 0) {
        goto failed;
    }

    memset(&new_action, 0, sizeof(new_action));
    new_action.sa_handler = dispatch;
    new_action.sa_flags = SA_RESTART;
    new_action.sa_mask = empty_mask;

    if (sigaction(sig, &new_action, &old_action) < 0) {
        goto failed;
    }
    return old_action.sa_handler;

failed:
    crm_err("Could not set handler for signal %d: %s",
            sig, pcmk_strerror(errno));
    return SIG_ERR;
}
static void
mainloop_destroy_signal_entry(int sig)
{
crm_signal_t *tmp = crm_signals[sig];
crm_signals[sig] = NULL;
crm_trace("Destroying signal %d", sig);
mainloop_destroy_trigger((crm_trigger_t *) tmp);
}
/*!
 * \internal
 * \brief Add a signal handler to a mainloop
 *
 * \param[in] sig       Signal number to handle (1 to NSIG - 1)
 * \param[in] dispatch  Signal handler function
 *
 * \return TRUE if the handler is (or already was) installed, otherwise FALSE
 *
 * \note The true signal handler merely sets a mainloop trigger to call this
 *       dispatch function via the mainloop. Therefore, the dispatch function
 *       does not need to be async-safe.
 */
gboolean
mainloop_add_signal(int sig, void (*dispatch) (int sig))
{
    GSource *source = NULL;
    int priority = G_PRIORITY_HIGH - 1;

    // Validate before touching crm_signals (0 is not a deliverable signal)
    if ((sig <= 0) || (sig >= NSIG)) {
        crm_err("Signal %d is out of range", sig);
        return FALSE;

    } else if ((crm_signals[sig] != NULL)
               && (crm_signals[sig]->handler == dispatch)) {
        crm_trace("Signal handler for %d is already installed", sig);
        return TRUE;

    } else if (crm_signals[sig] != NULL) {
        crm_err("Different signal handler for %d is already installed", sig);
        return FALSE;
    }

    if (sig == SIGTERM) {
        /* TERM is higher priority than other signals, and signals are higher
         * priority than other IPC. Yes, minus: smaller is "higher".
         */
        priority--;
    }

    CRM_ASSERT(sizeof(crm_signal_t) > sizeof(GSource));
    source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t));

    crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority,
                                                              NULL, NULL);
    CRM_ASSERT(crm_signals[sig] != NULL);

    crm_signals[sig]->handler = dispatch;
    crm_signals[sig]->signal = sig;

    if (crm_signal_handler(sig, mainloop_signal_handler) == SIG_ERR) {
        mainloop_destroy_signal_entry(sig);
        return FALSE;
    }

    /* Signals are deliberately not made to interrupt the main loop's poll()
     * (which would need siginterrupt()). Instead, crm_trigger_prepare()
     * enforces a low poll timeout, bounding how long a signal can go unnoticed.
     */
    return TRUE;
}
/*!
 * \brief Remove a main loop signal handler and restore the default disposition
 *
 * \param[in] sig  Signal number (1 to NSIG - 1)
 *
 * \return TRUE on success (including when no handler was installed),
 *         otherwise FALSE
 */
gboolean
mainloop_destroy_signal(int sig)
{
    if ((sig <= 0) || (sig >= NSIG)) {
        crm_err("Signal %d is out of range", sig);
        return FALSE;
    }

    /* Use SIG_DFL explicitly; it is not guaranteed to be a null pointer.
     * crm_signal_handler() already logs errno on failure, and that logging may
     * change errno, so don't log errno again here.
     */
    if (crm_signal_handler(sig, SIG_DFL) == SIG_ERR) {
        crm_err("Could not uninstall signal handler for signal %d", sig);
        return FALSE;
    }

    if (crm_signals[sig] != NULL) {
        mainloop_destroy_signal_entry(sig);
    }
    return TRUE;
}
static qb_array_t *gio_map = NULL;
void
mainloop_cleanup(void)
{
if (gio_map) {
qb_array_free(gio_map);
}
for (int sig = 0; sig < NSIG; ++sig) {
mainloop_destroy_signal_entry(sig);
}
}
/*
* libqb integration: adapt libqb's IPC poll handlers (gio_poll_funcs) onto
* GLib I/O watches
*/
/* Per-fd adaptor stored in gio_map, indexed by file descriptor */
struct gio_to_qb_poll {
int32_t is_used; /* active watches using this adaptor (++ on add/mod, -- in gio_poll_destroy()) */
guint source; /* GLib source ID of the current watch, 0 when none */
int32_t events; /* conditions watched (always includes HUP/NVAL/ERR) */
void *data; /* libqb's data, passed to fn */
qb_ipcs_dispatch_fn_t fn; /* libqb dispatch function called by gio_read_socket() */
enum qb_loop_priority p; /* libqb priority requested for this fd */
};
/*!
 * \internal
 * \brief GLib I/O watch callback that forwards fd events to libqb
 *
 * \param[in] gio        Channel with activity
 * \param[in] condition  I/O condition(s) that occurred
 * \param[in] data       struct gio_to_qb_poll adaptor for the fd
 *
 * \return TRUE (keep watching) if libqb's dispatch function returned 0,
 *         otherwise FALSE
 */
static gboolean
gio_read_socket(GIOChannel *gio, GIOCondition condition, gpointer data)
{
    struct gio_to_qb_poll *adaptor = data;
    gint fd = g_io_channel_unix_get_fd(gio);
    int32_t qb_rc;

    crm_trace("%p.%d %d", data, fd, condition);

    /* If this assertion is hit, there is a race between destroying an fd and
     * the main loop actually giving it up
     */
    CRM_ASSERT(adaptor->is_used > 0);

    qb_rc = adaptor->fn(fd, condition, adaptor->data);
    return (qb_rc == 0);
}
/*!
 * \internal
 * \brief GLib destroy notification for a libqb fd watch
 *
 * Drop the adaptor's use count, and clear its source ID once no watch uses it.
 *
 * \param[in,out] data  struct gio_to_qb_poll adaptor
 */
static void
gio_poll_destroy(gpointer data)
{
    struct gio_to_qb_poll *adaptor = data;

    adaptor->is_used--;
    CRM_ASSERT(adaptor->is_used >= 0);

    if (adaptor->is_used > 0) {
        return; // another watch still uses this adaptor
    }
    crm_trace("Marking adaptor %p unused", adaptor);
    adaptor->source = 0;
}
/*!
 * \internal
 * \brief Map a libqb poll priority to the closest GLib priority
 *
 * \param[in] prio  libqb poll priority (unknown values are treated as
 *                  #QB_LOOP_MED)
 *
 * \return G_PRIORITY_LOW, G_PRIORITY_HIGH, or G_PRIORITY_DEFAULT
 */
static gint
conv_prio_libqb2glib(enum qb_loop_priority prio)
{
    switch (prio) {
        case QB_LOOP_LOW:
            return G_PRIORITY_LOW;

        case QB_LOOP_HIGH:
            return G_PRIORITY_HIGH;

        case QB_LOOP_MED:
            return G_PRIORITY_DEFAULT;

        default:
            crm_trace("Invalid libqb's loop priority %d, assuming QB_LOOP_MED",
                      prio);
            return G_PRIORITY_DEFAULT;
    }
}
/*!
 * \internal
 * \brief Map a libqb poll priority to a libqb IPC rate-limiting spec
 *
 * This inverts the mapping done by libqb's qb_ipcs_request_rate_limit().
 *
 * \param[in] prio  libqb poll priority (unknown values are treated as
 *                  #QB_LOOP_MED)
 *
 * \return QB_IPCS_RATE_SLOW, QB_IPCS_RATE_FAST, or QB_IPCS_RATE_NORMAL
 */
static enum qb_ipcs_rate_limit
conv_libqb_prio2ratelimit(enum qb_loop_priority prio)
{
    switch (prio) {
        case QB_LOOP_LOW:
            return QB_IPCS_RATE_SLOW;

        case QB_LOOP_HIGH:
            return QB_IPCS_RATE_FAST;

        case QB_LOOP_MED:
            return QB_IPCS_RATE_NORMAL;

        default:
            crm_trace("Invalid libqb's loop priority %d, assuming QB_LOOP_MED",
                      prio);
            return QB_IPCS_RATE_NORMAL;
    }
}
/*!
 * \internal
 * \brief Add or modify the GLib watch used for a libqb-managed fd
 *
 * \param[in] p     libqb priority (converted with conv_prio_libqb2glib())
 * \param[in] fd    File descriptor (index into gio_map)
 * \param[in] evts  Poll events to watch (HUP/NVAL/ERR are always added)
 * \param[in] data  libqb data passed back to \p fn
 * \param[in] fn    libqb dispatch function
 * \param[in] add   Nonzero to add a new watch, zero to modify an existing one
 *
 * \return 0 on success, otherwise a negative errno value (or the negative
 *         result of qb_array_index())
 */
static int32_t
gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
void *data, qb_ipcs_dispatch_fn_t fn, int32_t add)
{
struct gio_to_qb_poll *adaptor;
GIOChannel *channel;
int32_t res = 0;
res = qb_array_index(gio_map, fd, (void **)&adaptor);
if (res < 0) {
crm_err("Array lookup failed for fd=%d: %d", fd, res);
return res;
}
/* NOTE(review): this message is also logged for modifications (add == 0) */
crm_trace("Adding fd=%d to mainloop as adaptor %p", fd, adaptor);
if (add && adaptor->source) {
crm_err("Adaptor for descriptor %d is still in-use", fd);
return -EEXIST;
}
if (!add && !adaptor->is_used) {
crm_err("Adaptor for descriptor %d is not in-use", fd);
return -ENOENT;
}
/* channel is created with ref_count = 1 */
channel = g_io_channel_unix_new(fd);
if (!channel) {
crm_err("No memory left to add fd=%d", fd);
return -ENOMEM;
}
/* Drop any existing watch first; removing it calls gio_poll_destroy(),
* which decrements is_used before it is incremented again below
*/
if (adaptor->source) {
g_source_remove(adaptor->source);
adaptor->source = 0;
}
/* Because unlike the poll() API, glib doesn't tell us about HUPs by default */
evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR);
adaptor->fn = fn;
adaptor->events = evts;
adaptor->data = data;
adaptor->p = p;
adaptor->is_used++;
adaptor->source =
g_io_add_watch_full(channel, conv_prio_libqb2glib(p), evts,
gio_read_socket, adaptor, gio_poll_destroy);
/* Now that mainloop now holds a reference to channel,
* thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
*
* This means that channel will be free'd by:
* g_main_context_dispatch()
* -> g_source_destroy_internal()
* -> g_source_callback_unref()
* shortly after gio_poll_destroy() completes
*/
g_io_channel_unref(channel);
crm_trace("Added to mainloop with gsource id=%d", adaptor->source);
/* A source ID of 0 means the watch could not be added */
if (adaptor->source > 0) {
return 0;
}
return -EINVAL;
}
// libqb dispatch_add handler: start watching fd in the GLib main loop
static int32_t
gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
                      void *data, qb_ipcs_dispatch_fn_t fn)
{
    const int32_t is_add = QB_TRUE;

    return gio_poll_dispatch_update(p, fd, evts, data, fn, is_add);
}
// libqb dispatch_mod handler: replace the existing GLib watch for fd
static int32_t
gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
                      void *data, qb_ipcs_dispatch_fn_t fn)
{
    const int32_t is_add = QB_FALSE;

    return gio_poll_dispatch_update(p, fd, evts, data, fn, is_add);
}
/*!
 * \internal
 * \brief libqb dispatch_del handler: stop watching fd in the GLib main loop
 *
 * \param[in] fd  File descriptor to stop watching
 *
 * \return 0 (always, even if fd was unknown or not being watched)
 */
static int32_t
gio_poll_dispatch_del(int32_t fd)
{
    struct gio_to_qb_poll *adaptor;

    crm_trace("Looking for fd=%d", fd);

    if ((qb_array_index(gio_map, fd, (void **) &adaptor) != 0)
        || (adaptor->source == 0)) {
        return 0;
    }

    g_source_remove(adaptor->source);
    adaptor->source = 0;
    return 0;
}
/* libqb IPC poll handlers that route libqb's fd watches through the GLib main
* loop; installed on servers by mainloop_add_ipc_server_with_prio()
*/
struct qb_ipcs_poll_handlers gio_poll_funcs = {
.job_add = NULL, /* no job handler is provided */
.dispatch_add = gio_poll_dispatch_add,
.dispatch_mod = gio_poll_dispatch_mod,
.dispatch_del = gio_poll_dispatch_del,
};
/*!
 * \internal
 * \brief Choose the libqb IPC type, honoring the PCMK_ipc_type override
 *
 * \param[in] requested  IPC type requested by the caller
 *
 * \return IPC type from PCMK_ipc_type if it is recognized; otherwise
 *         QB_IPC_SHM if \p requested is QB_IPC_NATIVE, else \p requested
 */
static enum qb_ipc_type
pick_ipc_type(enum qb_ipc_type requested)
{
    const char *override = getenv("PCMK_ipc_type");

    if (override != NULL) {
        if (strcmp("shared-mem", override) == 0) {
            return QB_IPC_SHM;
        }
        if (strcmp("socket", override) == 0) {
            return QB_IPC_SOCKET;
        }
        if (strcmp("posix", override) == 0) {
            return QB_IPC_POSIX_MQ;
        }
        if (strcmp("sysv", override) == 0) {
            return QB_IPC_SYSV_MQ;
        }
    }

    if (requested == QB_IPC_NATIVE) {
        /* Prefer shared memory because the server never blocks on send. With
         * a socket, if only part of a message fits, libqb must block until the
         * remainder is sent; otherwise the client would wait forever for the
         * remaining bytes.
         */
        return QB_IPC_SHM;
    }
    return requested;
}
qb_ipcs_service_t *
mainloop_add_ipc_server(const char *name, enum qb_ipc_type type,
struct qb_ipcs_service_handlers *callbacks)
{
return mainloop_add_ipc_server_with_prio(name, type, callbacks, QB_LOOP_MED);
}
/*!
 * \brief Create and start a libqb IPC server driven by the GLib main loop
 *
 * \param[in] name       IPC server name
 * \param[in] type       Requested IPC type (see pick_ipc_type())
 * \param[in] callbacks  libqb service handlers
 * \param[in] prio       libqb priority; anything but QB_LOOP_MED also sets a
 *                       matching request rate limit
 *
 * \return Newly started server, or NULL on error
 */
qb_ipcs_service_t *
mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type,
                                  struct qb_ipcs_service_handlers *callbacks,
                                  enum qb_loop_priority prio)
{
    int rc = 0;
    qb_ipcs_service_t *server = NULL;

    if (gio_map == NULL) {
        gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1);
        if (gio_map == NULL) {
            // Without the map, no fd could ever be added to the main loop
            rc = ENOMEM;
            crm_err("Could not create %s IPC server: %s (%d)",
                    name, pcmk_strerror(rc), rc);
            return NULL;
        }
    }

    server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks);
    if (server == NULL) {
        /* qb_ipcs_create() returns no error code, so errno is the best
         * available indication (previously this always reported rc == 0)
         */
        rc = errno;
        crm_err("Could not create %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
        return NULL;
    }

    if (prio != QB_LOOP_MED) {
        qb_ipcs_request_rate_limit(server, conv_libqb_prio2ratelimit(prio));
    }

#ifdef HAVE_IPCS_GET_BUFFER_SIZE
    /* All clients should use at least ipc_buffer_max as their buffer size */
    qb_ipcs_enforce_buffer_size(server, crm_ipc_default_buffer_size());
#endif

    qb_ipcs_poll_handlers_set(server, &gio_poll_funcs);

    rc = qb_ipcs_run(server);
    if (rc < 0) {
        crm_err("Could not start %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
        /* NOTE(review): server is intentionally not destroyed here, on the
         * assumption that qb_ipcs_run() releases it on failure; confirm
         * against the libqb version in use before adding qb_ipcs_destroy()
         */
        return NULL;
    }
    return server;
}
/*!
 * \brief Destroy a libqb IPC server
 *
 * \param[in,out] server  Server to destroy (NULL is ignored)
 */
void
mainloop_del_ipc_server(qb_ipcs_service_t *server)
{
    if (server == NULL) {
        return;
    }
    qb_ipcs_destroy(server);
}
/* Main loop source for a file descriptor or IPC connection, created by
* mainloop_add_fd() and freed by mainloop_gio_destroy()
*/
struct mainloop_io_s {
char *name; /* strdup()'d name used in log messages */
void *userdata; /* passed to the dispatch and destroy callbacks */
int fd; /* file descriptor being watched */
guint source; /* GLib source ID from g_io_add_watch_full() */
crm_ipc_t *ipc; /* IPC connection, or NULL for plain fd sources */
GIOChannel *channel; /* channel wrapping fd (reference owned by the watch) */
int (*dispatch_fn_ipc) (const char *buffer, ssize_t length, gpointer userdata); /* per IPC message */
int (*dispatch_fn_io) (gpointer userdata); /* per input event on a plain fd */
void (*destroy_fn) (gpointer userdata); /* called when the source is destroyed */
};
/*!
 * \internal
 * \brief GLib I/O watch callback for sources created by mainloop_add_fd()
 *
 * For IPC sources, read and dispatch up to 10 messages per call. For plain fd
 * sources, call the fd dispatch function once per input event.
 *
 * \param[in] gio        Channel with activity
 * \param[in] condition  I/O condition(s) that occurred
 * \param[in] data       mainloop_io_t for the source
 *
 * \return FALSE to remove the source (GLib then calls mainloop_gio_destroy()),
 *         otherwise TRUE
 */
static gboolean
mainloop_gio_callback(GIOChannel *gio, GIOCondition condition, gpointer data)
{
    gboolean keep = TRUE;
    mainloop_io_t *client = data;

    CRM_ASSERT(client->fd == g_io_channel_unix_get_fd(gio));

    if (condition & G_IO_IN) {
        if (client->ipc) {
            long rc = 0;
            int max = 10; // cap per call, presumably to avoid starving others

            do {
                rc = crm_ipc_read(client->ipc);
                if (rc <= 0) {
                    crm_trace("Message acquisition from %s[%p] failed: %s (%ld)",
                              client->name, client, pcmk_strerror(rc), rc);

                } else if (client->dispatch_fn_ipc) {
                    const char *buffer = crm_ipc_buffer(client->ipc);

                    crm_trace("New message from %s[%p] = %ld (I/O condition=%d)",
                              client->name, client, rc, condition);
                    if (client->dispatch_fn_ipc(buffer, rc, client->userdata) < 0) {
                        crm_trace("Connection to %s no longer required", client->name);
                        keep = FALSE;
                    }
                }
            } while (keep && rc > 0 && --max > 0);

        } else {
            crm_trace("New message from %s[%p] %u", client->name, client, condition);
            if (client->dispatch_fn_io) {
                if (client->dispatch_fn_io(client->userdata) < 0) {
                    crm_trace("Connection to %s no longer required", client->name);
                    keep = FALSE;
                }
            }
        }
    }

    if (client->ipc && crm_ipc_connected(client->ipc) == FALSE) {
        crm_err("Connection to %s closed " CRM_XS " client=%p condition=%d",
                client->name, client, condition);
        keep = FALSE;

    } else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) {
        crm_trace("The connection %s[%p] has been closed (I/O condition=%d)",
                  client->name, client, condition);
        keep = FALSE;

    } else if ((condition & G_IO_IN) == 0) {
        /* mainloop_add_fd() watches only for G_IO_IN and the error conditions
         * handled above, so any other condition is unexpected
         */
        crm_err("Strange condition: %d", condition);
    }

    /* keep == FALSE results in mainloop_gio_destroy() being called
     * just before the source is removed from mainloop
     */
    return keep;
}
/*!
* \internal
* \brief GLib destroy notification for sources created by mainloop_add_fd()
*
* Close the IPC connection (if any), call the destroy callback (if any), then
* free the IPC object, the name, and the mainloop_io_t itself.
*
* \param[in,out] c  mainloop_io_t being destroyed
*/
static void
mainloop_gio_destroy(gpointer c)
{
mainloop_io_t *client = c;
/* Local copy of the name, used only for the trace messages */
char *c_name = strdup(client->name);
/* client->source is valid but about to be destroyed (ref_count == 0) in gmain.c
* client->channel will still have ref_count > 0... should be == 1
*/
crm_trace("Destroying client %s[%p]", c_name, c);
/* Close the IPC connection before the destroy callback runs; the object
* itself is freed only after the callback
*/
if (client->ipc) {
crm_ipc_close(client->ipc);
}
/* Clear destroy_fn before calling it so it cannot be invoked twice
* (presumably guarding against re-entry; confirm)
*/
if (client->destroy_fn) {
void (*destroy_fn) (gpointer userdata) = client->destroy_fn;
client->destroy_fn = NULL;
destroy_fn(client->userdata);
}
if (client->ipc) {
crm_ipc_t *ipc = client->ipc;
client->ipc = NULL;
crm_ipc_destroy(ipc);
}
crm_trace("Destroyed client %s[%p]", c_name, c);
free(client->name); client->name = NULL;
free(client);
free(c_name);
}
-mainloop_io_t *
-mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata,
- struct ipc_client_callbacks *callbacks)
+/*!
+ * \brief Connect to IPC and add it as a main loop source
+ *
+ * \param[in] ipc IPC connection to add
+ * \param[in] priority Event source priority to use for connection
+ * \param[in] userdata Data to register with callbacks
+ * \param[in] callbacks Dispatch and destroy callbacks for connection
+ * \param[out] source Newly allocated event source (NULL on failure)
+ *
+ * \return Standard Pacemaker return code (never pcmk_rc_ok without a source)
+ *
+ * \note On failure, the caller is still responsible for ipc. On success, the
+ * caller should call mainloop_del_ipc_client() when source is no longer
+ * needed, which will lead to the disconnection of the IPC later in the
+ * main loop if it is connected. However the IPC disconnects,
+ * mainloop_gio_destroy() will free ipc and source after calling the
+ * destroy callback.
+ */
+int
+pcmk__add_mainloop_ipc(crm_ipc_t *ipc, int priority, void *userdata,
+ struct ipc_client_callbacks *callbacks,
+ mainloop_io_t **source)
{
- mainloop_io_t *client = NULL;
- crm_ipc_t *conn = crm_ipc_new(name, max_size);
+ CRM_CHECK((ipc != NULL) && (callbacks != NULL) && (source != NULL),
+ return EINVAL);
+ *source = NULL;
- if (conn && crm_ipc_connect(conn)) {
- int32_t fd = crm_ipc_get_fd(conn);
+ if (!crm_ipc_connect(ipc)) {
+ return ENOTCONN;
+ }
+ *source = mainloop_add_fd(crm_ipc_name(ipc), priority, crm_ipc_get_fd(ipc),
+ userdata, NULL);
+ if (*source == NULL) {
+ int rc = errno; // save before crm_ipc_close() can change it
- client = mainloop_add_fd(name, priority, fd, userdata, NULL);
+ crm_ipc_close(ipc);
+ // Never report success without a source, even if errno was not set
+ return (rc != 0)? rc : EINVAL;
}
+ (*source)->ipc = ipc;
+ (*source)->destroy_fn = callbacks->destroy;
+ (*source)->dispatch_fn_ipc = callbacks->dispatch;
+ return pcmk_rc_ok;
+}
- if (client == NULL) {
- crm_perror(LOG_TRACE, "Connection to %s failed", name);
- if (conn) {
- crm_ipc_close(conn);
- crm_ipc_destroy(conn);
+mainloop_io_t *
+mainloop_add_ipc_client(const char *name, int priority, size_t max_size,
+ void *userdata, struct ipc_client_callbacks *callbacks)
+{
+ crm_ipc_t *ipc = crm_ipc_new(name, max_size);
+ mainloop_io_t *source = NULL;
+ int rc = pcmk__add_mainloop_ipc(ipc, priority, userdata, callbacks,
+ &source);
+
+ if (rc != pcmk_rc_ok) {
+ if (crm_log_level == LOG_STDOUT) {
+ fprintf(stderr, "Connection to %s failed: %s\n",
+ name, pcmk_rc_str(rc));
}
+ crm_ipc_destroy(ipc);
+ /* Let callers inspect errno, as with the former crm_perror() path;
+ * rc is always a system errno value here
+ */
+ errno = rc;
return NULL;
}
-
- client->ipc = conn;
- client->destroy_fn = callbacks->destroy;
- client->dispatch_fn_ipc = callbacks->dispatch;
- return client;
+ return source;
}
/*!
 * \brief Remove an IPC client source from the main loop
 *
 * \param[in,out] client  Source returned by mainloop_add_ipc_client()
 *
 * \note This is the same as mainloop_del_fd(). Removing the source makes
 *       mainloop_gio_destroy() close and free the IPC connection.
 */
void
mainloop_del_ipc_client(mainloop_io_t *client)
{
    mainloop_del_fd(client);
}
/*!
 * \brief Get the IPC connection associated with a main loop source
 *
 * \param[in] client  Main loop source (may be NULL)
 *
 * \return IPC connection, or NULL if \p client is NULL or not an IPC source
 */
crm_ipc_t *
mainloop_get_ipc_client(mainloop_io_t *client)
{
    return (client == NULL)? NULL : client->ipc;
}
/*!
 * \brief Add a file descriptor as a source in the default main loop context
 *
 * \param[in] name       Name for log messages (copied)
 * \param[in] priority   GLib priority for the source
 * \param[in] fd         File descriptor to watch for input, hangup, and errors
 * \param[in] userdata   Data passed to the callbacks
 * \param[in] callbacks  Dispatch and destroy callbacks (may be NULL)
 *
 * \return Newly allocated source on success. Otherwise NULL, with errno set to
 *         EINVAL (negative fd or NULL name) or ENOMEM (allocation failure).
 *
 * \note The source is freed by mainloop_gio_destroy() once it is removed from
 *       the main loop (for example, via mainloop_del_fd()).
 */
mainloop_io_t *
mainloop_add_fd(const char *name, int priority, int fd, void *userdata,
                struct mainloop_fd_callbacks *callbacks)
{
    mainloop_io_t *client = NULL;

    // strdup(NULL) is undefined behavior, so reject a NULL name up front
    if ((fd < 0) || (name == NULL)) {
        errno = EINVAL;
        return NULL;
    }

    client = calloc(1, sizeof(mainloop_io_t));
    if (client == NULL) {
        errno = ENOMEM;
        return NULL;
    }

    client->name = strdup(name);
    if (client->name == NULL) {
        free(client);
        errno = ENOMEM;
        return NULL;
    }

    client->userdata = userdata;
    if (callbacks) {
        client->destroy_fn = callbacks->destroy;
        client->dispatch_fn_io = callbacks->dispatch;
    }

    client->fd = fd;
    client->channel = g_io_channel_unix_new(fd);
    client->source =
        g_io_add_watch_full(client->channel, priority,
                            (G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR),
                            mainloop_gio_callback, client, mainloop_gio_destroy);

    /* Now that mainloop now holds a reference to channel,
     * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
     *
     * This means that channel will be free'd by:
     * g_main_context_dispatch() or g_source_remove()
     *  -> g_source_destroy_internal()
     *      -> g_source_callback_unref()
     * shortly after mainloop_gio_destroy() completes
     */
    g_io_channel_unref(client->channel);

    crm_trace("Added connection %u for %s[%p].%d",
              client->source, client->name, client, fd);
    return client;
}
/*!
 * \brief Remove a file descriptor source from the main loop
 *
 * \param[in,out] client  Source returned by mainloop_add_fd() (NULL is
 *                        ignored)
 */
void
mainloop_del_fd(mainloop_io_t *client)
{
    if (client == NULL) {
        return;
    }

    crm_trace("Removing client %s[%p]", client->name, client);

    if (client->source != 0) {
        /* GLib calls mainloop_gio_destroy() just before the source is
         * removed from the main loop
         */
        g_source_remove(client->source);
    }
}
// Child processes currently tracked via mainloop_child_add_with_flags()
static GListPtr child_list = NULL;

// Get the process ID (or negated process group ID) tracked for a child
pid_t
mainloop_child_pid(mainloop_child_t *child)
{
    return child->pid;
}

// Get the description given when the child was added
const char *
mainloop_child_name(mainloop_child_t *child)
{
    return child->desc;
}

// Get whether the child was already sent SIGKILL because it timed out
int
mainloop_child_timeout(mainloop_child_t *child)
{
    return child->timeout;
}

// Get the caller's private data registered for the child
void *
mainloop_child_userdata(mainloop_child_t *child)
{
    return child->privatedata;
}

// Forget the caller's private data registered for the child
void
mainloop_clear_child_userdata(mainloop_child_t *child)
{
    child->privatedata = NULL;
}
/*!
 * \internal
 * \brief Cancel a child's pending timeout (if any) and free its record
 *
 * \param[in,out] child  Child record to free (must not be NULL)
 */
static void
child_free(mainloop_child_t *child)
{
    if (child->timerid > 0) {
        crm_trace("Removing timer %d", child->timerid);
        g_source_remove(child->timerid);
        child->timerid = 0;
    }

    free(child->desc);
    free(child);
}
/*!
 * \internal
 * \brief Send SIGKILL to a child (or, by default, to its process group)
 *
 * \param[in] child  Child to kill
 *
 * \return 0 on success, otherwise -errno from kill() (-ESRCH if the process
 *         no longer exists)
 */
static int
child_kill_helper(mainloop_child_t *child)
{
    int rc;

    if (child->flags & mainloop_leave_pid_group) {
        crm_debug("Kill pid %d only. leave group intact.", child->pid);
        rc = kill(child->pid, SIGKILL);
    } else {
        crm_debug("Kill pid %d's group", child->pid);
        rc = kill(-child->pid, SIGKILL);
    }

    if (rc < 0) {
        /* Save errno before logging, which may change it. A clobbered value
         * of 0 would make callers treat the failure as success.
         */
        int err = errno;

        if (err != ESRCH) {
            crm_perror(LOG_ERR, "kill(%d, KILL) failed", child->pid);
        }
        return -err;
    }
    return 0;
}
/*!
 * \internal
 * \brief Timeout handler for a tracked child
 *
 * On the first timeout, kill the child and check again in 5 seconds. If it is
 * still being tracked at that second timeout, log a critical message.
 *
 * \param[in,out] p  Child that timed out
 *
 * \return FALSE (the timer is one-shot; a new one is added when needed)
 */
static gboolean
child_timeout_callback(gpointer p)
{
    mainloop_child_t *child = p;

    child->timerid = 0; // this timer has fired and will not repeat

    if (child->timeout) {
        crm_crit("%s process (PID %d) will not die!", child->desc, (int)child->pid);
        return FALSE;
    }

    if (child_kill_helper(child) == -ESRCH) {
        return FALSE; // the process no longer exists
    }

    child->timeout = TRUE;
    crm_warn("%s process (PID %d) timed out", child->desc, (int)child->pid);

    child->timerid = g_timeout_add(5000, child_timeout_callback, child);
    return FALSE;
}
/*!
 * \internal
 * \brief Wait for a tracked child and call its callback if it completed
 *
 * \param[in,out] child  Child to wait for
 * \param[in]     flags  Flags to pass to waitpid() (such as WNOHANG)
 *
 * \return true if the child's callback was due (the child exited, was killed
 *         by a signal, or waiting failed), otherwise false
 */
static bool
child_waitpid(mainloop_child_t *child, int flags)
{
    int rc = 0;
    int core = 0;
    int signo = 0;
    int status = 0;
    int exitcode = 0;
    bool callback_needed = true;

    rc = waitpid(child->pid, &status, flags);
    if (rc == 0) { // WNOHANG in flags, and child status is not available
        crm_trace("Child process %d (%s) still active",
                  child->pid, child->desc);
        callback_needed = false;

    } else if (rc != child->pid) {
        /* According to POSIX, possible conditions:
         * - child->pid was non-positive (process group or any child),
         *   and rc is specific child
         * - errno ECHILD (pid does not exist or is not child)
         * - errno EINVAL (invalid flags)
         * - errno EINTR (caller interrupted by signal)
         *
         * @TODO Handle these cases more specifically.
         */
        signo = SIGCHLD;
        exitcode = 1;
        crm_notice("Wait for child process %d (%s) interrupted: %s",
                   child->pid, child->desc, pcmk_strerror(errno));

    } else if (WIFEXITED(status)) {
        exitcode = WEXITSTATUS(status);
        crm_trace("Child process %d (%s) exited with status %d",
                  child->pid, child->desc, exitcode);

    } else if (WIFSIGNALED(status)) {
        signo = WTERMSIG(status);
        crm_trace("Child process %d (%s) exited with signal %d (%s)",
                  child->pid, child->desc, signo, strsignal(signo));

#ifdef WCOREDUMP // Not in POSIX, but widely available
        /* A core dump only happens when a signal terminates the process, so
         * this must be checked here. As a separate "else if" after
         * WIFSIGNALED(), it could never be reached.
         */
        if (WCOREDUMP(status)) {
            core = 1;
            crm_err("Child process %d (%s) dumped core",
                    child->pid, child->desc);
        }
#endif

    } else { // flags must contain WUNTRACED and/or WCONTINUED to reach this
        crm_trace("Child process %d (%s) stopped or continued",
                  child->pid, child->desc);
        callback_needed = false;
    }

    if (callback_needed && child->callback) {
        child->callback(child, child->pid, core, signo, exitcode);
    }
    return callback_needed;
}
static void
child_death_dispatch(int signal)
{
for (GList *iter = child_list; iter; ) {
GList *saved = iter;
mainloop_child_t *child = iter->data;
iter = iter->next;
if (child_waitpid(child, WNOHANG)) {
crm_trace("Removing completed process %d from child list",
child->pid);
child_list = g_list_remove_link(child_list, saved);
g_list_free(saved);
child_free(child);
}
}
}
/*!
 * \internal
 * \brief One-shot main loop callback that sets up SIGCHLD handling
 *
 * \param[in] p  (ignored)
 *
 * \return FALSE (run only once)
 */
static gboolean
child_signal_init(gpointer p)
{
    crm_trace("Installed SIGCHLD handler");

    // Don't use g_child_watch_add() and friends, which rely on pthreads
    mainloop_add_signal(SIGCHLD, child_death_dispatch);

    // Reap any children that exited before the handler was installed
    child_death_dispatch(SIGCHLD);

    return FALSE;
}
gboolean
mainloop_child_kill(pid_t pid)
{
GListPtr iter;
mainloop_child_t *child = NULL;
mainloop_child_t *match = NULL;
/* It is impossible to block SIGKILL, this allows us to
* call waitpid without WNOHANG flag.*/
int waitflags = 0, rc = 0;
for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) {
child = iter->data;
if (pid == child->pid) {
match = child;
}
}
if (match == NULL) {
return FALSE;
}
rc = child_kill_helper(match);
if(rc == -ESRCH) {
/* It's gone, but hasn't shown up in waitpid() yet. Wait until we get
* SIGCHLD and let handler clean it up as normal (so we get the correct
* return code/status). The blocking alternative would be to call
* child_waitpid(match, 0).
*/
crm_trace("Waiting for signal that child process %d completed",
match->pid);
return TRUE;
} else if(rc != 0) {
/* If KILL for some other reason set the WNOHANG flag since we
* can't be certain what happened.
*/
waitflags = WNOHANG;
}
if (!child_waitpid(match, waitflags)) {
/* not much we can do if this occurs */
return FALSE;
}
child_list = g_list_remove(child_list, match);
child_free(match);
return TRUE;
}
/*!
 * \brief Start tracking a child process from the main loop
 *
 * \param[in] pid          Process ID to track (use -pid to track a group)
 * \param[in] timeout      If nonzero, kill the child after this many ms
 * \param[in] desc         Description for log messages (copied; may be NULL)
 * \param[in] privatedata  Caller data, available via mainloop_child_userdata()
 * \param[in] flags        Tracking flags (e.g. mainloop_leave_pid_group)
 * \param[in] callback     Called when the child completes
 *
 * @TODO Using a non-positive pid (i.e. any child, or process group) would
 *       likely not be useful since we will free the child after the first
 *       completed process.
 */
void
mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags,
                              void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
{
    static bool need_init = true;

    /* Allocate zeroed memory with calloc(): child_free() releases it with
     * free(), and desc must be NULL (not garbage) when none is given
     */
    mainloop_child_t *child = calloc(1, sizeof(mainloop_child_t));

    CRM_ASSERT(child != NULL);

    child->pid = pid;
    child->timerid = 0;
    child->timeout = FALSE;
    child->privatedata = privatedata;
    child->callback = callback;
    child->flags = flags;
    child->desc = (desc == NULL)? NULL : strdup(desc);

    if (timeout) {
        child->timerid = g_timeout_add(timeout, child_timeout_callback, child);
    }

    child_list = g_list_append(child_list, child);

    if (need_init) {
        need_init = false;
        /* SIGCHLD processing has to be invoked from mainloop.
         * We do not want it to be possible to both add a child pid
         * to mainloop, and have the pid's exit callback invoked within
         * the same callstack. */
        g_timeout_add(1, child_signal_init, NULL);
    }
}
/*!
 * \brief Start tracking a child process with default flags
 *
 * See mainloop_child_add_with_flags() for parameter details.
 */
void
mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata,
                   void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
{
    const enum mainloop_child_flags no_flags = 0;

    mainloop_child_add_with_flags(pid, timeout, desc, privatedata, no_flags,
                                  callback);
}
/* Main loop timer created by mainloop_timer_add() */
struct mainloop_timer_s {
guint id; /* GLib source ID while running, 0 when stopped or during callbacks */
guint period_ms; /* interval in milliseconds; 0 prevents starting */
bool repeat; /* whether to keep firing after the callback returns TRUE */
char *name; /* generated name used in log messages */
GSourceFunc cb; /* callback invoked with userdata */
void *userdata; /* passed to cb */
};
/*!
 * \internal
 * \brief GLib timeout callback for main loop timers
 *
 * \param[in,out] user_data  Timer that fired
 *
 * \return TRUE to keep the GLib source (repeating timer whose callback
 *         returned TRUE), otherwise FALSE
 */
static gboolean
mainloop_timer_cb(gpointer user_data)
{
    guint id = 0; // GLib source IDs are guint; don't truncate through int
    bool repeat = false;
    struct mainloop_timer_s *t = user_data;

    CRM_ASSERT(t != NULL);

    id = t->id;
    t->id = 0; /* Ensure it's unset during callbacks so that
                * mainloop_timer_running() works as expected
                */

    if (t->cb) {
        crm_trace("Invoking callbacks for timer %s", t->name);
        repeat = t->repeat;
        if (t->cb(t->userdata) == FALSE) {
            crm_trace("Timer %s complete", t->name);
            repeat = false;
        }
    }

    if (repeat) {
        // GLib keeps the source, so restore its ID
        t->id = id;
    }
    return repeat;
}
/*!
 * \brief Check whether a main loop timer is currently scheduled
 *
 * \param[in] t  Timer to check (may be NULL)
 *
 * \return true if \p t is non-NULL and has an active GLib source
 */
bool
mainloop_timer_running(mainloop_timer_t *t)
{
    return (t != NULL) && (t->id != 0);
}
/*!
 * \brief (Re)start a main loop timer
 *
 * Any running instance is stopped first. A timer with a zero period is left
 * stopped.
 *
 * \param[in,out] t  Timer to start (NULL is ignored)
 */
void
mainloop_timer_start(mainloop_timer_t *t)
{
    mainloop_timer_stop(t);

    if ((t == NULL) || (t->period_ms == 0)) {
        return;
    }
    crm_trace("Starting timer %s", t->name);
    t->id = g_timeout_add(t->period_ms, mainloop_timer_cb, t);
}
/*!
 * \brief Stop a main loop timer if it is running
 *
 * \param[in,out] t  Timer to stop (NULL is ignored)
 */
void
mainloop_timer_stop(mainloop_timer_t *t)
{
    if ((t == NULL) || (t->id == 0)) {
        return;
    }
    crm_trace("Stopping timer %s", t->name);
    g_source_remove(t->id);
    t->id = 0;
}
/*!
 * \brief Change a main loop timer's period
 *
 * A running timer is restarted when its period actually changes.
 *
 * \param[in,out] t          Timer to update (may be NULL)
 * \param[in]     period_ms  New period in milliseconds
 *
 * \return Previous period (0 if \p t is NULL)
 */
guint
mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms)
{
    guint previous;

    if (t == NULL) {
        return 0;
    }

    previous = t->period_ms;
    t->period_ms = period_ms;

    if ((t->id != 0) && (previous != period_ms)) {
        mainloop_timer_start(t);
    }
    return previous;
}
/*!
 * \brief Create a (stopped) main loop timer
 *
 * \param[in] name       Base name for log messages (may be NULL)
 * \param[in] period_ms  Period in milliseconds
 * \param[in] repeat     Whether the timer repeats while \p cb returns TRUE
 * \param[in] cb         Callback to invoke when the timer fires
 * \param[in] userdata   Data passed to \p cb
 *
 * \return Newly allocated timer, or NULL on allocation failure
 *
 * \note Start the timer with mainloop_timer_start() and free it with
 *       mainloop_timer_del().
 */
mainloop_timer_t *
mainloop_timer_add(const char *name, guint period_ms, bool repeat,
                   GSourceFunc cb, void *userdata)
{
    mainloop_timer_t *t = calloc(1, sizeof(*t));

    if (t == NULL) {
        return NULL;
    }

    if (name != NULL) {
        t->name = crm_strdup_printf("%s-%u-%d", name, period_ms, repeat);
    } else {
        t->name = crm_strdup_printf("%p-%u-%d", t, period_ms, repeat);
    }

    t->id = 0;
    t->period_ms = period_ms;
    t->repeat = repeat;
    t->cb = cb;
    t->userdata = userdata;

    crm_trace("Created timer %s with %p %p", t->name, userdata, t->userdata);
    return t;
}
/*!
 * \brief Stop and free a main loop timer
 *
 * \param[in,out] t  Timer to free (NULL is ignored)
 */
void
mainloop_timer_del(mainloop_timer_t *t)
{
    if (t == NULL) {
        return;
    }
    crm_trace("Destroying timer %s", t->name);
    mainloop_timer_stop(t);
    free(t->name);
    free(t);
}
/*
 * Helpers to make sure certain events aren't lost at shutdown
 */

/*!
 * \internal
 * \brief One-shot timeout that tells pcmk_drain_main_loop() time is up
 *
 * \param[out] user_data  Pointer to the bool flag to set
 *
 * \return FALSE (don't repeat)
 */
static gboolean
drain_timeout_cb(gpointer user_data)
{
    bool *popped = user_data;

    *popped = TRUE;
    return FALSE;
}
/*!
 * \brief Drain some remaining main loop events then quit it
 *
 * \param[in] mloop  Main loop to drain and quit (may be NULL)
 * \param[in] n      Drain up to this many pending events
 *
 * \note Nothing is done if \p mloop is NULL or not running.
 */
void
pcmk_quit_main_loop(GMainLoop *mloop, unsigned int n)
{
    if ((mloop != NULL) && g_main_loop_is_running(mloop)) {
        GMainContext *ctx = g_main_loop_get_context(mloop);

        /* Drain up to n events in case some memory clean-up is pending
         * (helpful to reduce noise in valgrind output). The counter is
         * unsigned to match n and avoid signed overflow for large values.
         */
        for (unsigned int i = 0; (i < n) && g_main_context_pending(ctx); ++i) {
            g_main_context_dispatch(ctx);
        }
        g_main_loop_quit(mloop);
    }
}
/*!
* \brief Process main loop events while a certain condition is met
*
* \param[in] mloop Main loop to process
* \param[in] timer_ms Don't process longer than this amount of time
* \param[in] check Function that returns TRUE if events should be processed
*
* \note This function is intended to be called at shutdown if certain important
* events should not be missed. The caller would likely quit the main loop
* or exit after calling this function. The check() function will be
* passed the remaining timeout in milliseconds.
*/
void
pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool (*check)(guint))
{
bool timeout_popped = FALSE;
guint timer = 0;
GMainContext *ctx = NULL;
CRM_CHECK(mloop && check, return);
ctx = g_main_loop_get_context(mloop);
if (ctx) {
time_t start_time = time(NULL);
timer = g_timeout_add(timer_ms, drain_timeout_cb, &timeout_popped);
while (!timeout_popped
&& check(timer_ms - (time(NULL) - start_time) * 1000)) {
g_main_context_iteration(ctx, TRUE);
}
}
if (!timeout_popped && (timer > 0)) {
g_source_remove(timer);
}
}
// Deprecated functions kept only for backward API compatibility
gboolean crm_signal(int sig, void (*dispatch) (int sig));
/*
* \brief Use crm_signal_handler() instead
* \deprecated
*/
gboolean
crm_signal(int sig, void (*dispatch) (int sig))
{
return crm_signal_handler(sig, dispatch) != SIG_ERR;
}

File Metadata

Mime Type
text/x-diff
Expires
Mon, Apr 21, 6:41 PM (22 h, 13 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1665238
Default Alt Text
(112 KB)

Event Timeline