diff --git a/include/crm/common/internal.h b/include/crm/common/internal.h index 28b20b4380..13c29ea18d 100644 --- a/include/crm/common/internal.h +++ b/include/crm/common/internal.h @@ -1,296 +1,304 @@ /* * Copyright 2015-2020 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU Lesser General Public License * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. */ #ifndef CRM_COMMON_INTERNAL__H #define CRM_COMMON_INTERNAL__H #include // getpid() #include // bool #include // strcmp() #include // uid_t, gid_t, pid_t #include // guint, GList, GHashTable #include // xmlNode #include // crm_strdup_printf() +#include // mainloop_io_t, struct ipc_client_callbacks // Internal ACL-related utilities (from acl.c) char *pcmk__uid2username(uid_t uid); const char *pcmk__update_acl_user(xmlNode *request, const char *field, const char *peer_user); #if ENABLE_ACL # include static inline bool pcmk__is_privileged(const char *user) { return user && (!strcmp(user, CRM_DAEMON_USER) || !strcmp(user, "root")); } #endif #if SUPPORT_CIBSECRETS // Internal CIB utilities (from cib_secrets.c) */ int pcmk__substitute_secrets(const char *rsc_id, GHashTable *params); #endif /* internal digest-related utilities (from digest.c) */ bool pcmk__verify_digest(xmlNode *input, const char *expected); /* internal I/O utilities (from io.c) */ int pcmk__real_path(const char *path, char **resolved_path); char *pcmk__series_filename(const char *directory, const char *series, int sequence, bool bzip); int pcmk__read_series_sequence(const char *directory, const char *series, unsigned int *seq); void pcmk__write_series_sequence(const char *directory, const char *series, unsigned int sequence, int max); int pcmk__chown_series_sequence(const char *directory, const char *series, uid_t uid, gid_t gid); int pcmk__build_path(const char *path_c, mode_t mode); bool pcmk__daemon_can_write(const char *dir, const char *file); void pcmk__sync_directory(const char *name); int pcmk__file_contents(const char *filename, char **contents); int pcmk__write_sync(int fd, const char *contents); int pcmk__set_nonblocking(int fd); const char *pcmk__get_tmpdir(void); void pcmk__close_fds_in_child(bool); /*! * \internal * \brief Open /dev/null to consume next available file descriptor * * Open /dev/null, disregarding the result. This is intended when daemonizing to * be able to null stdin, stdout, and stderr. * * \param[in] flags O_RDONLY (stdin) or O_WRONLY (stdout and stderr) */ static inline void pcmk__open_devnull(int flags) { // Static analysis clutter // cppcheck-suppress leakReturnValNotUsed (void) open("/dev/null", flags); } /* internal logging utilities */ # define pcmk__config_err(fmt...) do { \ crm_config_error = TRUE; \ crm_err(fmt); \ } while (0) # define pcmk__config_warn(fmt...) do { \ crm_config_warning = TRUE; \ crm_warn(fmt); \ } while (0) /*! * \internal * \brief Execute code depending on whether message would be logged * * This is similar to do_crm_log_unlikely() except instead of logging, it either * continues past this statement or executes else_action depending on whether a * message of the given severity would be logged or not. This allows whole * blocks of code to be skipped if tracing or debugging is turned off. * * \param[in] level Severity at which to continue past this statement * \param[in] else_action Code block to execute if severity would not be logged * * \note else_action must not contain a break or continue statement */ # define pcmk__log_else(level, else_action) do { \ static struct qb_log_callsite *trace_cs = NULL; \ \ if (trace_cs == NULL) { \ trace_cs = qb_log_callsite_get(__func__, __FILE__, "log_else", \ level, __LINE__, 0); \ } \ if (!crm_is_callsite_active(trace_cs, level, 0)) { \ else_action; \ } \ } while(0) +/* internal main loop utilities (from mainloop.c) */ + +int pcmk__add_mainloop_ipc(crm_ipc_t *ipc, int priority, void *userdata, + struct ipc_client_callbacks *callbacks, + mainloop_io_t **source); + + /* internal procfs utilities (from procfs.c) */ pid_t pcmk__procfs_pid_of(const char *name); unsigned int pcmk__procfs_num_cores(void); /* internal XML schema functions (from xml.c) */ void crm_schema_init(void); void crm_schema_cleanup(void); /* internal functions related to process IDs (from pid.c) */ /*! * \internal * \brief Check whether process exists (by PID and optionally executable path) * * \param[in] pid PID of process to check * \param[in] daemon If not NULL, path component to match with procfs entry * * \return Standard Pacemaker return code * \note Particular return codes of interest include pcmk_rc_ok for alive, * ESRCH for process is not alive (verified by kill and/or executable path * match), EACCES for caller unable or not allowed to check. A result of * "alive" is less reliable when \p daemon is not provided or procfs is * not available, since there is no guarantee that the PID has not been * recycled for another process. * \note This function cannot be used to verify \e authenticity of the process. */ int pcmk__pid_active(pid_t pid, const char *daemon); int pcmk__read_pidfile(const char *filename, pid_t *pid); int pcmk__pidfile_matches(const char *filename, pid_t expected_pid, const char *expected_name, pid_t *pid); int pcmk__lock_pidfile(const char *filename, const char *name); /* interal functions related to resource operations (from operations.c) */ // printf-style format to create operation ID from resource, action, interval #define PCMK__OP_FMT "%s_%s_%u" char *pcmk__op_key(const char *rsc_id, const char *op_type, guint interval_ms); char *pcmk__notify_key(const char *rsc_id, const char *notify_type, const char *op_type); char *pcmk__transition_key(int transition_id, int action_id, int target_rc, const char *node); void pcmk__filter_op_for_digest(xmlNode *param_set); // miscellaneous utilities (from utils.c) const char *pcmk_message_name(const char *name); extern int pcmk__score_red; extern int pcmk__score_green; extern int pcmk__score_yellow; /* internal generic string functions (from strings.c) */ int pcmk__guint_from_hash(GHashTable *table, const char *key, guint default_val, guint *result); bool pcmk__starts_with(const char *str, const char *prefix); bool pcmk__ends_with(const char *s, const char *match); bool pcmk__ends_with_ext(const char *s, const char *match); char *pcmk__add_word(char *list, const char *word); int pcmk__compress(const char *data, unsigned int length, unsigned int max, char **result, unsigned int *result_len); /* Correctly displaying singular or plural is complicated; consider "1 node has" * vs. "2 nodes have". A flexible solution is to pluralize entire strings, e.g. * * if (a == 1) { * crm_info("singular message"): * } else { * crm_info("plural message"); * } * * though even that's not sufficient for all languages besides English (if we * ever desire to do translations of output and log messages). But the following * convenience macros are "good enough" and more concise for many cases. */ /* Example: * crm_info("Found %d %s", nentries, * pcmk__plural_alt(nentries, "entry", "entries")); */ #define pcmk__plural_alt(i, s1, s2) (((i) == 1)? (s1) : (s2)) // Example: crm_info("Found %d node%s", nnodes, pcmk__plural_s(nnodes)); #define pcmk__plural_s(i) pcmk__plural_alt(i, "", "s") static inline int pcmk__str_empty(const char *s) { return (s == NULL) || (s[0] == '\0'); } static inline char * pcmk__getpid_s(void) { return crm_strdup_printf("%lu", (unsigned long) getpid()); } // More efficient than g_list_length(list) == 1 static inline bool pcmk__list_of_1(GList *list) { return list && (list->next == NULL); } // More efficient than g_list_length(list) > 1 static inline bool pcmk__list_of_multiple(GList *list) { return list && (list->next != NULL); } /* convenience functions for failure-related node attributes */ #define PCMK__FAIL_COUNT_PREFIX "fail-count" #define PCMK__LAST_FAILURE_PREFIX "last-failure" /*! * \internal * \brief Generate a failure-related node attribute name for a resource * * \param[in] prefix Start of attribute name * \param[in] rsc_id Resource name * \param[in] op Operation name * \param[in] interval_ms Operation interval * * \return Newly allocated string with attribute name * * \note Failure attributes are named like PREFIX-RSC#OP_INTERVAL (for example, * "fail-count-myrsc#monitor_30000"). The '#' is used because it is not * a valid character in a resource ID, to reliably distinguish where the * operation name begins. The '_' is used simply to be more comparable to * action labels like "myrsc_monitor_30000". */ static inline char * pcmk__fail_attr_name(const char *prefix, const char *rsc_id, const char *op, guint interval_ms) { CRM_CHECK(prefix && rsc_id && op, return NULL); return crm_strdup_printf("%s-%s#%s_%u", prefix, rsc_id, op, interval_ms); } static inline char * pcmk__failcount_name(const char *rsc_id, const char *op, guint interval_ms) { return pcmk__fail_attr_name(PCMK__FAIL_COUNT_PREFIX, rsc_id, op, interval_ms); } static inline char * pcmk__lastfailure_name(const char *rsc_id, const char *op, guint interval_ms) { return pcmk__fail_attr_name(PCMK__LAST_FAILURE_PREFIX, rsc_id, op, interval_ms); } #endif /* CRM_COMMON_INTERNAL__H */ diff --git a/include/crm/common/ipc.h b/include/crm/common/ipc.h index a0df956524..8dee1b1f2f 100644 --- a/include/crm/common/ipc.h +++ b/include/crm/common/ipc.h @@ -1,137 +1,228 @@ /* * Copyright 2004-2020 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU Lesser General Public License * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. */ #ifndef CRM_COMMON_IPC__H # define CRM_COMMON_IPC__H #ifdef __cplusplus extern "C" { #endif /** * \file - * \brief Wrappers for and extensions to libqb IPC + * \brief IPC interface to Pacemaker daemons + * * \ingroup core */ #include #include #include /* * Message creation utilities * * These are used for both IPC messages and cluster layer messages. However, * since this is public API, they stay in this header for backward * compatibility. */ #define create_reply(request, xml_response_data) \ create_reply_adv(request, xml_response_data, __FUNCTION__) xmlNode *create_reply_adv(xmlNode *request, xmlNode *xml_response_data, const char *origin); #define create_request(task, xml_data, host_to, sys_to, sys_from, uuid_from) \ create_request_adv(task, xml_data, host_to, sys_to, sys_from, uuid_from, \ __FUNCTION__) xmlNode *create_request_adv(const char *task, xmlNode *xml_data, const char *host_to, const char *sys_to, const char *sys_from, const char *uuid_from, const char *origin); +/* + * The library supports two methods of creating IPC connections. The older code + * allows connecting to any arbitrary IPC name. The newer code only allows + * connecting to one of the Pacemaker daemons. + * + * As daemons are converted to use the new model, the old functions should be + * considered deprecated for use with those daemons. Once all daemons are + * converted, the old functions should be officially deprecated as public API + * and eventually made internal API. + */ + +/* + * Pacemaker daemon IPC + */ + +//! Available IPC interfaces +enum pcmk_ipc_server { + pcmk_ipc_attrd, //!< Attribute manager + pcmk_ipc_based, //!< CIB manager + pcmk_ipc_controld, //!< Controller + pcmk_ipc_execd, //!< Executor + pcmk_ipc_fenced, //!< Fencer + pcmk_ipc_pacemakerd, //!< Launcher + pcmk_ipc_schedulerd, //!< Scheduler +}; + +//! Possible event types that an IPC event callback can be called for +enum pcmk_ipc_event { + pcmk_ipc_event_connect, //!< Result of asynchronous connection attempt + pcmk_ipc_event_disconnect, //!< Termination of IPC connection + pcmk_ipc_event_reply, //!< Daemon's reply to client IPC request + pcmk_ipc_event_notify, //!< Notification from daemon +}; + +//! How IPC replies should be dispatched +enum pcmk_ipc_dispatch { + pcmk_ipc_dispatch_main, //!< Attach IPC to GMainLoop for dispatch + pcmk_ipc_dispatch_poll, //!< Caller will poll and dispatch IPC + pcmk_ipc_dispatch_sync, //!< Sending a command will wait for any reply +}; + +//! Client connection to Pacemaker IPC +typedef struct pcmk_ipc_api_s pcmk_ipc_api_t; + +/*! + * \brief Callback function type for Pacemaker daemon IPC APIs + * + * \param[in] api IPC API connection + * \param[in] event_type The type of event that occurred + * \param[in] status Event status + * \param[in] event_data Event-specific data + * \param[in] user_data Caller data provided when callback was registered + * + * \note For connection and disconnection events, event_data may be NULL (for + * local IPC) or the name of the connected node (for remote IPC, for + * daemons that support that). For reply and notify events, event_data is + * defined by the specific daemon API. + */ +typedef void (*pcmk_ipc_callback_t)(pcmk_ipc_api_t *api, + enum pcmk_ipc_event event_type, + crm_exit_t status, + void *event_data, void *user_data); + +int pcmk_new_ipc_api(pcmk_ipc_api_t **api, enum pcmk_ipc_server server); + +void pcmk_free_ipc_api(pcmk_ipc_api_t *api); + +int pcmk_connect_ipc(pcmk_ipc_api_t *api, enum pcmk_ipc_dispatch dispatch_type); + +void pcmk_disconnect_ipc(pcmk_ipc_api_t *api); + +int pcmk_poll_ipc(pcmk_ipc_api_t *api, int timeout_ms); + +void pcmk_dispatch_ipc(pcmk_ipc_api_t *api); + +void pcmk_register_ipc_callback(pcmk_ipc_api_t *api, pcmk_ipc_callback_t cb, + void *user_data); + +const char *pcmk_ipc_name(pcmk_ipc_api_t *api, bool for_log); + +bool pcmk_ipc_is_connected(pcmk_ipc_api_t *api); + +int pcmk_ipc_purge_node(pcmk_ipc_api_t *api, const char *node_name, + uint32_t nodeid); + + +/* + * Generic IPC API (to eventually be deprecated as public API and made internal) + */ + /* *INDENT-OFF* */ enum crm_ipc_flags { crm_ipc_flags_none = 0x00000000, crm_ipc_compressed = 0x00000001, /* Message has been compressed */ crm_ipc_proxied = 0x00000100, /* _ALL_ replies to proxied connections need to be sent as events */ crm_ipc_client_response = 0x00000200, /* A Response is expected in reply */ - // These are options only for pcmk__ipc_send_iov() + // These are options for Pacemaker's internal use only (pcmk__ipc_send_*()) crm_ipc_server_event = 0x00010000, /* Send an Event instead of a Response */ crm_ipc_server_free = 0x00020000, /* Free the iovec after sending */ crm_ipc_proxied_relay_response = 0x00040000, /* all replies to proxied connections are sent as events, this flag preserves whether the event should be treated as an actual event, or a response.*/ crm_ipc_server_info = 0x00100000, /* Log failures as LOG_INFO */ crm_ipc_server_error = 0x00200000, /* Log failures as LOG_ERR */ }; /* *INDENT-ON* */ typedef struct crm_ipc_s crm_ipc_t; crm_ipc_t *crm_ipc_new(const char *name, size_t max_size); bool crm_ipc_connect(crm_ipc_t * client); void crm_ipc_close(crm_ipc_t * client); void crm_ipc_destroy(crm_ipc_t * client); void pcmk_free_ipc_event(struct iovec *event); int crm_ipc_send(crm_ipc_t * client, xmlNode * message, enum crm_ipc_flags flags, int32_t ms_timeout, xmlNode ** reply); int crm_ipc_get_fd(crm_ipc_t * client); bool crm_ipc_connected(crm_ipc_t * client); int crm_ipc_ready(crm_ipc_t * client); long crm_ipc_read(crm_ipc_t * client); const char *crm_ipc_buffer(crm_ipc_t * client); uint32_t crm_ipc_buffer_flags(crm_ipc_t * client); const char *crm_ipc_name(crm_ipc_t * client); unsigned int crm_ipc_default_buffer_size(void); /*! * \brief Check the authenticity of the IPC socket peer process * * If everything goes well, peer's authenticity is verified by the means * of comparing against provided referential UID and GID (either satisfies), * and the result of this check can be deduced from the return value. * As an exception, detected UID of 0 ("root") satisfies arbitrary * provided referential daemon's credentials. * * \param[in] sock IPC related, connected Unix socket to check peer of * \param[in] refuid referential UID to check against * \param[in] refgid referential GID to check against * \param[out] gotpid to optionally store obtained PID of the peer * (not available on FreeBSD, special value of 1 * used instead, and the caller is required to * special case this value respectively) * \param[out] gotuid to optionally store obtained UID of the peer * \param[out] gotgid to optionally store obtained GID of the peer * * \return 0 if IPC related socket's peer is not authentic given the * referential credentials (see above), 1 if it is, * negative value on error (generally expressing -errno unless * it was zero even on nonhappy path, -pcmk_err_generic is * returned then; no message is directly emitted) * * \note While this function is tolerant on what constitutes authorized * IPC daemon process (its effective user matches UID=0 or \p refuid, * or at least its group matches \p refgid), either or both (in case * of UID=0) mismatches on the expected credentials of such peer * process \e shall be investigated at the caller when value of 1 * gets returned there, since higher-than-expected privileges in * respect to the expected/intended credentials possibly violate * the least privilege principle and may pose an additional risk * (i.e. such accidental inconsistency shall be eventually fixed). */ int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid); /* Utils */ xmlNode *create_hello_message(const char *uuid, const char *client_name, const char *major_version, const char *minor_version); #ifdef __cplusplus } #endif #endif diff --git a/lib/common/crmcommon_private.h b/lib/common/crmcommon_private.h index d06fa20088..f9df27d41c 100644 --- a/lib/common/crmcommon_private.h +++ b/lib/common/crmcommon_private.h @@ -1,120 +1,206 @@ /* * Copyright 2018-2020 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU Lesser General Public License * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. */ #ifndef CRMCOMMON_PRIVATE__H # define CRMCOMMON_PRIVATE__H /* This header is for the sole use of libcrmcommon, so that functions can be * declared with G_GNUC_INTERNAL for efficiency. */ #include // uint8_t, uint32_t #include // bool #include // size_t #include // GList #include // xmlNode, xmlAttr #include // struct qb_ipc_response_header /* * XML and ACLs */ enum xml_private_flags { xpf_none = 0x0000, xpf_dirty = 0x0001, xpf_deleted = 0x0002, xpf_created = 0x0004, xpf_modified = 0x0008, xpf_tracking = 0x0010, xpf_processed = 0x0020, xpf_skip = 0x0040, xpf_moved = 0x0080, xpf_acl_enabled = 0x0100, xpf_acl_read = 0x0200, xpf_acl_write = 0x0400, xpf_acl_deny = 0x0800, xpf_acl_create = 0x1000, xpf_acl_denied = 0x2000, xpf_lazy = 0x4000, }; typedef struct xml_private_s { long check; uint32_t flags; char *user; GList *acls; GList *deleted_objs; } xml_private_t; G_GNUC_INTERNAL void pcmk__set_xml_flag(xmlNode *xml, enum xml_private_flags flag); G_GNUC_INTERNAL bool pcmk__tracking_xml_changes(xmlNode *xml, bool lazy); G_GNUC_INTERNAL int pcmk__element_xpath(const char *prefix, xmlNode *xml, char *buffer, int offset, size_t buffer_size); G_GNUC_INTERNAL void pcmk__free_acls(GList *acls); G_GNUC_INTERNAL void pcmk__unpack_acl(xmlNode *source, xmlNode *target, const char *user); G_GNUC_INTERNAL bool pcmk__check_acl(xmlNode *xml, const char *name, enum xml_private_flags mode); G_GNUC_INTERNAL void pcmk__apply_acl(xmlNode *xml); G_GNUC_INTERNAL void pcmk__apply_creation_acl(xmlNode *xml, bool check_top); G_GNUC_INTERNAL void pcmk__mark_xml_attr_dirty(xmlAttr *a); static inline xmlAttr * pcmk__first_xml_attr(const xmlNode *xml) { return xml? xml->properties : NULL; } static inline const char * pcmk__xml_attr_value(const xmlAttr *attr) { return ((attr == NULL) || (attr->children == NULL))? NULL : (const char *) attr->children->content; } /* * IPC */ #define PCMK__IPC_VERSION 1 +// IPC behavior that varies by daemon +typedef struct pcmk__ipc_methods_s { + /*! + * \internal + * \brief Allocate any private data needed by daemon IPC + * + * \param[in] api IPC API connection + * + * \return Standard Pacemaker return code + */ + int (*new_data)(pcmk_ipc_api_t *api); + + /*! + * \internal + * \brief Free any private data used by daemon IPC + * + * \param[in] api_data Data allocated by new_data() method + */ + void (*free_data)(void *api_data); + + /*! + * \internal + * \brief Perform daemon-specific handling after successful connection + * + * Some daemons require clients to register before sending any other + * commands. The controller requires a CRM_OP_HELLO (with no reply), and + * the CIB manager, executor, and fencer require a CRM_OP_REGISTER (with a + * reply). Ideally this would be consistent across all daemons, but for now + * this allows each to do its own authorization. + * + * \param[in] api IPC API connection + * + * \return Standard Pacemaker return code + */ + int (*post_connect)(pcmk_ipc_api_t *api); + + /*! + * \internal + * \brief Check whether an IPC request results in a reply + * + * \parma[in] api IPC API connection + * \param[in] request IPC request XML + * + * \return true if request would result in an IPC reply, false otherwise + */ + bool (*reply_expected)(pcmk_ipc_api_t *api, xmlNode *request); + + /*! + * \internal + * \brief Perform daemon-specific handling of an IPC message + * + * \param[in] api IPC API connection + * \param[in] msg Message read from IPC connection + */ + void (*dispatch)(pcmk_ipc_api_t *api, xmlNode *msg); + + /*! + * \internal + * \brief Perform daemon-specific handling of an IPC disconnect + * + * \param[in] api IPC API connection + */ + void (*post_disconnect)(pcmk_ipc_api_t *api); +} pcmk__ipc_methods_t; + +// Implementation of pcmk_ipc_api_t +struct pcmk_ipc_api_s { + enum pcmk_ipc_server server; // Daemon this IPC API instance is for + enum pcmk_ipc_dispatch dispatch_type; // How replies should be dispatched + crm_ipc_t *ipc; // IPC connection + mainloop_io_t *mainloop_io; // If using mainloop, I/O source for IPC + bool free_on_disconnect; // Whether disconnect should free object + pcmk_ipc_callback_t cb; // Caller-registered callback (if any) + void *user_data; // Caller-registered data (if any) + void *api_data; // For daemon-specific use + pcmk__ipc_methods_t *cmds; // Behavior that varies by daemon +}; + typedef struct pcmk__ipc_header_s { struct qb_ipc_response_header qb; uint32_t size_uncompressed; uint32_t size_compressed; uint32_t flags; uint8_t version; } pcmk__ipc_header_t; +G_GNUC_INTERNAL +int pcmk__send_ipc_request(pcmk_ipc_api_t *api, xmlNode *request); + +G_GNUC_INTERNAL +void pcmk__call_ipc_callback(pcmk_ipc_api_t *api, + enum pcmk_ipc_event event_type, + crm_exit_t status, void *event_data); + G_GNUC_INTERNAL unsigned int pcmk__ipc_buffer_size(unsigned int max); G_GNUC_INTERNAL bool pcmk__valid_ipc_header(const pcmk__ipc_header_t *header); #endif // CRMCOMMON_PRIVATE__H diff --git a/lib/common/ipc_client.c b/lib/common/ipc_client.c index 773758865c..16dc9b5497 100644 --- a/lib/common/ipc_client.c +++ b/lib/common/ipc_client.c @@ -1,755 +1,1454 @@ /* * Copyright 2004-2020 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU Lesser General Public License * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. */ #include #if defined(US_AUTH_PEERCRED_UCRED) || defined(US_AUTH_PEERCRED_SOCKPEERCRED) # ifdef US_AUTH_PEERCRED_UCRED # ifndef _GNU_SOURCE # define _GNU_SOURCE # endif # endif # include #elif defined(US_AUTH_GETPEERUCRED) # include #endif #include #include #include #include #include /* indirectly: pcmk_err_generic */ #include #include #include #include "crmcommon_private.h" +/*! + * \brief Create a new object for using Pacemaker daemon IPC + * + * \param[out] api Where to store new IPC object + * \param[in] server Which Pacemaker daemon the object is for + * + * \return Standard Pacemaker result code + * + * \note The caller is responsible for freeing *api using pcmk_free_ipc_api(). + * \note This is intended to supersede crm_ipc_new() but is not yet usable. + */ +int +pcmk_new_ipc_api(pcmk_ipc_api_t **api, enum pcmk_ipc_server server) +{ + size_t max_size = 0; + + if (api == NULL) { + return EINVAL; + } + + *api = calloc(1, sizeof(pcmk_ipc_api_t)); + if (*api == NULL) { + return errno; + } + + (*api)->server = server; + if (pcmk_ipc_name(*api, false) == NULL) { + pcmk_free_ipc_api(*api); + *api = NULL; + return EOPNOTSUPP; + } + + // Set server methods and max_size (if not default) + switch (server) { + case pcmk_ipc_attrd: + break; + + case pcmk_ipc_based: + max_size = 512 * 1024; // 512KB + break; + + case pcmk_ipc_controld: + break; + + case pcmk_ipc_execd: + break; + + case pcmk_ipc_fenced: + break; + + case pcmk_ipc_pacemakerd: + break; + + case pcmk_ipc_schedulerd: + // @TODO max_size could vary by client, maybe take as argument? + max_size = 5 * 1024 * 1024; // 5MB + break; + } + if ((*api)->cmds == NULL) { + pcmk_free_ipc_api(*api); + *api = NULL; + return ENOMEM; + } + + (*api)->ipc = crm_ipc_new(pcmk_ipc_name(*api, false), max_size); + if ((*api)->ipc == NULL) { + pcmk_free_ipc_api(*api); + *api = NULL; + return ENOMEM; + } + + // If daemon API has its own data to track, allocate it + if ((*api)->cmds->new_data != NULL) { + if ((*api)->cmds->new_data(*api) != pcmk_rc_ok) { + pcmk_free_ipc_api(*api); + *api = NULL; + return ENOMEM; + } + } + crm_trace("Created %s API IPC object", pcmk_ipc_name(*api, true)); + return pcmk_rc_ok; +} + +static void +free_daemon_specific_data(pcmk_ipc_api_t *api) +{ + if ((api != NULL) && (api->cmds != NULL)) { + if ((api->cmds->free_data != NULL) && (api->api_data != NULL)) { + api->cmds->free_data(api->api_data); + api->api_data = NULL; + } + free(api->cmds); + api->cmds = NULL; + } +} + +/*! + * \internal + * \brief Call an IPC API event callback, if one is registed + * + * \param[in] api IPC API connection + * \param[in] event_type The type of event that occurred + * \param[in] status Event status + * \param[in] event_data Event-specific data + */ +void +pcmk__call_ipc_callback(pcmk_ipc_api_t *api, enum pcmk_ipc_event event_type, + crm_exit_t status, void *event_data) +{ + if ((api != NULL) && (api->cb != NULL)) { + api->cb(api, event_type, status, event_data, api->user_data); + } +} + +/*! + * \internal + * \brief Clean up after an IPC disconnect + * + * \param[in] user_data IPC API connection that disconnected + * + * \note This function can be used as a main loop IPC destroy callback. + */ +static void +ipc_post_disconnect(gpointer user_data) +{ + pcmk_ipc_api_t *api = user_data; + + crm_info("Disconnected from %s IPC API", pcmk_ipc_name(api, true)); + + // Perform any daemon-specific handling needed + if ((api->cmds != NULL) && (api->cmds->post_disconnect != NULL)) { + api->cmds->post_disconnect(api); + } + + // Call client's registered event callback + pcmk__call_ipc_callback(api, pcmk_ipc_event_disconnect, CRM_EX_DISCONNECT, + NULL); + + /* If this is being called from a running main loop, mainloop_gio_destroy() + * will free ipc and mainloop_io immediately after calling this function. + * If this is called from a stopped main loop, these will leak, so the best + * practice is to close the connection before stopping the main loop. + */ + api->ipc = NULL; + api->mainloop_io = NULL; + + if (api->free_on_disconnect) { + /* pcmk_free_ipc_api() has already been called, but did not free api + * or api->cmds because this function needed them. Do that now. + */ + free_daemon_specific_data(api); + crm_trace("Freeing IPC API object after disconnect"); + free(api); + } +} + +/*! + * \brief Free the contents of an IPC API object + * + * \param[in] api IPC API object to free + */ +void +pcmk_free_ipc_api(pcmk_ipc_api_t *api) +{ + bool free_on_disconnect = false; + + if (api == NULL) { + return; + } + crm_debug("Releasing %s IPC API", pcmk_ipc_name(api, true)); + + if (api->ipc != NULL) { + if (api->mainloop_io != NULL) { + /* We need to keep the api pointer itself around, because it is the + * user data for the IPC client destroy callback. That will be + * triggered by the pcmk_disconnect_ipc() call below, but it might + * happen later in the main loop (if still running). + * + * This flag tells the destroy callback to free the object. It can't + * do that unconditionally, because the application might call this + * function after a disconnect that happened by other means. + */ + free_on_disconnect = api->free_on_disconnect = true; + } + pcmk_disconnect_ipc(api); // Frees api if free_on_disconnect is true + } + if (!free_on_disconnect) { + free_daemon_specific_data(api); + crm_trace("Freeing IPC API object"); + free(api); + } +} + +/*! + * \brief Get the IPC name used with an IPC API connection + * + * \param[in] api IPC API connection + * \param[in] for_log If true, return human-friendly name instead of IPC name + * + * \return IPC API's human-friendly or connection name, or if none is available, + * "Pacemaker" if for_log is true and NULL if for_log is false + */ +const char * +pcmk_ipc_name(pcmk_ipc_api_t *api, bool for_log) +{ + if (api == NULL) { + return for_log? "Pacemaker" : NULL; + } + switch (api->server) { + case pcmk_ipc_attrd: + return for_log? "attribute manager" : NULL /* T_ATTRD */; + + case pcmk_ipc_based: + return for_log? "CIB manager" : NULL /* PCMK__SERVER_BASED_RW */; + + case pcmk_ipc_controld: + return for_log? "controller" : NULL /* CRM_SYSTEM_CRMD */; + + case pcmk_ipc_execd: + return for_log? "executor" : NULL /* CRM_SYSTEM_LRMD */; + + case pcmk_ipc_fenced: + return for_log? "fencer" : NULL /* "stonith-ng" */; + + case pcmk_ipc_pacemakerd: + return for_log? "launcher" : NULL /* CRM_SYSTEM_MCP */; + + case pcmk_ipc_schedulerd: + return for_log? "scheduler" : NULL /* CRM_SYSTEM_PENGINE */; + + default: + return for_log? "Pacemaker" : NULL; + } +} + +/*! + * \brief Check whether an IPC API connection is active + * + * \param[in] api IPC API connection + * + * \return true if IPC is connected, false otherwise + */ +bool +pcmk_ipc_is_connected(pcmk_ipc_api_t *api) +{ + return (api != NULL) && crm_ipc_connected(api->ipc); +} + +/*! + * \internal + * \brief Call the daemon-specific API's dispatch function + * + * Perform daemon-specific handling of IPC reply dispatch. It is the daemon + * method's responsibility to call the client's registered event callback, as + * well as allocate and free any event data. + * + * \param[in] api IPC API connection + */ +static void +call_api_dispatch(pcmk_ipc_api_t *api, xmlNode *message) +{ + crm_log_xml_trace(message, "ipc-received"); + if ((api->cmds != NULL) && (api->cmds->dispatch != NULL)) { + api->cmds->dispatch(api, message); + } +} + +/*! + * \internal + * \brief Dispatch data read from IPC source + * + * \param[in] buffer Data read from IPC + * \param[in] length Number of bytes of data in buffer (ignored) + * \param[in] user_data IPC object + * + * \return Always 0 (meaning connection is still required) + * + * \note This function can be used as a main loop IPC dispatch callback. + */ +static int +dispatch_ipc_data(const char *buffer, ssize_t length, gpointer user_data) +{ + pcmk_ipc_api_t *api = user_data; + xmlNode *msg; + + CRM_CHECK(api != NULL, return 0); + + if (buffer == NULL) { + crm_warn("Empty message received from %s IPC", + pcmk_ipc_name(api, true)); + return 0; + } + + msg = string2xml(buffer); + if (msg == NULL) { + crm_warn("Malformed message received from %s IPC", + pcmk_ipc_name(api, true)); + return 0; + } + call_api_dispatch(api, msg); + free_xml(msg); + return 0; +} + +/*! + * \brief Check whether an IPC connection has data available (without main loop) + * + * \param[in] api IPC API connection + * \param[in] timeout_ms If less than 0, poll indefinitely; if 0, poll once + * and return immediately; otherwise, poll for up to + * this many milliseconds + * + * \return Standard Pacemaker return code + * + * \note Callers of pcmk_connect_ipc() using pcmk_ipc_dispatch_poll should call + * this function to check whether IPC data is available. Return values of + * interest include pcmk_rc_ok meaning data is available, and EAGAIN + * meaning no data is available; all other values indicate errors. + * \todo This does not allow the caller to poll multiple file descriptors at + * once. If there is demand for that, we could add a wrapper for + * crm_ipc_get_fd(api->ipc), so the caller can call poll() themselves. + */ +int +pcmk_poll_ipc(pcmk_ipc_api_t *api, int timeout_ms) +{ + int rc; + struct pollfd pollfd = { 0, }; + + if ((api == NULL) || (api->dispatch_type != pcmk_ipc_dispatch_poll)) { + return EINVAL; + } + pollfd.fd = crm_ipc_get_fd(api->ipc); + pollfd.events = POLLIN; + rc = poll(&pollfd, 1, timeout_ms); + if (rc < 0) { + return errno; + } else if (rc == 0) { + return EAGAIN; + } + return pcmk_rc_ok; +} + +/*! + * \brief Dispatch available messages on an IPC connection (without main loop) + * + * \param[in] api IPC API connection + * + * \return Standard Pacemaker return code + * + * \note Callers of pcmk_connect_ipc() using pcmk_ipc_dispatch_poll should call + * this function when IPC data is available. + */ +void +pcmk_dispatch_ipc(pcmk_ipc_api_t *api) +{ + if (api == NULL) { + return; + } + while (crm_ipc_ready(api->ipc)) { + if (crm_ipc_read(api->ipc) > 0) { + dispatch_ipc_data(crm_ipc_buffer(api->ipc), 0, api); + } + } +} + +// \return Standard Pacemaker return code +static int +connect_with_main_loop(pcmk_ipc_api_t *api) +{ + int rc; + + struct ipc_client_callbacks callbacks = { + .dispatch = dispatch_ipc_data, + .destroy = ipc_post_disconnect, + }; + + rc = pcmk__add_mainloop_ipc(api->ipc, G_PRIORITY_DEFAULT, api, + &callbacks, &(api->mainloop_io)); + if (rc != pcmk_rc_ok) { + return rc; + } + crm_debug("Connected to %s IPC (attached to main loop)", + pcmk_ipc_name(api, true)); + /* After this point, api->mainloop_io owns api->ipc, so api->ipc + * should not be explicitly freed. + */ + return pcmk_rc_ok; +} + +// \return Standard Pacemaker return code +static int +connect_without_main_loop(pcmk_ipc_api_t *api) +{ + int rc; + + if (!crm_ipc_connect(api->ipc)) { + rc = errno; + crm_ipc_close(api->ipc); + return rc; + } + crm_debug("Connected to %s IPC (without main loop)", + pcmk_ipc_name(api, true)); + return pcmk_rc_ok; +} + +/*! + * \brief Connect to a Pacemaker daemon via IPC + * + * \param[in] api IPC API instance + * \param[out] dispatch_type How IPC replies should be dispatched + * + * \return Standard Pacemaker return code + */ +int +pcmk_connect_ipc(pcmk_ipc_api_t *api, enum pcmk_ipc_dispatch dispatch_type) +{ + int rc = pcmk_rc_ok; + + if ((api == NULL) || (api->ipc == NULL)) { + crm_err("Cannot connect to uninitialized API object"); + return EINVAL; + } + + if (crm_ipc_connected(api->ipc)) { + crm_trace("Already connected to %s IPC API", pcmk_ipc_name(api, true)); + return pcmk_rc_ok; + } + + api->dispatch_type = dispatch_type; + switch (dispatch_type) { + case pcmk_ipc_dispatch_main: + rc = connect_with_main_loop(api); + break; + + case pcmk_ipc_dispatch_sync: + case pcmk_ipc_dispatch_poll: + rc = connect_without_main_loop(api); + break; + } + if (rc != pcmk_rc_ok) { + return rc; + } + + if ((api->cmds != NULL) && (api->cmds->post_connect != NULL)) { + rc = api->cmds->post_connect(api); + if (rc != pcmk_rc_ok) { + crm_ipc_close(api->ipc); + } + } + return rc; +} + +/*! + * \brief Disconnect an IPC API instance + * + * \param[in] api IPC API connection + * + * \return Standard Pacemaker return code + * + * \note If the connection is attached to a main loop, this function should be + * called before quitting the main loop, to ensure that all memory is + * freed. + */ +void +pcmk_disconnect_ipc(pcmk_ipc_api_t *api) +{ + if ((api == NULL) || (api->ipc == NULL)) { + return; + } + switch (api->dispatch_type) { + case pcmk_ipc_dispatch_main: + { + mainloop_io_t *mainloop_io = api->mainloop_io; + + // Make sure no code with access to api can use these again + api->mainloop_io = NULL; + api->ipc = NULL; + + mainloop_del_ipc_client(mainloop_io); + // After this point api might have already been freed + } + break; + + case pcmk_ipc_dispatch_poll: + case pcmk_ipc_dispatch_sync: + { + crm_ipc_t *ipc = api->ipc; + + // Make sure no code with access to api can use ipc again + api->ipc = NULL; + + // This should always be the case already, but to be safe + api->free_on_disconnect = false; + + crm_ipc_destroy(ipc); + ipc_post_disconnect(api); + } + break; + } +} + +/*! + * \brief Register a callback for IPC API events + * + * \param[in] api IPC API connection + * \param[in] callback Callback to register + * \param[in] userdata Caller data to pass to callback + * + * \note This function may be called multiple times to update the callback + * and/or user data. The caller remains responsible for freeing + * userdata in any case (after the IPC is disconnected, if the + * user data is still registered with the IPC). + */ +void +pcmk_register_ipc_callback(pcmk_ipc_api_t *api, pcmk_ipc_callback_t cb, + void *user_data) +{ + if (api == NULL) { + return; + } + api->cb = cb; + api->user_data = user_data; +} + +/*! + * \internal + * \brief Send an XML request across an IPC API connection + * + * \param[in] api IPC API connection + * \param[in] request XML request to send + * + * \return Standard Pacemaker return code + * + * \note Daemon-specific IPC API functions should call this function to send + * requests, because it handles different dispatch types appropriately. + */ +int +pcmk__send_ipc_request(pcmk_ipc_api_t *api, xmlNode *request) +{ + int rc; + xmlNode *reply = NULL; + enum crm_ipc_flags flags = crm_ipc_flags_none; + + if ((api == NULL) || (api->ipc == NULL) || (request == NULL)) { + return EINVAL; + } + crm_log_xml_trace(request, "ipc-sent"); + + // Synchronous dispatch requires waiting for a reply + if ((api->dispatch_type == pcmk_ipc_dispatch_sync) + && (api->cmds != NULL) + && (api->cmds->reply_expected != NULL) + && (api->cmds->reply_expected(api, request))) { + flags = crm_ipc_client_response; + } + + // The 0 here means a default timeout of 5 seconds + rc = crm_ipc_send(api->ipc, request, flags, 0, &reply); + + if (rc < 0) { + return pcmk_legacy2rc(rc); + } else if (rc == 0) { + return ENODATA; + } + + // With synchronous dispatch, we dispatch any reply now + if (reply != NULL) { + call_api_dispatch(api, reply); + free_xml(reply); + } + return pcmk_rc_ok; +} + +/*! + * \internal + * \brief Create the XML for an IPC request to purge a node from the peer cache + * + * \param[in] api IPC API connection + * \param[in] node_name If not NULL, name of node to purge + * \param[in] nodeid If not 0, node ID of node to purge + * + * \return Newly allocated IPC request XML + * + * \note The controller, fencer, and pacemakerd use the same request syntax, but + * the attribute manager uses a different one. The CIB manager doesn't + * have any syntax for it. The executor and scheduler don't connect to the + * cluster layer and thus don't have or need any syntax for it. + * + * \todo Modify the attribute manager to accept the common syntax (as well + * as its current one, for compatibility with older clients). Modify + * the CIB manager to accept and honor the common syntax. Modify the + * executor and scheduler to accept the syntax (immediately returning + * success), just for consistency. Modify this function to use the + * common syntax with all daemons if their version supports it. + */ +static xmlNode * +create_purge_node_request(pcmk_ipc_api_t *api, const char *node_name, + uint32_t nodeid) +{ + xmlNode *request = NULL; + const char *client = crm_system_name? crm_system_name : "client"; + + switch (api->server) { + case pcmk_ipc_attrd: + request = create_xml_node(NULL, __FUNCTION__); + crm_xml_add(request, F_TYPE, T_ATTRD); + crm_xml_add(request, F_ORIG, crm_system_name); + crm_xml_add(request, PCMK__XA_TASK, PCMK__ATTRD_CMD_PEER_REMOVE); + crm_xml_add(request, PCMK__XA_ATTR_NODE_NAME, node_name); + if (nodeid > 0) { + crm_xml_add_int(request, PCMK__XA_ATTR_NODE_ID, (int) nodeid); + } + break; + + case pcmk_ipc_controld: + case pcmk_ipc_fenced: + case pcmk_ipc_pacemakerd: + request = create_request(CRM_OP_RM_NODE_CACHE, NULL, NULL, + pcmk_ipc_name(api, false), client, NULL); + if (nodeid > 0) { + crm_xml_set_id(request, "%lu", (unsigned long) nodeid); + } + crm_xml_add(request, XML_ATTR_UNAME, node_name); + break; + + case pcmk_ipc_based: + case pcmk_ipc_execd: + case pcmk_ipc_schedulerd: + break; + } + return request; +} + +/*! + * \brief Ask a Pacemaker daemon to purge a node from its peer cache + * + * \param[in] api IPC API connection + * \param[in] node_name If not NULL, name of node to purge + * \param[in] nodeid If not 0, node ID of node to purge + * + * \return Standard Pacemaker return code + * + * \note At least one of node_name or nodeid must be specified. + */ +int +pcmk_ipc_purge_node(pcmk_ipc_api_t *api, const char *node_name, uint32_t nodeid) +{ + int rc = 0; + xmlNode *request = NULL; + + if (api == NULL) { + return EINVAL; + } + if ((node_name == NULL) && (nodeid == 0)) { + return EINVAL; + } + + request = create_purge_node_request(api, node_name, nodeid); + if (request == NULL) { + return EOPNOTSUPP; + } + rc = pcmk__send_ipc_request(api, request); + free_xml(request); + + crm_debug("%s peer cache purge of node %s[%lu]: rc=%d", + pcmk_ipc_name(api, true), node_name, (unsigned long) nodeid, rc); + return rc; +} + +/* + * Generic IPC API (to eventually be deprecated as public API and made internal) + */ + struct crm_ipc_s { struct pollfd pfd; unsigned int max_buf_size; // maximum bytes we can send or receive over IPC unsigned int buf_size; // size of allocated buffer int msg_size; int need_reply; char *buffer; char *name; qb_ipcc_connection_t *ipc; }; +/*! + * \brief Create a new (legacy) object for using Pacemaker daemon IPC + * + * \param[in] name IPC system name to connect to + * \param[in] max_size Use a maximum IPC buffer size of at least this size + * + * \return Newly allocated IPC object on success, NULL otherwise + * + * \note The caller is responsible for freeing the result using + * crm_ipc_destroy(). + * \note This should be considered deprecated for use with daemons supported by + * pcmk_new_ipc_api(). + */ crm_ipc_t * crm_ipc_new(const char *name, size_t max_size) { crm_ipc_t *client = NULL; client = calloc(1, sizeof(crm_ipc_t)); + if (client == NULL) { + crm_err("Could not create IPC connection: %s", strerror(errno)); + return NULL; + } client->name = strdup(name); + if (client->name == NULL) { + crm_err("Could not create IPC connection: %s", strerror(errno)); + free(client); + return NULL; + } client->buf_size = pcmk__ipc_buffer_size(max_size); client->buffer = malloc(client->buf_size); + if (client->buffer == NULL) { + crm_err("Could not create IPC connection: %s", strerror(errno)); + free(client->name); + free(client); + return NULL; + } /* Clients initiating connection pick the max buf size */ client->max_buf_size = client->buf_size; client->pfd.fd = -1; client->pfd.events = POLLIN; client->pfd.revents = 0; return client; } /*! * \brief Establish an IPC connection to a Pacemaker component * * \param[in] client Connection instance obtained from crm_ipc_new() * * \return TRUE on success, FALSE otherwise (in which case errno will be set; * specifically, in case of discovering the remote side is not * authentic, its value is set to ECONNABORTED). */ bool crm_ipc_connect(crm_ipc_t * client) { uid_t cl_uid = 0; gid_t cl_gid = 0; pid_t found_pid = 0; uid_t found_uid = 0; gid_t found_gid = 0; int rv; client->need_reply = FALSE; client->ipc = qb_ipcc_connect(client->name, client->buf_size); if (client->ipc == NULL) { crm_debug("Could not establish %s connection: %s (%d)", client->name, pcmk_strerror(errno), errno); return FALSE; } client->pfd.fd = crm_ipc_get_fd(client); if (client->pfd.fd < 0) { rv = errno; /* message already omitted */ crm_ipc_close(client); errno = rv; return FALSE; } rv = pcmk_daemon_user(&cl_uid, &cl_gid); if (rv < 0) { /* message already omitted */ crm_ipc_close(client); errno = -rv; return FALSE; } if (!(rv = crm_ipc_is_authentic_process(client->pfd.fd, cl_uid, cl_gid, &found_pid, &found_uid, &found_gid))) { crm_err("Daemon (IPC %s) is not authentic:" " process %lld (uid: %lld, gid: %lld)", client->name, (long long) PCMK__SPECIAL_PID_AS_0(found_pid), (long long) found_uid, (long long) found_gid); crm_ipc_close(client); errno = ECONNABORTED; return FALSE; } else if (rv < 0) { errno = -rv; crm_perror(LOG_ERR, "Could not verify authenticity of daemon (IPC %s)", client->name); crm_ipc_close(client); errno = -rv; return FALSE; } qb_ipcc_context_set(client->ipc, client); #ifdef HAVE_IPCS_GET_BUFFER_SIZE client->max_buf_size = qb_ipcc_get_buffer_size(client->ipc); if (client->max_buf_size > client->buf_size) { free(client->buffer); client->buffer = calloc(1, client->max_buf_size); client->buf_size = client->max_buf_size; } #endif return TRUE; } void crm_ipc_close(crm_ipc_t * client) { if (client) { - crm_trace("Disconnecting %s IPC connection %p (%p)", client->name, client, client->ipc); - if (client->ipc) { qb_ipcc_connection_t *ipc = client->ipc; client->ipc = NULL; qb_ipcc_disconnect(ipc); } } } void crm_ipc_destroy(crm_ipc_t * client) { if (client) { if (client->ipc && qb_ipcc_is_connected(client->ipc)) { crm_notice("Destroying an active IPC connection to %s", client->name); /* The next line is basically unsafe * * If this connection was attached to mainloop and mainloop is active, * the 'disconnected' callback will end up back here and we'll end * up free'ing the memory twice - something that can still happen * even without this if we destroy a connection and it closes before * we call exit */ /* crm_ipc_close(client); */ } crm_trace("Destroying IPC connection to %s: %p", client->name, client); free(client->buffer); free(client->name); free(client); } } int crm_ipc_get_fd(crm_ipc_t * client) { int fd = 0; if (client && client->ipc && (qb_ipcc_fd_get(client->ipc, &fd) == 0)) { return fd; } errno = EINVAL; crm_perror(LOG_ERR, "Could not obtain file IPC descriptor for %s", (client? client->name : "unspecified client")); return -errno; } bool crm_ipc_connected(crm_ipc_t * client) { bool rc = FALSE; if (client == NULL) { crm_trace("No client"); return FALSE; } else if (client->ipc == NULL) { crm_trace("No connection"); return FALSE; } else if (client->pfd.fd < 0) { crm_trace("Bad descriptor"); return FALSE; } rc = qb_ipcc_is_connected(client->ipc); if (rc == FALSE) { client->pfd.fd = -EINVAL; } return rc; } /*! * \brief Check whether an IPC connection is ready to be read * * \param[in] client Connection to check * * \return Positive value if ready to be read, 0 if not ready, -errno on error */ int crm_ipc_ready(crm_ipc_t *client) { int rc; CRM_ASSERT(client != NULL); if (crm_ipc_connected(client) == FALSE) { return -ENOTCONN; } client->pfd.revents = 0; rc = poll(&(client->pfd), 1, 0); return (rc < 0)? -errno : rc; } // \return Standard Pacemaker return code static int crm_ipc_decompress(crm_ipc_t * client) { pcmk__ipc_header_t *header = (pcmk__ipc_header_t *)(void*)client->buffer; if (header->size_compressed) { int rc = 0; unsigned int size_u = 1 + header->size_uncompressed; /* never let buf size fall below our max size required for ipc reads. */ unsigned int new_buf_size = QB_MAX((sizeof(pcmk__ipc_header_t) + size_u), client->max_buf_size); char *uncompressed = calloc(1, new_buf_size); crm_trace("Decompressing message data %u bytes into %u bytes", header->size_compressed, size_u); rc = BZ2_bzBuffToBuffDecompress(uncompressed + sizeof(pcmk__ipc_header_t), &size_u, client->buffer + sizeof(pcmk__ipc_header_t), header->size_compressed, 1, 0); if (rc != BZ_OK) { crm_err("Decompression failed: %s " CRM_XS " bzerror=%d", bz2_strerror(rc), rc); free(uncompressed); return EILSEQ; } /* * This assert no longer holds true. For an identical msg, some clients may * require compression, and others may not. If that same msg (event) is sent * to multiple clients, it could result in some clients receiving a compressed * msg even though compression was not explicitly required for them. * * CRM_ASSERT((header->size_uncompressed + sizeof(pcmk__ipc_header_t)) >= ipc_buffer_max); */ CRM_ASSERT(size_u == header->size_uncompressed); memcpy(uncompressed, client->buffer, sizeof(pcmk__ipc_header_t)); /* Preserve the header */ header = (pcmk__ipc_header_t *)(void*)uncompressed; free(client->buffer); client->buf_size = new_buf_size; client->buffer = uncompressed; } CRM_ASSERT(client->buffer[sizeof(pcmk__ipc_header_t) + header->size_uncompressed - 1] == 0); return pcmk_rc_ok; } long crm_ipc_read(crm_ipc_t * client) { pcmk__ipc_header_t *header = NULL; CRM_ASSERT(client != NULL); CRM_ASSERT(client->ipc != NULL); CRM_ASSERT(client->buffer != NULL); client->buffer[0] = 0; client->msg_size = qb_ipcc_event_recv(client->ipc, client->buffer, client->buf_size, 0); if (client->msg_size >= 0) { int rc = crm_ipc_decompress(client); if (rc != pcmk_rc_ok) { return pcmk_rc2legacy(rc); } header = (pcmk__ipc_header_t *)(void*)client->buffer; if (!pcmk__valid_ipc_header(header)) { return -EBADMSG; } crm_trace("Received %s event %d, size=%u, rc=%d, text: %.100s", client->name, header->qb.id, header->qb.size, client->msg_size, client->buffer + sizeof(pcmk__ipc_header_t)); } else { crm_trace("No message from %s received: %s", client->name, pcmk_strerror(client->msg_size)); } if (crm_ipc_connected(client) == FALSE || client->msg_size == -ENOTCONN) { crm_err("Connection to %s failed", client->name); } if (header) { /* Data excluding the header */ return header->size_uncompressed; } return -ENOMSG; } const char * crm_ipc_buffer(crm_ipc_t * client) { CRM_ASSERT(client != NULL); return client->buffer + sizeof(pcmk__ipc_header_t); } uint32_t crm_ipc_buffer_flags(crm_ipc_t * client) { pcmk__ipc_header_t *header = NULL; CRM_ASSERT(client != NULL); if (client->buffer == NULL) { return 0; } header = (pcmk__ipc_header_t *)(void*)client->buffer; return header->flags; } const char * crm_ipc_name(crm_ipc_t * client) { CRM_ASSERT(client != NULL); return client->name; } // \return Standard Pacemaker return code static int internal_ipc_get_reply(crm_ipc_t *client, int request_id, int ms_timeout, ssize_t *bytes) { time_t timeout = time(NULL) + 1 + (ms_timeout / 1000); int rc = pcmk_rc_ok; /* get the reply */ crm_trace("client %s waiting on reply to msg id %d", client->name, request_id); do { *bytes = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, 1000); if (*bytes > 0) { pcmk__ipc_header_t *hdr = NULL; rc = crm_ipc_decompress(client); if (rc != pcmk_rc_ok) { return rc; } hdr = (pcmk__ipc_header_t *)(void*)client->buffer; if (hdr->qb.id == request_id) { /* Got it */ break; } else if (hdr->qb.id < request_id) { xmlNode *bad = string2xml(crm_ipc_buffer(client)); crm_err("Discarding old reply %d (need %d)", hdr->qb.id, request_id); crm_log_xml_notice(bad, "OldIpcReply"); } else { xmlNode *bad = string2xml(crm_ipc_buffer(client)); crm_err("Discarding newer reply %d (need %d)", hdr->qb.id, request_id); crm_log_xml_notice(bad, "ImpossibleReply"); CRM_ASSERT(hdr->qb.id <= request_id); } } else if (crm_ipc_connected(client) == FALSE) { crm_err("Server disconnected client %s while waiting for msg id %d", client->name, request_id); break; } } while (time(NULL) < timeout); if (*bytes < 0) { rc = (int) -*bytes; // System errno } return rc; } /*! * \brief Send an IPC XML message * * \param[in] client Connection to IPC server * \param[in] message XML message to send * \param[in] flags Bitmask of crm_ipc_flags * \param[in] ms_timeout Give up if not sent within this much time * (5 seconds if 0, or no timeout if negative) * \param[out] reply Reply from server (or NULL if none) * * \return Negative errno on error, otherwise size of reply received in bytes * if reply was needed, otherwise number of bytes sent */ int crm_ipc_send(crm_ipc_t * client, xmlNode * message, enum crm_ipc_flags flags, int32_t ms_timeout, xmlNode ** reply) { int rc = 0; ssize_t qb_rc = 0; ssize_t bytes = 0; struct iovec *iov; static uint32_t id = 0; static int factor = 8; pcmk__ipc_header_t *header; if (client == NULL) { crm_notice("Can't send IPC request without connection (bug?): %.100s", message); return -ENOTCONN; } else if (crm_ipc_connected(client) == FALSE) { /* Don't even bother */ crm_notice("Can't send IPC request to %s: Connection closed", client->name); return -ENOTCONN; } if (ms_timeout == 0) { ms_timeout = 5000; } if (client->need_reply) { qb_rc = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, ms_timeout); if (qb_rc < 0) { crm_warn("Sending IPC to %s disabled until pending reply received", client->name); return -EALREADY; } else { crm_notice("Sending IPC to %s re-enabled after pending reply received", client->name); client->need_reply = FALSE; } } id++; CRM_LOG_ASSERT(id != 0); /* Crude wrap-around detection */ rc = pcmk__ipc_prepare_iov(id, message, client->max_buf_size, &iov, &bytes); if (rc != pcmk_rc_ok) { crm_warn("Couldn't prepare IPC request to %s: %s " CRM_XS " rc=%d", client->name, pcmk_rc_str(rc), rc); return pcmk_rc2legacy(rc); } header = iov[0].iov_base; header->flags |= flags; if(is_set(flags, crm_ipc_proxied)) { /* Don't look for a synchronous response */ clear_bit(flags, crm_ipc_client_response); } if(header->size_compressed) { if(factor < 10 && (client->max_buf_size / 10) < (bytes / factor)) { crm_notice("Compressed message exceeds %d0%% of configured IPC " "limit (%u bytes); consider setting PCMK_ipc_buffer to " "%u or higher", factor, client->max_buf_size, 2 * client->max_buf_size); factor++; } } crm_trace("Sending %s IPC request %d of %u bytes using %dms timeout", client->name, header->qb.id, header->qb.size, ms_timeout); if (ms_timeout > 0 || is_not_set(flags, crm_ipc_client_response)) { time_t timeout = time(NULL) + 1 + (ms_timeout / 1000); do { /* @TODO Is this check really needed? Won't qb_ipcc_sendv() return * an error if it's not connected? */ if (!crm_ipc_connected(client)) { goto send_cleanup; } qb_rc = qb_ipcc_sendv(client->ipc, iov, 2); } while ((qb_rc == -EAGAIN) && (time(NULL) < timeout)); rc = (int) qb_rc; // Negative of system errno, or bytes sent if (qb_rc <= 0) { goto send_cleanup; } else if (is_not_set(flags, crm_ipc_client_response)) { crm_trace("Not waiting for reply to %s IPC request %d", client->name, header->qb.id); goto send_cleanup; } rc = internal_ipc_get_reply(client, header->qb.id, ms_timeout, &bytes); if (rc != pcmk_rc_ok) { /* We didn't get the reply in time, so disable future sends for now. * The only alternative would be to close the connection since we * don't know how to detect and discard out-of-sequence replies. * * @TODO Implement out-of-sequence detection */ client->need_reply = TRUE; } rc = (int) bytes; // Negative system errno, or size of reply received } else { // No timeout, and client response needed do { qb_rc = qb_ipcc_sendv_recv(client->ipc, iov, 2, client->buffer, client->buf_size, -1); } while ((qb_rc == -EAGAIN) && crm_ipc_connected(client)); rc = (int) qb_rc; // Negative system errno, or size of reply received } if (rc > 0) { pcmk__ipc_header_t *hdr = (pcmk__ipc_header_t *)(void*)client->buffer; crm_trace("Received %d-byte reply %d to %s IPC %d: %.100s", rc, hdr->qb.id, client->name, header->qb.id, crm_ipc_buffer(client)); if (reply) { *reply = string2xml(crm_ipc_buffer(client)); } } else { crm_trace("No reply to %s IPC %d: rc=%d", client->name, header->qb.id, rc); } send_cleanup: if (crm_ipc_connected(client) == FALSE) { crm_notice("Couldn't send %s IPC request %d: Connection closed " CRM_XS " rc=%d", client->name, header->qb.id, rc); } else if (rc == -ETIMEDOUT) { crm_warn("%s IPC request %d failed: %s after %dms " CRM_XS " rc=%d", client->name, header->qb.id, pcmk_strerror(rc), ms_timeout, rc); crm_write_blackbox(0, NULL); } else if (rc <= 0) { crm_warn("%s IPC request %d failed: %s " CRM_XS " rc=%d", client->name, header->qb.id, ((rc == 0)? "No bytes sent" : pcmk_strerror(rc)), rc); } pcmk_free_ipc_event(iov); return rc; } int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid) { int ret = 0; pid_t found_pid = 0; uid_t found_uid = 0; gid_t found_gid = 0; #if defined(US_AUTH_PEERCRED_UCRED) struct ucred ucred; socklen_t ucred_len = sizeof(ucred); if (!getsockopt(sock, SOL_SOCKET, SO_PEERCRED, &ucred, &ucred_len) && ucred_len == sizeof(ucred)) { found_pid = ucred.pid; found_uid = ucred.uid; found_gid = ucred.gid; #elif defined(US_AUTH_PEERCRED_SOCKPEERCRED) struct sockpeercred sockpeercred; socklen_t sockpeercred_len = sizeof(sockpeercred); if (!getsockopt(sock, SOL_SOCKET, SO_PEERCRED, &sockpeercred, &sockpeercred_len) && sockpeercred_len == sizeof(sockpeercred_len)) { found_pid = sockpeercred.pid; found_uid = sockpeercred.uid; found_gid = sockpeercred.gid; #elif defined(US_AUTH_GETPEEREID) if (!getpeereid(sock, &found_uid, &found_gid)) { found_pid = PCMK__SPECIAL_PID; /* cannot obtain PID (FreeBSD) */ #elif defined(US_AUTH_GETPEERUCRED) ucred_t *ucred; if (!getpeerucred(sock, &ucred)) { errno = 0; found_pid = ucred_getpid(ucred); found_uid = ucred_geteuid(ucred); found_gid = ucred_getegid(ucred); ret = -errno; ucred_free(ucred); if (ret) { return (ret < 0) ? ret : -pcmk_err_generic; } #else # error "No way to authenticate a Unix socket peer" errno = 0; if (0) { #endif if (gotpid != NULL) { *gotpid = found_pid; } if (gotuid != NULL) { *gotuid = found_uid; } if (gotgid != NULL) { *gotgid = found_gid; } ret = (found_uid == 0 || found_uid == refuid || found_gid == refgid); } else { ret = (errno > 0) ? -errno : -pcmk_err_generic; } return ret; } int pcmk__ipc_is_authentic_process_active(const char *name, uid_t refuid, gid_t refgid, pid_t *gotpid) { static char last_asked_name[PATH_MAX / 2] = ""; /* log spam prevention */ int fd; int rc = pcmk_rc_ipc_unresponsive; int auth_rc = 0; int32_t qb_rc; pid_t found_pid = 0; uid_t found_uid = 0; gid_t found_gid = 0; qb_ipcc_connection_t *c; c = qb_ipcc_connect(name, 0); if (c == NULL) { crm_info("Could not connect to %s IPC: %s", name, strerror(errno)); rc = pcmk_rc_ipc_unresponsive; goto bail; } qb_rc = qb_ipcc_fd_get(c, &fd); if (qb_rc != 0) { rc = (int) -qb_rc; // System errno crm_err("Could not get fd from %s IPC: %s " CRM_XS " rc=%d", name, pcmk_rc_str(rc), rc); goto bail; } auth_rc = crm_ipc_is_authentic_process(fd, refuid, refgid, &found_pid, &found_uid, &found_gid); if (auth_rc < 0) { rc = pcmk_legacy2rc(auth_rc); crm_err("Could not get peer credentials from %s IPC: %s " CRM_XS " rc=%d", name, pcmk_rc_str(rc), rc); goto bail; } if (gotpid != NULL) { *gotpid = found_pid; } if (auth_rc == 0) { crm_err("Daemon (IPC %s) effectively blocked with unauthorized" " process %lld (uid: %lld, gid: %lld)", name, (long long) PCMK__SPECIAL_PID_AS_0(found_pid), (long long) found_uid, (long long) found_gid); rc = pcmk_rc_ipc_unauthorized; goto bail; } rc = pcmk_rc_ok; if ((found_uid != refuid || found_gid != refgid) && strncmp(last_asked_name, name, sizeof(last_asked_name))) { if ((found_uid == 0) && (refuid != 0)) { crm_warn("Daemon (IPC %s) runs as root, whereas the expected" " credentials are %lld:%lld, hazard of violating" " the least privilege principle", name, (long long) refuid, (long long) refgid); } else { crm_notice("Daemon (IPC %s) runs as %lld:%lld, whereas the" " expected credentials are %lld:%lld, which may" " mean a different set of privileges than expected", name, (long long) found_uid, (long long) found_gid, (long long) refuid, (long long) refgid); } memccpy(last_asked_name, name, '\0', sizeof(last_asked_name)); } bail: if (c != NULL) { qb_ipcc_disconnect(c); } return rc; } xmlNode * create_hello_message(const char *uuid, const char *client_name, const char *major_version, const char *minor_version) { xmlNode *hello_node = NULL; xmlNode *hello = NULL; if (pcmk__str_empty(uuid) || pcmk__str_empty(client_name) || pcmk__str_empty(major_version) || pcmk__str_empty(minor_version)) { crm_err("Could not create IPC hello message from %s (UUID %s): " "missing information", client_name? client_name : "unknown client", uuid? uuid : "unknown"); return NULL; } hello_node = create_xml_node(NULL, XML_TAG_OPTIONS); if (hello_node == NULL) { crm_err("Could not create IPC hello message from %s (UUID %s): " "Message data creation failed", client_name, uuid); return NULL; } crm_xml_add(hello_node, "major_version", major_version); crm_xml_add(hello_node, "minor_version", minor_version); crm_xml_add(hello_node, "client_name", client_name); crm_xml_add(hello_node, "client_uuid", uuid); hello = create_request(CRM_OP_HELLO, hello_node, NULL, NULL, client_name, uuid); if (hello == NULL) { crm_err("Could not create IPC hello message from %s (UUID %s): " "Request creation failed", client_name, uuid); return NULL; } free_xml(hello_node); crm_trace("Created hello message from %s (UUID %s)", client_name, uuid); return hello; } diff --git a/lib/common/mainloop.c b/lib/common/mainloop.c index 634eead71b..e942e578c0 100644 --- a/lib/common/mainloop.c +++ b/lib/common/mainloop.c @@ -1,1417 +1,1451 @@ /* * Copyright 2004-2020 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU Lesser General Public License * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. */ #include #ifndef _GNU_SOURCE # define _GNU_SOURCE #endif #include #include #include #include #include #include #include #include #include #include struct mainloop_child_s { pid_t pid; char *desc; unsigned timerid; gboolean timeout; void *privatedata; enum mainloop_child_flags flags; /* Called when a process dies */ void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode); }; struct trigger_s { GSource source; gboolean running; gboolean trigger; void *user_data; guint id; }; static gboolean crm_trigger_prepare(GSource * source, gint * timeout) { crm_trigger_t *trig = (crm_trigger_t *) source; /* cluster-glue's FD and IPC related sources make use of * g_source_add_poll() but do not set a timeout in their prepare * functions * * This means mainloop's poll() will block until an event for one * of these sources occurs - any /other/ type of source, such as * this one or g_idle_*, that doesn't use g_source_add_poll() is * S-O-L and won't be processed until there is something fd-based * happens. * * Luckily the timeout we can set here affects all sources and * puts an upper limit on how long poll() can take. * * So unconditionally set a small-ish timeout, not too small that * we're in constant motion, which will act as an upper bound on * how long the signal handling might be delayed for. */ *timeout = 500; /* Timeout in ms */ return trig->trigger; } static gboolean crm_trigger_check(GSource * source) { crm_trigger_t *trig = (crm_trigger_t *) source; return trig->trigger; } static gboolean crm_trigger_dispatch(GSource * source, GSourceFunc callback, gpointer userdata) { int rc = TRUE; crm_trigger_t *trig = (crm_trigger_t *) source; if (trig->running) { /* Wait until the existing job is complete before starting the next one */ return TRUE; } trig->trigger = FALSE; if (callback) { rc = callback(trig->user_data); if (rc < 0) { crm_trace("Trigger handler %p not yet complete", trig); trig->running = TRUE; rc = TRUE; } } return rc; } static void crm_trigger_finalize(GSource * source) { crm_trace("Trigger %p destroyed", source); } static GSourceFuncs crm_trigger_funcs = { crm_trigger_prepare, crm_trigger_check, crm_trigger_dispatch, crm_trigger_finalize, }; static crm_trigger_t * mainloop_setup_trigger(GSource * source, int priority, int (*dispatch) (gpointer user_data), gpointer userdata) { crm_trigger_t *trigger = NULL; trigger = (crm_trigger_t *) source; trigger->id = 0; trigger->trigger = FALSE; trigger->user_data = userdata; if (dispatch) { g_source_set_callback(source, dispatch, trigger, NULL); } g_source_set_priority(source, priority); g_source_set_can_recurse(source, FALSE); trigger->id = g_source_attach(source, NULL); return trigger; } void mainloop_trigger_complete(crm_trigger_t * trig) { crm_trace("Trigger handler %p complete", trig); trig->running = FALSE; } /* If dispatch returns: * -1: Job running but not complete * 0: Remove the trigger from mainloop * 1: Leave the trigger in mainloop */ crm_trigger_t * mainloop_add_trigger(int priority, int (*dispatch) (gpointer user_data), gpointer userdata) { GSource *source = NULL; CRM_ASSERT(sizeof(crm_trigger_t) > sizeof(GSource)); source = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t)); CRM_ASSERT(source != NULL); return mainloop_setup_trigger(source, priority, dispatch, userdata); } void mainloop_set_trigger(crm_trigger_t * source) { if(source) { source->trigger = TRUE; } } gboolean mainloop_destroy_trigger(crm_trigger_t * source) { GSource *gs = NULL; if(source == NULL) { return TRUE; } gs = (GSource *)source; g_source_destroy(gs); /* Remove from mainloop, ref_count-- */ g_source_unref(gs); /* The caller no longer carries a reference to source * * At this point the source should be free'd, * unless we're currently processing said * source, in which case mainloop holds an * additional reference and it will be free'd * once our processing completes */ return TRUE; } // Define a custom glib source for signal handling // Data structure for custom glib source typedef struct signal_s { crm_trigger_t trigger; // trigger that invoked source (must be first) void (*handler) (int sig); // signal handler int signal; // signal that was received } crm_signal_t; // Table to associate signal handlers with signal numbers static crm_signal_t *crm_signals[NSIG]; /*! * \internal * \brief Dispatch an event from custom glib source for signals * * Given an signal event, clear the event trigger and call any registered * signal handler. * * \param[in] source glib source that triggered this dispatch * \param[in] callback (ignored) * \param[in] userdata (ignored) */ static gboolean crm_signal_dispatch(GSource * source, GSourceFunc callback, gpointer userdata) { crm_signal_t *sig = (crm_signal_t *) source; if(sig->signal != SIGCHLD) { crm_notice("Caught '%s' signal "CRM_XS" %d (%s handler)", strsignal(sig->signal), sig->signal, (sig->handler? "invoking" : "no")); } sig->trigger.trigger = FALSE; if (sig->handler) { sig->handler(sig->signal); } return TRUE; } /*! * \internal * \brief Handle a signal by setting a trigger for signal source * * \param[in] sig Signal number that was received * * \note This is the true signal handler for the mainloop signal source, and * must be async-safe. */ static void mainloop_signal_handler(int sig) { if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) { mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]); } } // Functions implementing our custom glib source for signal handling static GSourceFuncs crm_signal_funcs = { crm_trigger_prepare, crm_trigger_check, crm_signal_dispatch, crm_trigger_finalize, }; /*! * \internal * \brief Set a true signal handler * * signal()-like interface to sigaction() * * \param[in] sig Signal number to register handler for * \param[in] dispatch Signal handler * * \return The previous value of the signal handler, or SIG_ERR on error * \note The dispatch function must be async-safe. */ sighandler_t crm_signal_handler(int sig, sighandler_t dispatch) { sigset_t mask; struct sigaction sa; struct sigaction old; if (sigemptyset(&mask) < 0) { crm_err("Could not set handler for signal %d: %s", sig, pcmk_strerror(errno)); return SIG_ERR; } memset(&sa, 0, sizeof(struct sigaction)); sa.sa_handler = dispatch; sa.sa_flags = SA_RESTART; sa.sa_mask = mask; if (sigaction(sig, &sa, &old) < 0) { crm_err("Could not set handler for signal %d: %s", sig, pcmk_strerror(errno)); return SIG_ERR; } return old.sa_handler; } static void mainloop_destroy_signal_entry(int sig) { crm_signal_t *tmp = crm_signals[sig]; crm_signals[sig] = NULL; crm_trace("Destroying signal %d", sig); mainloop_destroy_trigger((crm_trigger_t *) tmp); } /*! * \internal * \brief Add a signal handler to a mainloop * * \param[in] sig Signal number to handle * \param[in] dispatch Signal handler function * * \note The true signal handler merely sets a mainloop trigger to call this * dispatch function via the mainloop. Therefore, the dispatch function * does not need to be async-safe. */ gboolean mainloop_add_signal(int sig, void (*dispatch) (int sig)) { GSource *source = NULL; int priority = G_PRIORITY_HIGH - 1; if (sig == SIGTERM) { /* TERM is higher priority than other signals, * signals are higher priority than other ipc. * Yes, minus: smaller is "higher" */ priority--; } if (sig >= NSIG || sig < 0) { crm_err("Signal %d is out of range", sig); return FALSE; } else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) { crm_trace("Signal handler for %d is already installed", sig); return TRUE; } else if (crm_signals[sig] != NULL) { crm_err("Different signal handler for %d is already installed", sig); return FALSE; } CRM_ASSERT(sizeof(crm_signal_t) > sizeof(GSource)); source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t)); crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL); CRM_ASSERT(crm_signals[sig] != NULL); crm_signals[sig]->handler = dispatch; crm_signals[sig]->signal = sig; if (crm_signal_handler(sig, mainloop_signal_handler) == SIG_ERR) { mainloop_destroy_signal_entry(sig); return FALSE; } #if 0 /* If we want signals to interrupt mainloop's poll(), instead of waiting for * the timeout, then we should call siginterrupt() below * * For now, just enforce a low timeout */ if (siginterrupt(sig, 1) < 0) { crm_perror(LOG_INFO, "Could not enable system call interruptions for signal %d", sig); } #endif return TRUE; } gboolean mainloop_destroy_signal(int sig) { if (sig >= NSIG || sig < 0) { crm_err("Signal %d is out of range", sig); return FALSE; } else if (crm_signal_handler(sig, NULL) == SIG_ERR) { crm_perror(LOG_ERR, "Could not uninstall signal handler for signal %d", sig); return FALSE; } else if (crm_signals[sig] == NULL) { return TRUE; } mainloop_destroy_signal_entry(sig); return TRUE; } static qb_array_t *gio_map = NULL; void mainloop_cleanup(void) { if (gio_map) { qb_array_free(gio_map); } for (int sig = 0; sig < NSIG; ++sig) { mainloop_destroy_signal_entry(sig); } } /* * libqb... */ struct gio_to_qb_poll { int32_t is_used; guint source; int32_t events; void *data; qb_ipcs_dispatch_fn_t fn; enum qb_loop_priority p; }; static gboolean gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data) { struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data; gint fd = g_io_channel_unix_get_fd(gio); crm_trace("%p.%d %d", data, fd, condition); /* if this assert get's hit, then there is a race condition between * when we destroy a fd and when mainloop actually gives it up */ CRM_ASSERT(adaptor->is_used > 0); return (adaptor->fn(fd, condition, adaptor->data) == 0); } static void gio_poll_destroy(gpointer data) { struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data; adaptor->is_used--; CRM_ASSERT(adaptor->is_used >= 0); if (adaptor->is_used == 0) { crm_trace("Marking adaptor %p unused", adaptor); adaptor->source = 0; } } /*! * \internal * \brief Convert libqb's poll priority into GLib's one * * \param[in] prio libqb's poll priority (#QB_LOOP_MED assumed as fallback) * * \return best matching GLib's priority */ static gint conv_prio_libqb2glib(enum qb_loop_priority prio) { gint ret = G_PRIORITY_DEFAULT; switch (prio) { case QB_LOOP_LOW: ret = G_PRIORITY_LOW; break; case QB_LOOP_HIGH: ret = G_PRIORITY_HIGH; break; default: crm_trace("Invalid libqb's loop priority %d, assuming QB_LOOP_MED", prio); /* fall-through */ case QB_LOOP_MED: break; } return ret; } /*! * \internal * \brief Convert libqb's poll priority to rate limiting spec * * \param[in] prio libqb's poll priority (#QB_LOOP_MED assumed as fallback) * * \return best matching rate limiting spec */ static enum qb_ipcs_rate_limit conv_libqb_prio2ratelimit(enum qb_loop_priority prio) { /* this is an inversion of what libqb's qb_ipcs_request_rate_limit does */ enum qb_ipcs_rate_limit ret = QB_IPCS_RATE_NORMAL; switch (prio) { case QB_LOOP_LOW: ret = QB_IPCS_RATE_SLOW; break; case QB_LOOP_HIGH: ret = QB_IPCS_RATE_FAST; break; default: crm_trace("Invalid libqb's loop priority %d, assuming QB_LOOP_MED", prio); /* fall-through */ case QB_LOOP_MED: break; } return ret; } static int32_t gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn, int32_t add) { struct gio_to_qb_poll *adaptor; GIOChannel *channel; int32_t res = 0; res = qb_array_index(gio_map, fd, (void **)&adaptor); if (res < 0) { crm_err("Array lookup failed for fd=%d: %d", fd, res); return res; } crm_trace("Adding fd=%d to mainloop as adaptor %p", fd, adaptor); if (add && adaptor->source) { crm_err("Adaptor for descriptor %d is still in-use", fd); return -EEXIST; } if (!add && !adaptor->is_used) { crm_err("Adaptor for descriptor %d is not in-use", fd); return -ENOENT; } /* channel is created with ref_count = 1 */ channel = g_io_channel_unix_new(fd); if (!channel) { crm_err("No memory left to add fd=%d", fd); return -ENOMEM; } if (adaptor->source) { g_source_remove(adaptor->source); adaptor->source = 0; } /* Because unlike the poll() API, glib doesn't tell us about HUPs by default */ evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR); adaptor->fn = fn; adaptor->events = evts; adaptor->data = data; adaptor->p = p; adaptor->is_used++; adaptor->source = g_io_add_watch_full(channel, conv_prio_libqb2glib(p), evts, gio_read_socket, adaptor, gio_poll_destroy); /* Now that mainloop now holds a reference to channel, * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new(). * * This means that channel will be free'd by: * g_main_context_dispatch() * -> g_source_destroy_internal() * -> g_source_callback_unref() * shortly after gio_poll_destroy() completes */ g_io_channel_unref(channel); crm_trace("Added to mainloop with gsource id=%d", adaptor->source); if (adaptor->source > 0) { return 0; } return -EINVAL; } static int32_t gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn) { return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_TRUE); } static int32_t gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn) { return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_FALSE); } static int32_t gio_poll_dispatch_del(int32_t fd) { struct gio_to_qb_poll *adaptor; crm_trace("Looking for fd=%d", fd); if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) { if (adaptor->source) { g_source_remove(adaptor->source); adaptor->source = 0; } } return 0; } struct qb_ipcs_poll_handlers gio_poll_funcs = { .job_add = NULL, .dispatch_add = gio_poll_dispatch_add, .dispatch_mod = gio_poll_dispatch_mod, .dispatch_del = gio_poll_dispatch_del, }; static enum qb_ipc_type pick_ipc_type(enum qb_ipc_type requested) { const char *env = getenv("PCMK_ipc_type"); if (env && strcmp("shared-mem", env) == 0) { return QB_IPC_SHM; } else if (env && strcmp("socket", env) == 0) { return QB_IPC_SOCKET; } else if (env && strcmp("posix", env) == 0) { return QB_IPC_POSIX_MQ; } else if (env && strcmp("sysv", env) == 0) { return QB_IPC_SYSV_MQ; } else if (requested == QB_IPC_NATIVE) { /* We prefer shared memory because the server never blocks on * send. If part of a message fits into the socket, libqb * needs to block until the remainder can be sent also. * Otherwise the client will wait forever for the remaining * bytes. */ return QB_IPC_SHM; } return requested; } qb_ipcs_service_t * mainloop_add_ipc_server(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks) { return mainloop_add_ipc_server_with_prio(name, type, callbacks, QB_LOOP_MED); } qb_ipcs_service_t * mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks, enum qb_loop_priority prio) { int rc = 0; qb_ipcs_service_t *server = NULL; if (gio_map == NULL) { gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1); } server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks); if (server == NULL) { crm_err("Could not create %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc); return NULL; } if (prio != QB_LOOP_MED) { qb_ipcs_request_rate_limit(server, conv_libqb_prio2ratelimit(prio)); } #ifdef HAVE_IPCS_GET_BUFFER_SIZE /* All clients should use at least ipc_buffer_max as their buffer size */ qb_ipcs_enforce_buffer_size(server, crm_ipc_default_buffer_size()); #endif qb_ipcs_poll_handlers_set(server, &gio_poll_funcs); rc = qb_ipcs_run(server); if (rc < 0) { crm_err("Could not start %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc); return NULL; } return server; } void mainloop_del_ipc_server(qb_ipcs_service_t * server) { if (server) { qb_ipcs_destroy(server); } } struct mainloop_io_s { char *name; void *userdata; int fd; guint source; crm_ipc_t *ipc; GIOChannel *channel; int (*dispatch_fn_ipc) (const char *buffer, ssize_t length, gpointer userdata); int (*dispatch_fn_io) (gpointer userdata); void (*destroy_fn) (gpointer userdata); }; static gboolean mainloop_gio_callback(GIOChannel * gio, GIOCondition condition, gpointer data) { gboolean keep = TRUE; mainloop_io_t *client = data; CRM_ASSERT(client->fd == g_io_channel_unix_get_fd(gio)); if (condition & G_IO_IN) { if (client->ipc) { long rc = 0; int max = 10; do { rc = crm_ipc_read(client->ipc); if (rc <= 0) { crm_trace("Message acquisition from %s[%p] failed: %s (%ld)", client->name, client, pcmk_strerror(rc), rc); } else if (client->dispatch_fn_ipc) { const char *buffer = crm_ipc_buffer(client->ipc); crm_trace("New message from %s[%p] = %ld (I/O condition=%d)", client->name, client, rc, condition); if (client->dispatch_fn_ipc(buffer, rc, client->userdata) < 0) { crm_trace("Connection to %s no longer required", client->name); keep = FALSE; } } } while (keep && rc > 0 && --max > 0); } else { crm_trace("New message from %s[%p] %u", client->name, client, condition); if (client->dispatch_fn_io) { if (client->dispatch_fn_io(client->userdata) < 0) { crm_trace("Connection to %s no longer required", client->name); keep = FALSE; } } } } if (client->ipc && crm_ipc_connected(client->ipc) == FALSE) { crm_err("Connection to %s closed " CRM_XS "client=%p condition=%d", client->name, client, condition); keep = FALSE; } else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) { crm_trace("The connection %s[%p] has been closed (I/O condition=%d)", client->name, client, condition); keep = FALSE; } else if ((condition & G_IO_IN) == 0) { /* #define GLIB_SYSDEF_POLLIN =1 #define GLIB_SYSDEF_POLLPRI =2 #define GLIB_SYSDEF_POLLOUT =4 #define GLIB_SYSDEF_POLLERR =8 #define GLIB_SYSDEF_POLLHUP =16 #define GLIB_SYSDEF_POLLNVAL =32 typedef enum { G_IO_IN GLIB_SYSDEF_POLLIN, G_IO_OUT GLIB_SYSDEF_POLLOUT, G_IO_PRI GLIB_SYSDEF_POLLPRI, G_IO_ERR GLIB_SYSDEF_POLLERR, G_IO_HUP GLIB_SYSDEF_POLLHUP, G_IO_NVAL GLIB_SYSDEF_POLLNVAL } GIOCondition; A bitwise combination representing a condition to watch for on an event source. G_IO_IN There is data to read. G_IO_OUT Data can be written (without blocking). G_IO_PRI There is urgent data to read. G_IO_ERR Error condition. G_IO_HUP Hung up (the connection has been broken, usually for pipes and sockets). G_IO_NVAL Invalid request. The file descriptor is not open. */ crm_err("Strange condition: %d", condition); } /* keep == FALSE results in mainloop_gio_destroy() being called * just before the source is removed from mainloop */ return keep; } static void mainloop_gio_destroy(gpointer c) { mainloop_io_t *client = c; char *c_name = strdup(client->name); /* client->source is valid but about to be destroyed (ref_count == 0) in gmain.c * client->channel will still have ref_count > 0... should be == 1 */ crm_trace("Destroying client %s[%p]", c_name, c); if (client->ipc) { crm_ipc_close(client->ipc); } if (client->destroy_fn) { void (*destroy_fn) (gpointer userdata) = client->destroy_fn; client->destroy_fn = NULL; destroy_fn(client->userdata); } if (client->ipc) { crm_ipc_t *ipc = client->ipc; client->ipc = NULL; crm_ipc_destroy(ipc); } crm_trace("Destroyed client %s[%p]", c_name, c); free(client->name); client->name = NULL; free(client); free(c_name); } -mainloop_io_t * -mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata, - struct ipc_client_callbacks *callbacks) +/*! + * \brief Connect to IPC and add it as a main loop source + * + * \param[in] ipc IPC connection to add + * \param[in] priority Event source priority to use for connection + * \param[in] userdata Data to register with callbacks + * \param[in] callbacks Dispatch and destroy callbacks for connection + * \param[out] source Newly allocated event source + * + * \return Standard Pacemaker return code + * + * \note On failure, the caller is still responsible for ipc. On success, the + * caller should call mainloop_del_ipc_client() when source is no longer + * needed, which will lead to the disconnection of the IPC later in the + * main loop if it is connected. However the IPC disconnects, + * mainloop_gio_destroy() will free ipc and source after calling the + * destroy callback. + */ +int +pcmk__add_mainloop_ipc(crm_ipc_t *ipc, int priority, void *userdata, + struct ipc_client_callbacks *callbacks, + mainloop_io_t **source) { - mainloop_io_t *client = NULL; - crm_ipc_t *conn = crm_ipc_new(name, max_size); + CRM_CHECK((ipc != NULL) && (callbacks != NULL), return EINVAL); - if (conn && crm_ipc_connect(conn)) { - int32_t fd = crm_ipc_get_fd(conn); + if (!crm_ipc_connect(ipc)) { + return ENOTCONN; + } + *source = mainloop_add_fd(crm_ipc_name(ipc), priority, crm_ipc_get_fd(ipc), + userdata, NULL); + if (*source == NULL) { + int rc = errno; - client = mainloop_add_fd(name, priority, fd, userdata, NULL); + crm_ipc_close(ipc); + return rc; } + (*source)->ipc = ipc; + (*source)->destroy_fn = callbacks->destroy; + (*source)->dispatch_fn_ipc = callbacks->dispatch; + return pcmk_rc_ok; +} - if (client == NULL) { - crm_perror(LOG_TRACE, "Connection to %s failed", name); - if (conn) { - crm_ipc_close(conn); - crm_ipc_destroy(conn); +mainloop_io_t * +mainloop_add_ipc_client(const char *name, int priority, size_t max_size, + void *userdata, struct ipc_client_callbacks *callbacks) +{ + crm_ipc_t *ipc = crm_ipc_new(name, max_size); + mainloop_io_t *source = NULL; + int rc = pcmk__add_mainloop_ipc(ipc, priority, userdata, callbacks, + &source); + + if (rc != pcmk_rc_ok) { + if (crm_log_level == LOG_STDOUT) { + fprintf(stderr, "Connection to %s failed: %s", + name, pcmk_rc_str(rc)); } + crm_ipc_destroy(ipc); return NULL; } - - client->ipc = conn; - client->destroy_fn = callbacks->destroy; - client->dispatch_fn_ipc = callbacks->dispatch; - return client; + return source; } void mainloop_del_ipc_client(mainloop_io_t * client) { mainloop_del_fd(client); } crm_ipc_t * mainloop_get_ipc_client(mainloop_io_t * client) { if (client) { return client->ipc; } return NULL; } mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks * callbacks) { mainloop_io_t *client = NULL; if (fd >= 0) { client = calloc(1, sizeof(mainloop_io_t)); if (client == NULL) { return NULL; } client->name = strdup(name); client->userdata = userdata; if (callbacks) { client->destroy_fn = callbacks->destroy; client->dispatch_fn_io = callbacks->dispatch; } client->fd = fd; client->channel = g_io_channel_unix_new(fd); client->source = g_io_add_watch_full(client->channel, priority, (G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback, client, mainloop_gio_destroy); /* Now that mainloop now holds a reference to channel, * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new(). * * This means that channel will be free'd by: * g_main_context_dispatch() or g_source_remove() * -> g_source_destroy_internal() * -> g_source_callback_unref() * shortly after mainloop_gio_destroy() completes */ g_io_channel_unref(client->channel); crm_trace("Added connection %d for %s[%p].%d", client->source, client->name, client, fd); } else { errno = EINVAL; } return client; } void mainloop_del_fd(mainloop_io_t * client) { if (client != NULL) { crm_trace("Removing client %s[%p]", client->name, client); if (client->source) { /* Results in mainloop_gio_destroy() being called just * before the source is removed from mainloop */ g_source_remove(client->source); } } } static GListPtr child_list = NULL; pid_t mainloop_child_pid(mainloop_child_t * child) { return child->pid; } const char * mainloop_child_name(mainloop_child_t * child) { return child->desc; } int mainloop_child_timeout(mainloop_child_t * child) { return child->timeout; } void * mainloop_child_userdata(mainloop_child_t * child) { return child->privatedata; } void mainloop_clear_child_userdata(mainloop_child_t * child) { child->privatedata = NULL; } /* good function name */ static void child_free(mainloop_child_t *child) { if (child->timerid != 0) { crm_trace("Removing timer %d", child->timerid); g_source_remove(child->timerid); child->timerid = 0; } free(child->desc); free(child); } /* terrible function name */ static int child_kill_helper(mainloop_child_t *child) { int rc; if (child->flags & mainloop_leave_pid_group) { crm_debug("Kill pid %d only. leave group intact.", child->pid); rc = kill(child->pid, SIGKILL); } else { crm_debug("Kill pid %d's group", child->pid); rc = kill(-child->pid, SIGKILL); } if (rc < 0) { if (errno != ESRCH) { crm_perror(LOG_ERR, "kill(%d, KILL) failed", child->pid); } return -errno; } return 0; } static gboolean child_timeout_callback(gpointer p) { mainloop_child_t *child = p; int rc = 0; child->timerid = 0; if (child->timeout) { crm_crit("%s process (PID %d) will not die!", child->desc, (int)child->pid); return FALSE; } rc = child_kill_helper(child); if (rc == -ESRCH) { /* Nothing left to do. pid doesn't exist */ return FALSE; } child->timeout = TRUE; crm_warn("%s process (PID %d) timed out", child->desc, (int)child->pid); child->timerid = g_timeout_add(5000, child_timeout_callback, child); return FALSE; } static bool child_waitpid(mainloop_child_t *child, int flags) { int rc = 0; int core = 0; int signo = 0; int status = 0; int exitcode = 0; bool callback_needed = true; rc = waitpid(child->pid, &status, flags); if (rc == 0) { // WNOHANG in flags, and child status is not available crm_trace("Child process %d (%s) still active", child->pid, child->desc); callback_needed = false; } else if (rc != child->pid) { /* According to POSIX, possible conditions: * - child->pid was non-positive (process group or any child), * and rc is specific child * - errno ECHILD (pid does not exist or is not child) * - errno EINVAL (invalid flags) * - errno EINTR (caller interrupted by signal) * * @TODO Handle these cases more specifically. */ signo = SIGCHLD; exitcode = 1; crm_notice("Wait for child process %d (%s) interrupted: %s", child->pid, child->desc, pcmk_strerror(errno)); } else if (WIFEXITED(status)) { exitcode = WEXITSTATUS(status); crm_trace("Child process %d (%s) exited with status %d", child->pid, child->desc, exitcode); } else if (WIFSIGNALED(status)) { signo = WTERMSIG(status); crm_trace("Child process %d (%s) exited with signal %d (%s)", child->pid, child->desc, signo, strsignal(signo)); #ifdef WCOREDUMP // AIX, SunOS, maybe others } else if (WCOREDUMP(status)) { core = 1; crm_err("Child process %d (%s) dumped core", child->pid, child->desc); #endif } else { // flags must contain WUNTRACED and/or WCONTINUED to reach this crm_trace("Child process %d (%s) stopped or continued", child->pid, child->desc); callback_needed = false; } if (callback_needed && child->callback) { child->callback(child, child->pid, core, signo, exitcode); } return callback_needed; } static void child_death_dispatch(int signal) { for (GList *iter = child_list; iter; ) { GList *saved = iter; mainloop_child_t *child = iter->data; iter = iter->next; if (child_waitpid(child, WNOHANG)) { crm_trace("Removing completed process %d from child list", child->pid); child_list = g_list_remove_link(child_list, saved); g_list_free(saved); child_free(child); } } } static gboolean child_signal_init(gpointer p) { crm_trace("Installed SIGCHLD handler"); /* Do NOT use g_child_watch_add() and friends, they rely on pthreads */ mainloop_add_signal(SIGCHLD, child_death_dispatch); /* In case they terminated before the signal handler was installed */ child_death_dispatch(SIGCHLD); return FALSE; } gboolean mainloop_child_kill(pid_t pid) { GListPtr iter; mainloop_child_t *child = NULL; mainloop_child_t *match = NULL; /* It is impossible to block SIGKILL, this allows us to * call waitpid without WNOHANG flag.*/ int waitflags = 0, rc = 0; for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) { child = iter->data; if (pid == child->pid) { match = child; } } if (match == NULL) { return FALSE; } rc = child_kill_helper(match); if(rc == -ESRCH) { /* It's gone, but hasn't shown up in waitpid() yet. Wait until we get * SIGCHLD and let handler clean it up as normal (so we get the correct * return code/status). The blocking alternative would be to call * child_waitpid(match, 0). */ crm_trace("Waiting for signal that child process %d completed", match->pid); return TRUE; } else if(rc != 0) { /* If KILL for some other reason set the WNOHANG flag since we * can't be certain what happened. */ waitflags = WNOHANG; } if (!child_waitpid(match, waitflags)) { /* not much we can do if this occurs */ return FALSE; } child_list = g_list_remove(child_list, match); child_free(match); return TRUE; } /* Create/Log a new tracked process * To track a process group, use -pid * * @TODO Using a non-positive pid (i.e. any child, or process group) would * likely not be useful since we will free the child after the first * completed process. */ void mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags, void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode)) { static bool need_init = TRUE; mainloop_child_t *child = g_new(mainloop_child_t, 1); child->pid = pid; child->timerid = 0; child->timeout = FALSE; child->privatedata = privatedata; child->callback = callback; child->flags = flags; if(desc) { child->desc = strdup(desc); } if (timeout) { child->timerid = g_timeout_add(timeout, child_timeout_callback, child); } child_list = g_list_append(child_list, child); if(need_init) { need_init = FALSE; /* SIGCHLD processing has to be invoked from mainloop. * We do not want it to be possible to both add a child pid * to mainloop, and have the pid's exit callback invoked within * the same callstack. */ g_timeout_add(1, child_signal_init, NULL); } } void mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata, void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode)) { mainloop_child_add_with_flags(pid, timeout, desc, privatedata, 0, callback); } struct mainloop_timer_s { guint id; guint period_ms; bool repeat; char *name; GSourceFunc cb; void *userdata; }; static gboolean mainloop_timer_cb(gpointer user_data) { int id = 0; bool repeat = FALSE; struct mainloop_timer_s *t = user_data; CRM_ASSERT(t != NULL); id = t->id; t->id = 0; /* Ensure it's unset during callbacks so that * mainloop_timer_running() works as expected */ if(t->cb) { crm_trace("Invoking callbacks for timer %s", t->name); repeat = t->repeat; if(t->cb(t->userdata) == FALSE) { crm_trace("Timer %s complete", t->name); repeat = FALSE; } } if(repeat) { /* Restore if repeating */ t->id = id; } return repeat; } bool mainloop_timer_running(mainloop_timer_t *t) { if(t && t->id != 0) { return TRUE; } return FALSE; } void mainloop_timer_start(mainloop_timer_t *t) { mainloop_timer_stop(t); if(t && t->period_ms > 0) { crm_trace("Starting timer %s", t->name); t->id = g_timeout_add(t->period_ms, mainloop_timer_cb, t); } } void mainloop_timer_stop(mainloop_timer_t *t) { if(t && t->id != 0) { crm_trace("Stopping timer %s", t->name); g_source_remove(t->id); t->id = 0; } } guint mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms) { guint last = 0; if(t) { last = t->period_ms; t->period_ms = period_ms; } if(t && t->id != 0 && last != t->period_ms) { mainloop_timer_start(t); } return last; } mainloop_timer_t * mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata) { mainloop_timer_t *t = calloc(1, sizeof(mainloop_timer_t)); if(t) { if(name) { t->name = crm_strdup_printf("%s-%u-%d", name, period_ms, repeat); } else { t->name = crm_strdup_printf("%p-%u-%d", t, period_ms, repeat); } t->id = 0; t->period_ms = period_ms; t->repeat = repeat; t->cb = cb; t->userdata = userdata; crm_trace("Created timer %s with %p %p", t->name, userdata, t->userdata); } return t; } void mainloop_timer_del(mainloop_timer_t *t) { if(t) { crm_trace("Destroying timer %s", t->name); mainloop_timer_stop(t); free(t->name); free(t); } } /* * Helpers to make sure certain events aren't lost at shutdown */ static gboolean drain_timeout_cb(gpointer user_data) { bool *timeout_popped = (bool*) user_data; *timeout_popped = TRUE; return FALSE; } /*! * \brief Drain some remaining main loop events then quit it * * \param[in] mloop Main loop to drain and quit * \param[in] n Drain up to this many pending events */ void pcmk_quit_main_loop(GMainLoop *mloop, unsigned int n) { if ((mloop != NULL) && g_main_loop_is_running(mloop)) { GMainContext *ctx = g_main_loop_get_context(mloop); /* Drain up to n events in case some memory clean-up is pending * (helpful to reduce noise in valgrind output). */ for (int i = 0; (i < n) && g_main_context_pending(ctx); ++i) { g_main_context_dispatch(ctx); } g_main_loop_quit(mloop); } } /*! * \brief Process main loop events while a certain condition is met * * \param[in] mloop Main loop to process * \param[in] timer_ms Don't process longer than this amount of time * \param[in] check Function that returns TRUE if events should be processed * * \note This function is intended to be called at shutdown if certain important * events should not be missed. The caller would likely quit the main loop * or exit after calling this function. The check() function will be * passed the remaining timeout in milliseconds. */ void pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool (*check)(guint)) { bool timeout_popped = FALSE; guint timer = 0; GMainContext *ctx = NULL; CRM_CHECK(mloop && check, return); ctx = g_main_loop_get_context(mloop); if (ctx) { time_t start_time = time(NULL); timer = g_timeout_add(timer_ms, drain_timeout_cb, &timeout_popped); while (!timeout_popped && check(timer_ms - (time(NULL) - start_time) * 1000)) { g_main_context_iteration(ctx, TRUE); } } if (!timeout_popped && (timer > 0)) { g_source_remove(timer); } } // Deprecated functions kept only for backward API compatibility gboolean crm_signal(int sig, void (*dispatch) (int sig)); /* * \brief Use crm_signal_handler() instead * \deprecated */ gboolean crm_signal(int sig, void (*dispatch) (int sig)) { return crm_signal_handler(sig, dispatch) != SIG_ERR; }