Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F4512136
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
226 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/daemons/controld/controld_fsa.c b/daemons/controld/controld_fsa.c
index a0beff0b3d..4ec1b5f31f 100644
--- a/daemons/controld/controld_fsa.c
+++ b/daemons/controld/controld_fsa.c
@@ -1,730 +1,732 @@
/*
- * Copyright 2004-2024 the Pacemaker project contributors
+ * Copyright 2004-2025 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU General Public License version 2
* or later (GPLv2+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
+#include <inttypes.h> // PRIx64
#include <sys/param.h>
#include <stdio.h>
#include <stdint.h> // uint64_t
#include <string.h>
#include <time.h>
#include <crm/crm.h>
#include <crm/lrmd.h>
#include <crm/cib.h>
#include <crm/common/xml.h>
#include <crm/cluster/election_internal.h>
#include <crm/cluster.h>
#include <pacemaker-controld.h>
//! Triggers an FSA invocation
static crm_trigger_t *fsa_trigger = NULL;

// Literal prefix placed in front of the trace messages emitted via do_dot_log()
#define DOT_PREFIX "actions:trace: "

// Forwards its format string and arguments unchanged to crm_trace()
#define do_dot_log(fmt, args...) crm_trace( fmt, ##args)

// Forward declarations of functions defined later in this file
static void do_state_transition(enum crmd_fsa_state cur_state,
                                enum crmd_fsa_state next_state,
                                fsa_data_t *msg_data);
void s_crmd_fsa_actions(fsa_data_t * fsa_data);
void log_fsa_input(fsa_data_t * stored_msg);
void init_dotfile(void);
/*!
 * \internal
 * \brief Trace-log the opening of a 'digraph "g"' description of the FSA
 *
 * Emits, one trace message per line, the graph/node/edge default attributes
 * followed by colour overrides for a handful of named states. The closing
 * brace of the digraph is not emitted by this function.
 */
void
init_dotfile(void)
{
    // Graph-wide defaults
    do_dot_log(DOT_PREFIX "digraph \"g\" {");
    do_dot_log(DOT_PREFIX " size = \"30,30\"");
    do_dot_log(DOT_PREFIX " graph [");
    do_dot_log(DOT_PREFIX " fontsize = \"12\"");
    do_dot_log(DOT_PREFIX " fontname = \"Times-Roman\"");
    do_dot_log(DOT_PREFIX " fontcolor = \"black\"");
    do_dot_log(DOT_PREFIX " bb = \"0,0,398.922306,478.927856\"");
    do_dot_log(DOT_PREFIX " color = \"black\"");
    do_dot_log(DOT_PREFIX " ]");
    // Node defaults
    do_dot_log(DOT_PREFIX " node [");
    do_dot_log(DOT_PREFIX " fontsize = \"12\"");
    do_dot_log(DOT_PREFIX " fontname = \"Times-Roman\"");
    do_dot_log(DOT_PREFIX " fontcolor = \"black\"");
    do_dot_log(DOT_PREFIX " shape = \"ellipse\"");
    do_dot_log(DOT_PREFIX " color = \"black\"");
    do_dot_log(DOT_PREFIX " ]");
    // Edge defaults
    do_dot_log(DOT_PREFIX " edge [");
    do_dot_log(DOT_PREFIX " fontsize = \"12\"");
    do_dot_log(DOT_PREFIX " fontname = \"Times-Roman\"");
    do_dot_log(DOT_PREFIX " fontcolor = \"black\"");
    do_dot_log(DOT_PREFIX " color = \"black\"");
    do_dot_log(DOT_PREFIX " ]");
    // Per-state overrides: S_PENDING in blue, S_TERMINATE in red
    do_dot_log(DOT_PREFIX "// special nodes");
    do_dot_log(DOT_PREFIX " \"S_PENDING\" ");
    do_dot_log(DOT_PREFIX " [");
    do_dot_log(DOT_PREFIX " color = \"blue\"");
    do_dot_log(DOT_PREFIX " fontcolor = \"blue\"");
    do_dot_log(DOT_PREFIX " ]");
    do_dot_log(DOT_PREFIX " \"S_TERMINATE\" ");
    do_dot_log(DOT_PREFIX " [");
    do_dot_log(DOT_PREFIX " color = \"red\"");
    do_dot_log(DOT_PREFIX " fontcolor = \"red\"");
    do_dot_log(DOT_PREFIX " ]");
    // States labelled "DC only" get a green font
    do_dot_log(DOT_PREFIX "// DC only nodes");
    do_dot_log(DOT_PREFIX " \"S_INTEGRATION\" [ fontcolor = \"green\" ]");
    do_dot_log(DOT_PREFIX " \"S_POLICY_ENGINE\" [ fontcolor = \"green\" ]");
    do_dot_log(DOT_PREFIX " \"S_TRANSITION_ENGINE\" [ fontcolor = \"green\" ]");
    do_dot_log(DOT_PREFIX " \"S_RELEASE_DC\" [ fontcolor = \"green\" ]");
    do_dot_log(DOT_PREFIX " \"S_IDLE\" [ fontcolor = \"green\" ]");
}
/*!
 * \internal
 * \brief Clear one pending FSA action and run its handler
 *
 * The action's flag is cleared before the handler is called. The handler
 * receives the cause and input recorded in \p fsa_data together with the
 * controller's current FSA state.
 *
 * \param[in,out] fsa_data   Input currently being processed
 * \param[in]     an_action  Action flag(s) to clear and hand to the handler
 * \param[in]     function   Handler to invoke for \p an_action
 */
static void
do_fsa_action(fsa_data_t * fsa_data, long long an_action,
              void (*function) (long long action,
                                enum crmd_fsa_cause cause,
                                enum crmd_fsa_state cur_state,
                                enum crmd_fsa_input cur_input,
                                fsa_data_t * msg_data))
{
    enum crmd_fsa_cause why = C_UNKNOWN;
    enum crmd_fsa_input what = I_NULL;

    controld_clear_fsa_action_flags(an_action);
    do_dot_log(DOT_PREFIX "\t// %s", fsa_action2string(an_action));

    why = fsa_data->fsa_cause;
    what = fsa_data->fsa_input;
    function(an_action, why, controld_globals.fsa_state, what, fsa_data);
}
/* Mask of start-up related action flags; s_crmd_fsa() clears these from the
 * pending actions when the FSA is in S_STOPPING or R_SHUTDOWN is set.
 */
static const uint64_t startup_actions =
    A_STARTUP | A_CIB_START | A_LRM_CONNECT | A_HA_CONNECT | A_READCONFIG |
    A_STARTED | A_CL_JOIN_QUERY;
// A_LOG, A_WARN, A_ERROR
/*!
 * \internal
 * \brief FSA handler that logs the input being processed
 *
 * The severity comes from \p action (A_LOG is checked first, then A_WARN,
 * then A_ERROR; anything else logs at trace level). Any payload attached to
 * \p msg_data is logged as well, in a form that depends on its data type.
 *
 * \param[in] action         Action flag(s) that selected this handler
 * \param[in] cause          Unused here
 * \param[in] cur_state      FSA state reported in the message
 * \param[in] current_input  Unused here
 * \param[in] msg_data       Input being processed
 */
void
do_log(long long action, enum crmd_fsa_cause cause,
       enum crmd_fsa_state cur_state,
       enum crmd_fsa_input current_input, fsa_data_t *msg_data)
{
    unsigned log_type = (action & A_LOG)? LOG_INFO
                        : (action & A_WARN)? LOG_WARNING
                        : (action & A_ERROR)? LOG_ERR
                        : LOG_TRACE;

    do_crm_log(log_type, "Input %s received in state %s from %s",
               fsa_input2string(msg_data->fsa_input),
               fsa_state2string(cur_state), msg_data->origin);

    switch (msg_data->data_type) {
        case fsa_dt_ha_msg:
            {
                ha_msg_input_t *ha_payload = fsa_typed_data(fsa_dt_ha_msg);

                crm_log_xml_debug(ha_payload->msg, __func__);
            }
            break;

        case fsa_dt_xml:
            {
                xmlNode *xml_payload = fsa_typed_data(fsa_dt_xml);

                crm_log_xml_debug(xml_payload, __func__);
            }
            break;

        case fsa_dt_lrm:
            {
                lrmd_event_data_t *event = fsa_typed_data(fsa_dt_lrm);

                do_crm_log(log_type,
                           "Resource %s: Call ID %d returned %d (%d)."
                           " New status if rc=0: %s",
                           event->rsc_id, event->call_id, event->rc,
                           event->op_status, (char *)event->user_data);
            }
            break;

        default:
            // No payload to describe
            break;
    }
}
/*!
* \internal
* \brief Initialize the FSA trigger
*/
void
controld_init_fsa_trigger(void)
{
fsa_trigger = mainloop_add_trigger(G_PRIORITY_HIGH, crm_fsa_trigger, NULL);
}
/*!
* \internal
* \brief Destroy the FSA trigger
*/
void
controld_destroy_fsa_trigger(void)
{
// This basically will not work, since mainloop has a reference to it
mainloop_destroy_trigger(fsa_trigger);
fsa_trigger = NULL;
}
/*!
 * \internal
 * \brief Request an FSA invocation, recording who asked for it
 *
 * Does nothing when the FSA trigger has not been created (or has been
 * destroyed).
 *
 * \param[in] fn    Name of the calling function
 * \param[in] line  Line number of the call
 */
void
controld_trigger_fsa_as(const char *fn, int line)
{
    if (fsa_trigger == NULL) {
        return;
    }

    crm_trace("%s:%d - Triggered FSA invocation", fn, line);
    mainloop_set_trigger(fsa_trigger);
}
/*!
 * \internal
 * \brief Run the FSA over the queued inputs
 *
 * Inputs are taken from the message queue one at a time until the queue is
 * empty or the FSA is flagged as stalled. For each input, the actions stored
 * with it and the actions looked up for it are added to the pending set, the
 * logging actions are run, the next state is computed, and the remaining
 * actions are executed by s_crmd_fsa_actions().
 *
 * If the queue is empty but actions are still pending, a placeholder I_NULL
 * input is queued so that the loop runs at least once.
 *
 * \param[in] cause  What caused this invocation (only used for tracing here)
 *
 * \return FSA state after processing
 */
enum crmd_fsa_state
s_crmd_fsa(enum crmd_fsa_cause cause)
{
    controld_globals_t *globals = &controld_globals;
    fsa_data_t *fsa_data = NULL;
    uint64_t register_copy = controld_globals.fsa_input_register;
    uint64_t new_actions = A_NOTHING;
    enum crmd_fsa_state last_state;

    crm_trace("FSA invoked with Cause: %s\tState: %s",
              fsa_cause2string(cause),
              fsa_state2string(globals->fsa_state));

    fsa_dump_actions(controld_globals.fsa_actions, "Initial");

    controld_clear_global_flags(controld_fsa_is_stalled);
    if ((controld_globals.fsa_message_queue == NULL)
        && (controld_globals.fsa_actions != A_NOTHING)) {
        /* fake the first message so we can get into the loop */
        fsa_data = pcmk__assert_alloc(1, sizeof(fsa_data_t));
        fsa_data->fsa_input = I_NULL;
        fsa_data->fsa_cause = C_FSA_INTERNAL;
        fsa_data->origin = __func__;
        fsa_data->data_type = fsa_dt_none;
        controld_globals.fsa_message_queue
            = g_list_append(controld_globals.fsa_message_queue, fsa_data);
    }

    while ((controld_globals.fsa_message_queue != NULL)
           && !pcmk_is_set(controld_globals.flags, controld_fsa_is_stalled)) {
        crm_trace("Checking messages (%d remaining)",
                  g_list_length(controld_globals.fsa_message_queue));

        fsa_data = get_message();
        if (fsa_data == NULL) {
            continue;
        }

        log_fsa_input(fsa_data);

        /* add any actions back to the queue */
        controld_set_fsa_action_flags(fsa_data->actions);
        fsa_dump_actions(fsa_data->actions, "Restored actions");

        /* get the next batch of actions */
        new_actions = controld_fsa_get_action(fsa_data->fsa_input);
        controld_set_fsa_action_flags(new_actions);
        fsa_dump_actions(new_actions, "New actions");

        if (fsa_data->fsa_input != I_NULL && fsa_data->fsa_input != I_ROUTER) {
            crm_debug("Processing %s: [ state=%s cause=%s origin=%s ]",
                      fsa_input2string(fsa_data->fsa_input),
                      fsa_state2string(globals->fsa_state),
                      fsa_cause2string(fsa_data->fsa_cause), fsa_data->origin);
        }

        /* logging : *before* the state is changed */
        if (pcmk_is_set(controld_globals.fsa_actions, A_ERROR)) {
            do_fsa_action(fsa_data, A_ERROR, do_log);
        }
        if (pcmk_is_set(controld_globals.fsa_actions, A_WARN)) {
            do_fsa_action(fsa_data, A_WARN, do_log);
        }
        if (pcmk_is_set(controld_globals.fsa_actions, A_LOG)) {
            do_fsa_action(fsa_data, A_LOG, do_log);
        }

        /* update state variables */
        last_state = globals->fsa_state;
        globals->fsa_state = controld_fsa_get_next_state(fsa_data->fsa_input);

        /*
         * Remove certain actions during shutdown
         */
        if ((globals->fsa_state == S_STOPPING)
            || pcmk_is_set(controld_globals.fsa_input_register, R_SHUTDOWN)) {
            controld_clear_fsa_action_flags(startup_actions);
        }

        /*
         * Hook for change of state.
         * Allows actions to be added or removed when entering a state
         */
        if (last_state != globals->fsa_state) {
            do_state_transition(last_state, globals->fsa_state, fsa_data);
        } else {
            do_dot_log(DOT_PREFIX "\t// FSA input: State=%s \tCause=%s"
                       " \tInput=%s \tOrigin=%s() \tid=%d",
                       fsa_state2string(globals->fsa_state),
                       fsa_cause2string(fsa_data->fsa_cause),
                       fsa_input2string(fsa_data->fsa_input), fsa_data->origin, fsa_data->id);
        }

        /* start doing things... */
        s_crmd_fsa_actions(fsa_data);
        delete_fsa_input(fsa_data);
    }

    if ((controld_globals.fsa_message_queue != NULL)
        || (controld_globals.fsa_actions != A_NOTHING)
        || pcmk_is_set(controld_globals.flags, controld_fsa_is_stalled)) {

        // The '#' flag keeps the 0x prefix so the mask is recognisably hex
-        crm_debug("Exiting the FSA: queue=%d, fsa_actions=%#llx, stalled=%s",
+        crm_debug("Exiting the FSA: queue=%d, fsa_actions=%#" PRIx64
+                  ", stalled=%s",
                  g_list_length(controld_globals.fsa_message_queue),
-                  (unsigned long long) controld_globals.fsa_actions,
+                  controld_globals.fsa_actions,
                  pcmk__flag_text(controld_globals.flags,
                                  controld_fsa_is_stalled));
    } else {
        crm_trace("Exiting the FSA");
    }

    /* cleanup inputs? */
    if (register_copy != controld_globals.fsa_input_register) {
        // Bits common to both snapshots; XOR-ing them out leaves the changes
        uint64_t same = register_copy & controld_globals.fsa_input_register;

        fsa_dump_inputs(LOG_DEBUG, "Added",
                        controld_globals.fsa_input_register ^ same);
        fsa_dump_inputs(LOG_DEBUG, "Removed", register_copy ^ same);
    }

    fsa_dump_actions(controld_globals.fsa_actions, "Remaining");
    fsa_dump_queue(LOG_DEBUG);

    return globals->fsa_state;
}
/*!
 * \internal
 * \brief Execute the pending FSA actions for one input
 *
 * Repeatedly picks the first pending action in the fixed priority order
 * below and runs its handler via do_fsa_action() (which clears the flag),
 * until no actions remain or the FSA is flagged as stalled. If the pending
 * set contains only bits that none of the branches recognise, an error is
 * logged and an I_ERROR input is registered.
 *
 * \param[in,out] fsa_data  Input currently being processed (must not be NULL)
 */
void
s_crmd_fsa_actions(fsa_data_t * fsa_data)
{
    /*
     * Process actions in order of priority but do only one
     * action at a time to avoid complicating the ordering.
     */
    CRM_CHECK(fsa_data != NULL, return);
    while ((controld_globals.fsa_actions != A_NOTHING)
           && !pcmk_is_set(controld_globals.flags, controld_fsa_is_stalled)) {

        /* regular action processing in order of action priority
         *
         * Make sure all actions that connect to required systems
         * are performed first
         */
        if (pcmk_is_set(controld_globals.fsa_actions, A_ERROR)) {
            do_fsa_action(fsa_data, A_ERROR, do_log);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_WARN)) {
            do_fsa_action(fsa_data, A_WARN, do_log);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_LOG)) {
            do_fsa_action(fsa_data, A_LOG, do_log);

            /* get out of here NOW! before anything worse happens */
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_EXIT_1)) {
            do_fsa_action(fsa_data, A_EXIT_1, do_exit);

            /* sub-system restart */
        } else if (pcmk_all_flags_set(controld_globals.fsa_actions,
                                      O_LRM_RECONNECT)) {
            do_fsa_action(fsa_data, O_LRM_RECONNECT, do_lrm_control);
        } else if (pcmk_all_flags_set(controld_globals.fsa_actions,
                                      O_CIB_RESTART)) {
            do_fsa_action(fsa_data, O_CIB_RESTART, do_cib_control);
        } else if (pcmk_all_flags_set(controld_globals.fsa_actions,
                                      O_PE_RESTART)) {
            do_fsa_action(fsa_data, O_PE_RESTART, do_pe_control);
        } else if (pcmk_all_flags_set(controld_globals.fsa_actions,
                                      O_TE_RESTART)) {
            do_fsa_action(fsa_data, O_TE_RESTART, do_te_control);

            /* essential start tasks */
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_STARTUP)) {
            do_fsa_action(fsa_data, A_STARTUP, do_startup);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_CIB_START)) {
            do_fsa_action(fsa_data, A_CIB_START, do_cib_control);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_HA_CONNECT)) {
            do_fsa_action(fsa_data, A_HA_CONNECT, do_ha_control);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_READCONFIG)) {
            do_fsa_action(fsa_data, A_READCONFIG, do_read_config);

            /* sub-system start/connect */
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_LRM_CONNECT)) {
            do_fsa_action(fsa_data, A_LRM_CONNECT, do_lrm_control);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_TE_START)) {
            do_fsa_action(fsa_data, A_TE_START, do_te_control);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_PE_START)) {
            do_fsa_action(fsa_data, A_PE_START, do_pe_control);

            /* Timers */
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_DC_TIMER_STOP)) {
            do_fsa_action(fsa_data, A_DC_TIMER_STOP, do_timer_control);
        } else if (pcmk_is_set(controld_globals.fsa_actions,
                               A_INTEGRATE_TIMER_STOP)) {
            do_fsa_action(fsa_data, A_INTEGRATE_TIMER_STOP, do_timer_control);
        } else if (pcmk_is_set(controld_globals.fsa_actions,
                               A_INTEGRATE_TIMER_START)) {
            do_fsa_action(fsa_data, A_INTEGRATE_TIMER_START, do_timer_control);
        } else if (pcmk_is_set(controld_globals.fsa_actions,
                               A_FINALIZE_TIMER_STOP)) {
            do_fsa_action(fsa_data, A_FINALIZE_TIMER_STOP, do_timer_control);
        } else if (pcmk_is_set(controld_globals.fsa_actions,
                               A_FINALIZE_TIMER_START)) {
            do_fsa_action(fsa_data, A_FINALIZE_TIMER_START, do_timer_control);

            /*
             * Highest priority actions
             */
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_MSG_ROUTE)) {
            do_fsa_action(fsa_data, A_MSG_ROUTE, do_msg_route);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_RECOVER)) {
            do_fsa_action(fsa_data, A_RECOVER, do_recover);
        } else if (pcmk_is_set(controld_globals.fsa_actions,
                               A_CL_JOIN_RESULT)) {
            do_fsa_action(fsa_data, A_CL_JOIN_RESULT,
                          do_cl_join_finalize_respond);
        } else if (pcmk_is_set(controld_globals.fsa_actions,
                               A_CL_JOIN_REQUEST)) {
            do_fsa_action(fsa_data, A_CL_JOIN_REQUEST,
                          do_cl_join_offer_respond);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_SHUTDOWN_REQ)) {
            do_fsa_action(fsa_data, A_SHUTDOWN_REQ, do_shutdown_req);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_ELECTION_VOTE)) {
            do_fsa_action(fsa_data, A_ELECTION_VOTE, do_election_vote);
        } else if (pcmk_is_set(controld_globals.fsa_actions,
                               A_ELECTION_COUNT)) {
            do_fsa_action(fsa_data, A_ELECTION_COUNT, do_election_count_vote);

            /*
             * High priority actions
             */
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_STARTED)) {
            do_fsa_action(fsa_data, A_STARTED, do_started);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_CL_JOIN_QUERY)) {
            do_fsa_action(fsa_data, A_CL_JOIN_QUERY, do_cl_join_query);
        } else if (pcmk_is_set(controld_globals.fsa_actions,
                               A_DC_TIMER_START)) {
            do_fsa_action(fsa_data, A_DC_TIMER_START, do_timer_control);

            /*
             * Medium priority actions
             * - Membership
             */
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_DC_TAKEOVER)) {
            do_fsa_action(fsa_data, A_DC_TAKEOVER, do_dc_takeover);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_DC_RELEASE)) {
            do_fsa_action(fsa_data, A_DC_RELEASE, do_dc_release);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_DC_JOIN_FINAL)) {
            do_fsa_action(fsa_data, A_DC_JOIN_FINAL, do_dc_join_final);
        } else if (pcmk_is_set(controld_globals.fsa_actions,
                               A_ELECTION_CHECK)) {
            do_fsa_action(fsa_data, A_ELECTION_CHECK, do_election_check);
        } else if (pcmk_is_set(controld_globals.fsa_actions,
                               A_ELECTION_START)) {
            do_fsa_action(fsa_data, A_ELECTION_START, do_election_vote);
        } else if (pcmk_is_set(controld_globals.fsa_actions,
                               A_DC_JOIN_OFFER_ALL)) {
            do_fsa_action(fsa_data, A_DC_JOIN_OFFER_ALL, do_dc_join_offer_all);
        } else if (pcmk_is_set(controld_globals.fsa_actions,
                               A_DC_JOIN_OFFER_ONE)) {
            do_fsa_action(fsa_data, A_DC_JOIN_OFFER_ONE, do_dc_join_offer_one);
        } else if (pcmk_is_set(controld_globals.fsa_actions,
                               A_DC_JOIN_PROCESS_REQ)) {
            do_fsa_action(fsa_data, A_DC_JOIN_PROCESS_REQ,
                          do_dc_join_filter_offer);
        } else if (pcmk_is_set(controld_globals.fsa_actions,
                               A_DC_JOIN_PROCESS_ACK)) {
            do_fsa_action(fsa_data, A_DC_JOIN_PROCESS_ACK, do_dc_join_ack);
        } else if (pcmk_is_set(controld_globals.fsa_actions,
                               A_DC_JOIN_FINALIZE)) {
            do_fsa_action(fsa_data, A_DC_JOIN_FINALIZE, do_dc_join_finalize);
        } else if (pcmk_is_set(controld_globals.fsa_actions,
                               A_CL_JOIN_ANNOUNCE)) {
            do_fsa_action(fsa_data, A_CL_JOIN_ANNOUNCE, do_cl_join_announce);

            /*
             * Low(er) priority actions
             * Make sure the CIB is always updated before invoking the
             * scheduler, and the scheduler before the transition engine.
             */
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_TE_HALT)) {
            do_fsa_action(fsa_data, A_TE_HALT, do_te_invoke);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_TE_CANCEL)) {
            do_fsa_action(fsa_data, A_TE_CANCEL, do_te_invoke);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_LRM_INVOKE)) {
            do_fsa_action(fsa_data, A_LRM_INVOKE, do_lrm_invoke);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_PE_INVOKE)) {
            do_fsa_action(fsa_data, A_PE_INVOKE, do_pe_invoke);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_TE_INVOKE)) {
            do_fsa_action(fsa_data, A_TE_INVOKE, do_te_invoke);

            /* Shutdown actions */
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_DC_RELEASED)) {
            do_fsa_action(fsa_data, A_DC_RELEASED, do_dc_release);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_PE_STOP)) {
            do_fsa_action(fsa_data, A_PE_STOP, do_pe_control);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_TE_STOP)) {
            do_fsa_action(fsa_data, A_TE_STOP, do_te_control);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_SHUTDOWN)) {
            do_fsa_action(fsa_data, A_SHUTDOWN, do_shutdown);
        } else if (pcmk_is_set(controld_globals.fsa_actions,
                               A_LRM_DISCONNECT)) {
            do_fsa_action(fsa_data, A_LRM_DISCONNECT, do_lrm_control);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_HA_DISCONNECT)) {
            do_fsa_action(fsa_data, A_HA_DISCONNECT, do_ha_control);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_CIB_STOP)) {
            do_fsa_action(fsa_data, A_CIB_STOP, do_cib_control);
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_STOP)) {
            do_fsa_action(fsa_data, A_STOP, do_stop);

            /* exit gracefully */
        } else if (pcmk_is_set(controld_globals.fsa_actions, A_EXIT_0)) {
            do_fsa_action(fsa_data, A_EXIT_0, do_exit);

            /* Error checking and reporting */
        } else {
            // The '#' flag keeps the 0x prefix so the mask is recognisably hex
-            crm_err("Action %s not supported " QB_XS " %#llx",
+            crm_err("Action %s not supported " QB_XS " %#" PRIx64,
                    fsa_action2string(controld_globals.fsa_actions),
-                    (unsigned long long) controld_globals.fsa_actions);
+                    controld_globals.fsa_actions);
            register_fsa_error_adv(C_FSA_INTERNAL, I_ERROR, fsa_data, NULL,
                                   __func__);
        }
    }
}
/*!
 * \internal
 * \brief Trace-log a queued FSA input that is about to be processed
 *
 * LRM callbacks and inputs without data get a single trace line naming the
 * origin. Any other input is treated as cluster message data and its XML is
 * traced as well.
 *
 * \param[in] stored_msg  Queued input (asserted to be non-NULL)
 */
void
log_fsa_input(fsa_data_t * stored_msg)
{
    ha_msg_input_t *ha_input = NULL;

    pcmk__assert(stored_msg != NULL);
    crm_trace("Processing queued input %d", stored_msg->id);

    if (stored_msg->fsa_cause == C_LRM_OP_CALLBACK) {
        crm_trace("FSA processing LRM callback from %s", stored_msg->origin);
        return;
    }

    if (stored_msg->data == NULL) {
        crm_trace("FSA processing input from %s", stored_msg->origin);
        return;
    }

    ha_input = fsa_typed_data_adv(stored_msg, fsa_dt_ha_msg, __func__);
    crm_trace("FSA processing XML message from %s", stored_msg->origin);
    crm_log_xml_trace(ha_input->xml, "FSA message data");
}
/*!
 * \internal
 * \brief Compare join-phase counts with the number of active cluster nodes
 *
 * Logs the outcome and, when more nodes confirmed than are active or the
 * membership has changed, registers an FSA input (I_ELECTION or I_NODE_JOIN
 * respectively).
 *
 * \param[in] msg_data  Input being processed (not referenced directly in the
 *                      visible body)
 */
static void
check_join_counts(fsa_data_t *msg_data)
{
    int count = crmd_join_phase_count(controld_join_finalized);
    guint npeers = 0;

    // Nodes still in the "finalized" phase never confirmed their join
    if (count > 0) {
        crm_err("%d cluster node%s failed to confirm join",
                count, pcmk__plural_s(count));
        crmd_join_phase_log(LOG_NOTICE);
        return;
    }

    npeers = pcmk__cluster_num_active_nodes();
    count = crmd_join_phase_count(controld_join_confirmed);

    if (count == npeers) {
        if (npeers != 1) {
            crm_debug("All %d active cluster nodes are fully joined", count);
        } else {
            crm_debug("Sole active cluster node is fully joined");
        }
        return;
    }

    if (count > npeers) {
        crm_err("New election needed because more nodes confirmed join "
                "than are in membership (%d > %u)", count, npeers);
        register_fsa_input(C_FSA_INTERNAL, I_ELECTION, NULL);
        return;
    }

    if (controld_globals.membership_id != controld_globals.peer_seq) {
        crm_info("New join needed because membership changed (%llu -> %llu)",
                 controld_globals.membership_id, controld_globals.peer_seq);
        register_fsa_input_before(C_FSA_INTERNAL, I_NODE_JOIN, NULL);
        return;
    }

    crm_warn("Only %d of %u active cluster nodes fully joined "
             "(%d did not respond to offer)",
             count, npeers, crmd_join_phase_count(controld_join_welcomed));
}
/*!
 * \internal
 * \brief Hook run when the FSA moves from one state to a different one
 *
 * Logs the transition, sets or clears timer-related action flags according
 * to the state being entered, and performs the per-state entry work in the
 * switch below. Finally adjusts the A_RECOVER action flag.
 *
 * \param[in] cur_state   State being left
 * \param[in] next_state  State being entered
 * \param[in] msg_data    Input that caused the transition
 */
static void
do_state_transition(enum crmd_fsa_state cur_state,
                    enum crmd_fsa_state next_state, fsa_data_t *msg_data)
{
    int level = LOG_INFO;
    int count = 0;
    gboolean clear_recovery_bit = TRUE;
#if 0
    uint64_t original_fsa_actions = controld_globals.fsa_actions;
#endif
    enum crmd_fsa_cause cause = msg_data->fsa_cause;
    enum crmd_fsa_input current_input = msg_data->fsa_input;
    const char *state_from = fsa_state2string(cur_state);
    const char *state_to = fsa_state2string(next_state);
    const char *input = fsa_input2string(current_input);

    CRM_LOG_ASSERT(cur_state != next_state);

    do_dot_log(DOT_PREFIX "\t%s -> %s [ label=%s cause=%s origin=%s ]",
               state_from, state_to, input, fsa_cause2string(cause), msg_data->origin);

    // Choose the severity of the "State transition" message
    if (cur_state == S_IDLE || next_state == S_IDLE) {
        level = LOG_NOTICE;
    } else if (cur_state == S_NOT_DC || next_state == S_NOT_DC) {
        level = LOG_NOTICE;
    } else if (cur_state == S_ELECTION) {
        level = LOG_NOTICE;
    } else if (cur_state == S_STARTING) {
        level = LOG_NOTICE;
    } else if (next_state == S_RECOVERY) {
        level = LOG_WARNING;
    }

    do_crm_log(level, "State transition %s -> %s "
               QB_XS " input=%s cause=%s origin=%s",
               state_from, state_to, input, fsa_cause2string(cause),
               msg_data->origin);

    if (next_state != S_ELECTION && cur_state != S_RELEASE_DC) {
        controld_stop_current_election_timeout();
    }

    // Integration timer: start action on entering S_INTEGRATION, else stop
    if (next_state == S_INTEGRATION) {
        controld_set_fsa_action_flags(A_INTEGRATE_TIMER_START);
    } else {
        controld_set_fsa_action_flags(A_INTEGRATE_TIMER_STOP);
    }

    // Finalize timer: start action on entering S_FINALIZE_JOIN, else stop
    if (next_state == S_FINALIZE_JOIN) {
        controld_set_fsa_action_flags(A_FINALIZE_TIMER_START);
    } else {
        controld_set_fsa_action_flags(A_FINALIZE_TIMER_STOP);
    }

    if (next_state != S_PENDING) {
        controld_set_fsa_action_flags(A_DC_TIMER_STOP);
    }
    if (next_state != S_IDLE) {
        controld_stop_recheck_timer();
    }

    if (cur_state == S_FINALIZE_JOIN && next_state == S_POLICY_ENGINE) {
        populate_cib_nodes(node_update_quick|node_update_all, __func__);
    }

    // Per-state entry work
    switch (next_state) {
        case S_PENDING:
            {
                cib_t *cib_conn = controld_globals.cib_conn;
                cib_conn->cmds->set_secondary(cib_conn, cib_none);
            }
            update_dc(NULL);
            break;

        case S_ELECTION:
            update_dc(NULL);
            break;

        case S_NOT_DC:
            controld_reset_counter_election_timer();
            purge_stonith_cleanup();

            if (pcmk_is_set(controld_globals.fsa_input_register, R_SHUTDOWN)) {
                crm_info("(Re)Issuing shutdown request now" " that we have a new DC");
                controld_set_fsa_action_flags(A_SHUTDOWN_REQ);
            }
            CRM_LOG_ASSERT(controld_globals.dc_name != NULL);
            if (controld_globals.dc_name == NULL) {
                crm_err("Reached S_NOT_DC without a DC" " being recorded");
            }
            break;

        case S_RECOVERY:
            // Keep A_RECOVER set (see the end of this function)
            clear_recovery_bit = FALSE;
            break;

        case S_FINALIZE_JOIN:
            CRM_LOG_ASSERT(AM_I_DC);
            if (cause == C_TIMER_POPPED) {
                crm_warn("Progressed to state %s after %s",
                         fsa_state2string(next_state), fsa_cause2string(cause));
            }
            // Nodes still in the "welcomed" phase did not answer the offer
            count = crmd_join_phase_count(controld_join_welcomed);
            if (count > 0) {
                crm_warn("%d cluster node%s failed to respond to join offer",
                         count, pcmk__plural_s(count));
                crmd_join_phase_log(LOG_NOTICE);

            } else {
                crm_debug("All cluster nodes (%d) responded to join offer",
                          crmd_join_phase_count(controld_join_integrated));
            }
            break;

        case S_POLICY_ENGINE:
            controld_reset_counter_election_timer();
            CRM_LOG_ASSERT(AM_I_DC);
            if (cause == C_TIMER_POPPED) {
                crm_info("Progressed to state %s after %s",
                         fsa_state2string(next_state), fsa_cause2string(cause));
            }
            check_join_counts(msg_data);
            break;

        case S_STOPPING:
        case S_TERMINATE:
            /* possibly redundant */
            controld_set_fsa_input_flags(R_SHUTDOWN);
            break;

        case S_IDLE:
            CRM_LOG_ASSERT(AM_I_DC);
            if (pcmk_is_set(controld_globals.fsa_input_register, R_SHUTDOWN)) {
                crm_info("(Re)Issuing shutdown request now" " that we are the DC");
                controld_set_fsa_action_flags(A_SHUTDOWN_REQ);
            }
            controld_start_recheck_timer();
            break;

        default:
            break;
    }

    /* A_RECOVER is cleared unless entering S_RECOVERY (where it is set) or
     * S_PENDING (where it is left untouched)
     */
    if (clear_recovery_bit && next_state != S_PENDING) {
        controld_clear_fsa_action_flags(A_RECOVER);
    } else if (clear_recovery_bit == FALSE) {
        controld_set_fsa_action_flags(A_RECOVER);
    }

#if 0
    if (original_fsa_actions != controld_globals.fsa_actions) {
        fsa_dump_actions(original_fsa_actions ^ controld_globals.fsa_actions,
                         "New actions");
    }
#endif
}
diff --git a/daemons/controld/controld_messages.c b/daemons/controld/controld_messages.c
index 1f4b3891ce..978bd0cae8 100644
--- a/daemons/controld/controld_messages.c
+++ b/daemons/controld/controld_messages.c
@@ -1,1384 +1,1384 @@
/*
* Copyright 2004-2025 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU General Public License version 2
* or later (GPLv2+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
-#include <sys/param.h>
+#include <inttypes.h> // PRIx64
+#include <stdint.h> // uint64_t
#include <string.h>
+#include <sys/param.h>
#include <time.h>
#include <crm/crm.h>
#include <crm/common/xml.h>
#include <crm/cluster/internal.h>
#include <crm/cib.h>
#include <crm/common/ipc_internal.h>
#include <pacemaker-controld.h>
// Forward declarations of file-local helpers
static enum crmd_fsa_input handle_message(xmlNode *msg,
                                          enum crmd_fsa_cause cause);
static xmlNode* create_ping_reply(const xmlNode *msg);
static void handle_response(xmlNode *stored_msg);
static enum crmd_fsa_input handle_request(xmlNode *stored_msg,
                                          enum crmd_fsa_cause cause);
static enum crmd_fsa_input handle_shutdown_request(xmlNode *stored_msg);
static void send_msg_via_ipc(xmlNode * msg, const char *sys, const char *src);

/* debug only, can wrap all it likes */
// Incremented by register_fsa_input_adv() and stored as each new input's id
static int last_data_id = 0;
/*!
 * \internal
 * \brief Register an error input, parking any pending actions first
 *
 * Pending actions (if any) are stored in a prepended I_NULL input that
 * reuses the cause and data of \p cur_data, the pending action set is then
 * emptied, and finally the requested input is prepended to the queue.
 *
 * \param[in] cause        Cause to record for the new input
 * \param[in] input        Input to register
 * \param[in] cur_data     Input currently being processed (may be NULL)
 * \param[in] new_data     Data to attach to the new input (may be NULL)
 * \param[in] raised_from  Name of the function raising the error
 */
void
register_fsa_error_adv(enum crmd_fsa_cause cause, enum crmd_fsa_input input,
                       fsa_data_t * cur_data, void *new_data, const char *raised_from)
{
    /* save the current actions if any */
    if (controld_globals.fsa_actions != A_NOTHING) {
        enum crmd_fsa_cause saved_cause = C_FSA_INTERNAL;
        void *saved_data = NULL;

        if (cur_data != NULL) {
            saved_cause = cur_data->fsa_cause;
            saved_data = cur_data->data;
        }
        register_fsa_input_adv(saved_cause, I_NULL, saved_data,
                               controld_globals.fsa_actions, TRUE, __func__);
    }

    /* reset the action list */
    crm_info("Resetting the current action list");
    fsa_dump_actions(controld_globals.fsa_actions, "Drop");
    controld_globals.fsa_actions = A_NOTHING;

    /* register the error */
    register_fsa_input_adv(cause, input, new_data, A_NOTHING, TRUE,
                           raised_from);
}
/*!
 * \internal
 * \brief Add an input to the FSA message queue
 *
 * An I_NULL input with no actions is rejected. I_WAIT_FOR_EVENT marks the
 * FSA as stalled: without data, the given actions are merely restored to
 * the pending set and nothing is queued; with data, the pending actions are
 * moved into the new queue entry. Any data is copied according to \p cause
 * before being attached to the entry.
 *
 * \param[in] cause         Cause to record for the input
 * \param[in] input         Input to register
 * \param[in] data          Data to copy into the entry (may be NULL)
 * \param[in] with_actions  Actions to store with the entry
 * \param[in] prepend       Whether to add to the front of the queue
 * \param[in] raised_from   Name of the registering function (may be NULL)
 */
void
register_fsa_input_adv(enum crmd_fsa_cause cause, enum crmd_fsa_input input,
                       void *data, uint64_t with_actions,
                       gboolean prepend, const char *raised_from)
{
    // Queue length on entry, used at the end to check the entry was added
    unsigned old_len = g_list_length(controld_globals.fsa_message_queue);
    fsa_data_t *fsa_data = NULL;

    if (raised_from == NULL) {
        raised_from = "<unknown>";
    }

    if (input == I_NULL && with_actions == A_NOTHING /* && data == NULL */ ) {
        /* no point doing anything */
        crm_err("Cannot add entry to queue: no input and no action");
        return;
    }

    if (input == I_WAIT_FOR_EVENT) {
        controld_set_global_flags(controld_fsa_is_stalled);
        crm_debug("Stalling the FSA pending further input: source=%s cause=%s data=%p queue=%d",
                  raised_from, fsa_cause2string(cause), data, old_len);

        // With other inputs already queued, the stall entry goes to the back
        if (old_len > 0) {
            fsa_dump_queue(LOG_TRACE);
            prepend = FALSE;
        }

        if (data == NULL) {
            controld_set_fsa_action_flags(with_actions);
            fsa_dump_actions(with_actions, "Restored");
            return;
        }

        /* Store everything in the new event and reset
         * controld_globals.fsa_actions
         */
        with_actions |= controld_globals.fsa_actions;
        controld_globals.fsa_actions = A_NOTHING;
    }

    last_data_id++;
    crm_trace("%s %s FSA input %d (%s) due to %s, %s data",
              raised_from, (prepend? "prepended" : "appended"), last_data_id,
              fsa_input2string(input), fsa_cause2string(cause),
              (data? "with" : "without"));

    fsa_data = pcmk__assert_alloc(1, sizeof(fsa_data_t));
    fsa_data->id = last_data_id;
    fsa_data->fsa_input = input;
    fsa_data->fsa_cause = cause;
    fsa_data->origin = raised_from;
    fsa_data->data = NULL;
    fsa_data->data_type = fsa_dt_none;
    fsa_data->actions = with_actions;

    if (with_actions != A_NOTHING) {
-        crm_trace("Adding actions %.16llx to input",
-                  (unsigned long long) with_actions);
+        crm_trace("Adding actions %.16" PRIx64 " to input", with_actions);
    }

    // The cause determines how the caller's data is interpreted and copied
    if (data != NULL) {
        switch (cause) {
            case C_FSA_INTERNAL:
            case C_CRMD_STATUS_CALLBACK:
            case C_IPC_MESSAGE:
            case C_HA_MESSAGE:
                // NOTE(review): a failed check only logs; the copy below
                // presumably still runs -- confirm against CRM_CHECK()
                CRM_CHECK(((ha_msg_input_t *) data)->msg != NULL,
                          crm_err("Bogus data from %s", raised_from));
                crm_trace("Copying %s data from %s as cluster message data",
                          fsa_cause2string(cause), raised_from);
                fsa_data->data = copy_ha_msg_input(data);
                fsa_data->data_type = fsa_dt_ha_msg;
                break;

            case C_LRM_OP_CALLBACK:
                crm_trace("Copying %s data from %s as lrmd_event_data_t",
                          fsa_cause2string(cause), raised_from);
                fsa_data->data = lrmd_copy_event((lrmd_event_data_t *) data);
                fsa_data->data_type = fsa_dt_lrm;
                break;

            case C_TIMER_POPPED:
            case C_SHUTDOWN:
            case C_UNKNOWN:
            case C_STARTUP:
                crm_crit("Copying %s data (from %s) is not yet implemented",
                         fsa_cause2string(cause), raised_from);
                crmd_exit(CRM_EX_SOFTWARE);
                break;
        }
    }

    /* make sure to free it properly later */
    if (prepend) {
        controld_globals.fsa_message_queue
            = g_list_prepend(controld_globals.fsa_message_queue, fsa_data);
    } else {
        controld_globals.fsa_message_queue
            = g_list_append(controld_globals.fsa_message_queue, fsa_data);
    }

    crm_trace("FSA message queue length is %d",
              g_list_length(controld_globals.fsa_message_queue));

    /* fsa_dump_queue(LOG_TRACE); */

    if (old_len == g_list_length(controld_globals.fsa_message_queue)) {
        crm_err("Couldn't add message to the queue");
    }

    // A stalled FSA is not re-triggered by the input that stalled it
    if (input != I_WAIT_FOR_EVENT) {
        controld_trigger_fsa();
    }
}
/*!
 * \internal
 * \brief Log one line per entry in the FSA message queue
 *
 * \param[in] log_level  Severity to log the entries at
 */
void
fsa_dump_queue(int log_level)
{
    int offset = 0;
    GList *entry = controld_globals.fsa_message_queue;

    while (entry != NULL) {
        fsa_data_t *queued = (fsa_data_t *) entry->data;

        do_crm_log_unlikely(log_level,
                            "queue[%d.%d]: input %s raised by %s(%p.%d)\t(cause=%s)",
                            offset++, queued->id,
                            fsa_input2string(queued->fsa_input),
                            queued->origin, queued->data, queued->data_type,
                            fsa_cause2string(queued->fsa_cause));
        entry = entry->next;
    }
}
/*!
 * \internal
 * \brief Create a copy of a cluster message input
 *
 * The message XML is copied; the copy's \c xml member is set to the first
 * child of the copied message's PCMK__XE_CRM_XML element.
 *
 * \param[in] orig  Input to copy (may be NULL, giving a copy without message)
 *
 * \return Newly allocated copy
 */
ha_msg_input_t *
copy_ha_msg_input(ha_msg_input_t * orig)
{
    ha_msg_input_t *duplicate = pcmk__assert_alloc(1, sizeof(ha_msg_input_t));
    xmlNode *wrapper = NULL;

    if (orig != NULL) {
        duplicate->msg = pcmk__xml_copy(NULL, orig->msg);
    } else {
        duplicate->msg = NULL;
    }

    wrapper = pcmk__xe_first_child(duplicate->msg, PCMK__XE_CRM_XML, NULL,
                                   NULL);
    duplicate->xml = pcmk__xe_first_child(wrapper, NULL, NULL, NULL);
    return duplicate;
}
/*!
 * \internal
 * \brief Free an FSA input together with any data attached to it
 *
 * The attached data is released with the function matching its recorded
 * data type. Data recorded with type fsa_dt_none cannot be freed: that is
 * logged as an error and crmd_exit() is called.
 *
 * \param[in,out] fsa_data  Input to free (NULL is ignored)
 */
void
delete_fsa_input(fsa_data_t * fsa_data)
{
    if (fsa_data == NULL) {
        return;
    }

    crm_trace("About to free %s data", fsa_cause2string(fsa_data->fsa_cause));

    if (fsa_data->data != NULL) {
        switch (fsa_data->data_type) {
            case fsa_dt_ha_msg:
                delete_ha_msg_input(fsa_data->data);
                break;

            case fsa_dt_xml:
                {
                    xmlNode *xml = fsa_data->data;

                    pcmk__xml_free(xml);
                }
                break;

            case fsa_dt_lrm:
                {
                    lrmd_event_data_t *event = fsa_data->data;

                    lrmd_free_event(event);
                }
                break;

            case fsa_dt_none:
                // Data is known to be non-NULL here, yet has no usable type
                crm_err("Don't know how to free %s data from %s",
                        fsa_cause2string(fsa_data->fsa_cause),
                        fsa_data->origin);
                crmd_exit(CRM_EX_SOFTWARE);
                break;
        }
        crm_trace("%s data freed", fsa_cause2string(fsa_data->fsa_cause));
    }

    free(fsa_data);
}
/* returns the next message */
fsa_data_t *
get_message(void)
{
fsa_data_t *message
= (fsa_data_t *) controld_globals.fsa_message_queue->data;
controld_globals.fsa_message_queue
= g_list_remove(controld_globals.fsa_message_queue, message);
crm_trace("Processing input %d", message->id);
return message;
}
/*!
 * \internal
 * \brief Return an FSA input's data after checking that it has the given type
 *
 * \param[in] fsa_data  Input whose data is wanted
 * \param[in] a_type    Data type the caller expects
 * \param[in] caller    Name of the calling function (used in log messages)
 *
 * \return The input's data, or NULL (after logging an error) if there is no
 *         input or no data. A type mismatch is logged as critical and then
 *         asserted on.
 */
void *
fsa_typed_data_adv(fsa_data_t * fsa_data, enum fsa_data_type a_type, const char *caller)
{
    if (fsa_data == NULL) {
        crm_err("%s: No FSA data available", caller);
        return NULL;
    }

    if (fsa_data->data == NULL) {
        crm_err("%s: No message data available. Origin: %s", caller, fsa_data->origin);
        return NULL;
    }

    if (fsa_data->data_type != a_type) {
        crm_crit("%s: Message data was the wrong type! %d vs. requested=%d. Origin: %s",
                 caller, fsa_data->data_type, a_type, fsa_data->origin);
        pcmk__assert(fsa_data->data_type == a_type);
        return NULL;
    }

    return fsa_data->data;
}
/* A_MSG_ROUTE */
/*!
 * \internal
 * \brief FSA handler that routes the cluster message attached to an input
 *
 * \param[in] action         Unused here
 * \param[in] cause          Unused here (the cause stored in \p msg_data is
 *                           what gets passed on)
 * \param[in] cur_state      Unused here
 * \param[in] current_input  Unused here
 * \param[in] msg_data       Input whose message should be routed
 */
void
do_msg_route(long long action,
             enum crmd_fsa_cause cause,
             enum crmd_fsa_state cur_state,
             enum crmd_fsa_input current_input, fsa_data_t * msg_data)
{
    ha_msg_input_t *input = fsa_typed_data(fsa_dt_ha_msg);

    /* fsa_typed_data() presumably wraps fsa_typed_data_adv(), which logs an
     * error and returns NULL when the input has no data; there is nothing to
     * route in that case.
     */
    if (input == NULL) {
        return;
    }

    route_message(msg_data->fsa_cause, input->msg);
}
/*!
 * \internal
 * \brief Relay a message elsewhere or handle it locally
 *
 * Only C_IPC_MESSAGE and C_HA_MESSAGE causes are accepted. If
 * relay_message() reports the message as dealt with, nothing more is done.
 * Otherwise handle_message() processes it, and any FSA input it returns is
 * registered together with the message.
 *
 * \param[in] cause  How the message arrived
 * \param[in] input  Message XML
 */
void
route_message(enum crmd_fsa_cause cause, xmlNode * input)
{
    // Zero the remaining members so no indeterminate pointer is passed on
    ha_msg_input_t fsa_input = { .msg = input };
    enum crmd_fsa_input result = I_NULL;

    CRM_CHECK(cause == C_IPC_MESSAGE || cause == C_HA_MESSAGE, return);

    /* try passing the buck first */
    if (relay_message(input, cause == C_IPC_MESSAGE)) {
        return;
    }

    /* handle locally */
    result = handle_message(input, cause);

    /* done or process later? */
    switch (result) {
        case I_NULL:
        case I_ROUTER:
        case I_NODE_JOIN:
        case I_JOIN_REQUEST:
        case I_JOIN_RESULT:
            break;
        default:
            /* Deferring local processing of message */
            register_fsa_input_later(cause, result, &fsa_input);
            return;
    }

    if (result != I_NULL) {
        /* add to the front of the queue */
        register_fsa_input(cause, result, &fsa_input);
    }
}
/*!
 * \brief Send a controller message on to its destination when it is not meant
 *        for local controller processing
 *
 * Depending on the message's destination host and subsystem, the message is
 * sent to a local IPC client or subsystem, sent to one or all cluster peers,
 * answered directly (DC ping while no DC is known), or ignored.
 *
 * \param[in,out] msg                 Message XML (send_msg_via_ipc() may add a
 *                                    source attribute to it)
 * \param[in]     originated_locally  Whether the message came from this node
 *                                    (route_message() passes TRUE for messages
 *                                    that arrived via IPC)
 *
 * \return TRUE if no further processing of the message is needed (it was
 *         relayed, answered, or ignored), or FALSE if the caller should
 *         process it locally
 */
gboolean
relay_message(xmlNode * msg, gboolean originated_locally)
{
enum pcmk_ipc_server dest = pcmk_ipc_unknown;
bool is_for_dc = false;
bool is_for_dcib = false;
bool is_for_te = false;
bool is_for_crm = false;
bool is_for_cib = false;
bool is_local = false;
bool broadcast = false;
const char *host_to = NULL;
const char *sys_to = NULL;
const char *sys_from = NULL;
const char *type = NULL;
const char *task = NULL;
const char *ref = NULL;
pcmk__node_status_t *node_to = NULL;
CRM_CHECK(msg != NULL, return TRUE);
host_to = crm_element_value(msg, PCMK__XA_CRM_HOST_TO);
sys_to = crm_element_value(msg, PCMK__XA_CRM_SYS_TO);
sys_from = crm_element_value(msg, PCMK__XA_CRM_SYS_FROM);
type = crm_element_value(msg, PCMK__XA_T);
task = crm_element_value(msg, PCMK__XA_CRM_TASK);
ref = crm_element_value(msg, PCMK_XA_REFERENCE);
// A message with no destination host is treated as addressed to all hosts
broadcast = pcmk__str_empty(host_to);
if (ref == NULL) {
// Only used in log messages from here on
ref = "without reference ID";
}
if (pcmk__str_eq(task, CRM_OP_HELLO, pcmk__str_casei)) {
crm_trace("Received hello %s from %s (no processing needed)",
ref, pcmk__s(sys_from, "unidentified source"));
crm_log_xml_trace(msg, "hello");
return TRUE;
}
// Require message type (set by pcmk__new_request())
if (!pcmk__str_eq(type, PCMK__VALUE_CRMD, pcmk__str_none)) {
crm_warn("Ignoring invalid message %s with type '%s' "
"(not '" PCMK__VALUE_CRMD "')",
ref, pcmk__s(type, ""));
crm_log_xml_trace(msg, "ignored");
return TRUE;
}
// Require a destination subsystem (also set by pcmk__new_request())
if (sys_to == NULL) {
crm_warn("Ignoring invalid message %s with no " PCMK__XA_CRM_SYS_TO,
ref);
crm_log_xml_trace(msg, "ignored");
return TRUE;
}
// Get the message type appropriate to the destination subsystem
if (pcmk_get_cluster_layer() == pcmk_cluster_layer_corosync) {
dest = pcmk__parse_server(sys_to);
if (dest == pcmk_ipc_unknown) {
/* Unrecognized value, use a sane default
*
* @TODO Maybe we should bail instead
*/
dest = pcmk_ipc_controld;
}
}
// sys_to is known to be non-NULL here, so strcasecmp() is safe
is_for_dc = (strcasecmp(CRM_SYSTEM_DC, sys_to) == 0);
is_for_dcib = (strcasecmp(CRM_SYSTEM_DCIB, sys_to) == 0);
is_for_te = (strcasecmp(CRM_SYSTEM_TENGINE, sys_to) == 0);
is_for_cib = (strcasecmp(CRM_SYSTEM_CIB, sys_to) == 0);
is_for_crm = (strcasecmp(CRM_SYSTEM_CRMD, sys_to) == 0);
// Check whether message should be processed locally
is_local = false;
if (broadcast) {
if (is_for_dc || is_for_te) {
is_local = false;
} else if (is_for_crm) {
if (pcmk__strcase_any_of(task, CRM_OP_NODE_INFO,
PCMK__CONTROLD_CMD_NODES, NULL)) {
/* Node info requests do not specify a host, which is normally
* treated as "all hosts", because the whole point is that the
* client may not know the local node name. Always handle these
* requests locally.
*/
is_local = true;
} else {
// A broadcast this node sent itself is not handled here
is_local = !originated_locally;
}
} else {
is_local = true;
}
} else if (controld_is_local_node(host_to)) {
is_local = true;
} else if (is_for_crm && pcmk__str_eq(task, CRM_OP_LRM_DELETE, pcmk__str_casei)) {
xmlNode *wrapper = pcmk__xe_first_child(msg, PCMK__XE_CRM_XML, NULL,
NULL);
xmlNode *msg_data = pcmk__xe_first_child(wrapper, NULL, NULL, NULL);
const char *mode = crm_element_value(msg_data, PCMK__XA_MODE);
if (pcmk__str_eq(mode, PCMK__VALUE_CIB, pcmk__str_none)) {
// Local delete of an offline node's resource history
is_local = true;
}
}
// If is for DC and DC is not yet selected
if (is_for_dc && pcmk__str_eq(task, CRM_OP_PING, pcmk__str_casei)
&& (controld_globals.dc_name == NULL)) {
xmlNode *reply = create_ping_reply(msg);
sys_to = crm_element_value(reply, PCMK__XA_CRM_SYS_TO);
// Explicitly leave src empty. It indicates that dc is "not yet selected"
send_msg_via_ipc(reply, sys_to, NULL);
pcmk__xml_free(reply);
return TRUE;
}
// Check whether message should be relayed
if (is_for_dc || is_for_dcib || is_for_te) {
if (AM_I_DC) {
if (is_for_te) {
crm_trace("Route message %s locally as transition request",
ref);
crm_log_xml_trace(msg, sys_to);
send_msg_via_ipc(msg, sys_to, controld_globals.cluster->priv->node_name);
return TRUE; // No further processing of message is needed
}
crm_trace("Route message %s locally as DC request", ref);
return FALSE; // More to be done by caller
}
if (originated_locally
&& !pcmk__strcase_any_of(sys_from, CRM_SYSTEM_PENGINE,
CRM_SYSTEM_TENGINE, NULL)) {
crm_trace("Relay message %s to DC (via %s)",
ref, pcmk__s(host_to, "broadcast"));
crm_log_xml_trace(msg, "relayed");
if (!broadcast) {
node_to = pcmk__get_node(0, host_to, NULL,
pcmk__node_search_cluster_member);
}
// node_to is still NULL for a broadcast
pcmk__cluster_send_message(node_to, dest, msg);
return TRUE;
}
/* Transition engine and scheduler messages are sent only to the DC on
* the same node. If we are no longer the DC, discard this message.
*/
crm_trace("Ignoring message %s because we are no longer DC", ref);
crm_log_xml_trace(msg, "ignored");
return TRUE; // No further processing of message is needed
}
if (is_local) {
if (is_for_crm || is_for_cib) {
crm_trace("Route message %s locally as controller request", ref);
return FALSE; // More to be done by caller
}
crm_trace("Relay message %s locally to %s", ref, sys_to);
crm_log_xml_trace(msg, "IPC-relay");
send_msg_via_ipc(msg, sys_to, controld_globals.cluster->priv->node_name);
return TRUE;
}
// Not local: send to the named peer if it is known, otherwise to all peers
if (!broadcast) {
node_to = pcmk__search_node_caches(0, host_to,
pcmk__node_search_cluster_member);
if (node_to == NULL) {
crm_warn("Ignoring message %s because node %s is unknown",
ref, host_to);
crm_log_xml_trace(msg, "ignored");
return TRUE;
}
}
crm_trace("Relay message %s to %s",
ref, pcmk__s(host_to, "all peers"));
crm_log_xml_trace(msg, "relayed");
pcmk__cluster_send_message(node_to, dest, msg);
return TRUE;
}
/*!
 * \internal
 * \brief Check that a hello message's version field is a non-negative integer
 *
 * \param[in] message_data  Hello message data XML
 * \param[in] field         Name of the XML attribute holding the version
 * \param[in] client_name   Client name (for logging)
 * \param[in] ref           Message reference, if any (for logging)
 * \param[in] uuid          Client identifier (for logging)
 *
 * \return true if \p field parses as an integer that is at least 0,
 *         otherwise false (after logging a warning)
 */
static bool
authorize_version(xmlNode *message_data, const char *field,
                  const char *client_name, const char *ref, const char *uuid)
{
    long long parsed = 0LL;
    const char *raw = crm_element_value(message_data, field);
    bool acceptable = (pcmk__scan_ll(raw, &parsed, -1LL) == pcmk_rc_ok)
                      && (parsed >= 0LL);

    if (!acceptable) {
        crm_warn("Rejected IPC hello from %s: '%s' is not a valid protocol %s "
                 QB_XS " ref=%s uuid=%s",
                 client_name, ((raw != NULL)? raw : ""),
                 field, ((ref != NULL)? ref : "none"), uuid);
    }
    return acceptable;
}
/*!
 * \internal
 * \brief Check whether a client IPC message is acceptable
 *
 * If a given client IPC message is a hello, "authorize" it by ensuring it has
 * valid information such as a protocol version, and return false indicating
 * that nothing further needs to be done with the message. If the message is not
 * a hello, just return true to indicate it needs further processing.
 *
 * A message with no client identifier (neither a client nor a proxy session)
 * is rejected regardless of its type. A rejected client is disconnected if the
 * IPC is not proxied.
 *
 * \param[in]     client_msg     XML of IPC message
 * \param[in,out] curr_client    If IPC is not proxied, client that sent message
 * \param[in]     proxy_session  If IPC is proxied, the session ID
 *
 * \return true if message needs further processing, false if it doesn't
 */
bool
controld_authorize_ipc_message(const xmlNode *client_msg, pcmk__client_t *curr_client,
                               const char *proxy_session)
{
    xmlNode *wrapper = NULL;
    xmlNode *message_data = NULL;
    const char *client_name = NULL;
    const char *op = crm_element_value(client_msg, PCMK__XA_CRM_TASK);
    const char *ref = crm_element_value(client_msg, PCMK_XA_REFERENCE);
    const char *uuid = (curr_client? curr_client->id : proxy_session);

    if (uuid == NULL) {
        crm_warn("IPC message from client rejected: No client identifier "
                 QB_XS " ref=%s", (ref? ref : "none"));
        goto rejected;
    }

    if (!pcmk__str_eq(CRM_OP_HELLO, op, pcmk__str_casei)) {
        // Only hello messages need to be authorized
        return true;
    }

    wrapper = pcmk__xe_first_child(client_msg, PCMK__XE_CRM_XML, NULL, NULL);
    message_data = pcmk__xe_first_child(wrapper, NULL, NULL, NULL);

    client_name = crm_element_value(message_data, PCMK__XA_CLIENT_NAME);
    if (pcmk__str_empty(client_name)) {
        /* The literals must be concatenated into a single format string;
         * otherwise the reference and client ID are never logged
         */
        crm_warn("IPC hello from client rejected: No client name "
                 QB_XS " ref=%s uuid=%s", (ref? ref : "none"), uuid);
        goto rejected;
    }

    if (!authorize_version(message_data, PCMK__XA_MAJOR_VERSION, client_name,
                           ref, uuid)) {
        goto rejected;
    }
    if (!authorize_version(message_data, PCMK__XA_MINOR_VERSION, client_name,
                           ref, uuid)) {
        goto rejected;
    }

    crm_trace("Validated IPC hello from client %s", client_name);
    crm_log_xml_trace(client_msg, "hello");
    if (curr_client) {
        // Remember the client's name with the connection
        curr_client->userdata = pcmk__str_copy(client_name);
    }
    controld_trigger_fsa();
    return false;

rejected:
    crm_log_xml_trace(client_msg, "rejected");
    if (curr_client) {
        qb_ipcs_disconnect(curr_client->ipcs);
    }
    return false;
}
/*!
 * \internal
 * \brief Dispatch a controller message according to its subtype
 *
 * \param[in,out] msg    Message XML
 * \param[in]     cause  How the message arrived
 *
 * \return Next FSA input: the result of handle_request() for a request,
 *         otherwise I_NULL
 */
static enum crmd_fsa_input
handle_message(xmlNode *msg, enum crmd_fsa_cause cause)
{
    const char *type = NULL;
    enum crmd_fsa_input next_input = I_NULL;

    CRM_CHECK(msg != NULL, return I_NULL);

    type = crm_element_value(msg, PCMK__XA_SUBT);

    if (pcmk__str_eq(type, PCMK__VALUE_REQUEST, pcmk__str_none)) {
        next_input = handle_request(msg, cause);

    } else if (pcmk__str_eq(type, PCMK__VALUE_RESPONSE, pcmk__str_none)) {
        handle_response(msg);

    } else {
        crm_warn("Ignoring message with unknown " PCMK__XA_SUBT" '%s'",
                 pcmk__s(type, ""));
        crm_log_xml_trace(msg, "bad");
    }
    return next_input;
}
/*!
 * \internal
 * \brief Handle a request to clear a resource's failures on a node
 *
 * The resource ID, node name, and optional operation name and interval are
 * taken from the request. Failures are then cleared via
 * update_attrd_clear_failures(), controld_cib_delete_last_failure(), and
 * lrm_clear_last_failure().
 *
 * \param[in] stored_msg  Request XML
 *
 * \return I_NULL (nothing further is needed from the FSA)
 */
static enum crmd_fsa_input
handle_failcount_op(xmlNode * stored_msg)
{
    const char *rsc_id = NULL;
    const char *node_name = NULL;
    const char *op_name = NULL;
    char *interval_str = NULL;
    guint interval_ms = 0;
    gboolean on_remote = FALSE;
    xmlNode *wrapper = pcmk__xe_first_child(stored_msg, PCMK__XE_CRM_XML, NULL,
                                            NULL);
    xmlNode *xml_op = pcmk__xe_first_child(wrapper, NULL, NULL, NULL);

    if (xml_op != NULL) {
        xmlNode *rsc_xml = pcmk__xe_first_child(xml_op, PCMK_XE_PRIMITIVE,
                                                NULL, NULL);
        xmlNode *attrs_xml = pcmk__xe_first_child(xml_op, PCMK__XE_ATTRIBUTES,
                                                  NULL, NULL);

        if (rsc_xml != NULL) {
            rsc_id = pcmk__xe_id(rsc_xml);
        }
        if (attrs_xml != NULL) {
            op_name = crm_element_value(attrs_xml,
                                        CRM_META "_" PCMK__META_CLEAR_FAILURE_OP);
            crm_element_value_ms(attrs_xml,
                                 CRM_META "_" PCMK__META_CLEAR_FAILURE_INTERVAL,
                                 &interval_ms);
        }
    }
    node_name = crm_element_value(xml_op, PCMK__META_ON_NODE);

    // Both a resource and a node are required
    if ((rsc_id == NULL) || (node_name == NULL)) {
        crm_log_xml_warn(stored_msg, "invalid failcount op");
        return I_NULL;
    }

    // A router node in the request marks the target as a remote node
    on_remote = (crm_element_value(xml_op, PCMK__XA_ROUTER_NODE) != NULL)? TRUE : FALSE;

    crm_debug("Clearing failures for %s-interval %s on %s "
              "from attribute manager, CIB, and executor state",
              pcmk__readable_interval(interval_ms), rsc_id, node_name);

    if (interval_ms != 0) {
        interval_str = crm_strdup_printf("%ums", interval_ms);
    }
    update_attrd_clear_failures(node_name, rsc_id, op_name, interval_str,
                                on_remote);
    free(interval_str);

    controld_cib_delete_last_failure(rsc_id, node_name, op_name, interval_ms);
    lrm_clear_last_failure(rsc_id, node_name, op_name, interval_ms);
    return I_NULL;
}
/*!
 * \internal
 * \brief Handle a CRM_OP_LRM_DELETE request
 *
 * \param[in,out] stored_msg  Request XML (its destination subsystem is
 *                            rewritten when the request must be relayed)
 *
 * \return I_ROUTER if the request should be relayed to the affected node,
 *         otherwise I_NULL
 */
static enum crmd_fsa_input
handle_lrm_delete(xmlNode *stored_msg)
{
    const char *mode = NULL;
    const char *from_sys = NULL;
    const char *from_host = NULL;
    const char *user_name = NULL;
    const char *rsc_id = NULL;
    const char *node = NULL;
    const char *transition = NULL;
    xmlNode *rsc_xml = NULL;
    xmlNode *key_source = NULL;
    lrmd_event_data_t *op = NULL;
    int rc = pcmk_rc_ok;
    xmlNode *wrapper = pcmk__xe_first_child(stored_msg, PCMK__XE_CRM_XML, NULL,
                                            NULL);
    xmlNode *msg_data = pcmk__xe_first_child(wrapper, NULL, NULL, NULL);

    CRM_CHECK(msg_data != NULL, return I_NULL);

    /* CRM_OP_LRM_DELETE has two distinct modes. The default behavior is to
     * relay the operation to the affected node, which will unregister the
     * resource from the local executor, clear the resource's history from the
     * CIB, and do some bookkeeping in the controller.
     *
     * However, if the affected node is offline, the client will specify
     * mode=PCMK__VALUE_CIB which means the controller receiving the operation
     * should clear the resource's history from the CIB and nothing else. This
     * is used to clear shutdown locks.
     */
    mode = crm_element_value(msg_data, PCMK__XA_MODE);
    if (!pcmk__str_eq(mode, PCMK__VALUE_CIB, pcmk__str_none)) {
        // Relay to affected node
        crm_xml_add(stored_msg, PCMK__XA_CRM_SYS_TO, CRM_SYSTEM_LRMD);
        return I_ROUTER;
    }

    // Delete CIB history locally (compare with do_lrm_delete())
    rsc_xml = pcmk__xe_first_child(msg_data, PCMK_XE_PRIMITIVE, NULL, NULL);
    CRM_CHECK(rsc_xml != NULL, return I_NULL);

    rsc_id = pcmk__xe_id(rsc_xml);
    from_sys = crm_element_value(stored_msg, PCMK__XA_CRM_SYS_FROM);
    node = crm_element_value(msg_data, PCMK__META_ON_NODE);
    user_name = pcmk__update_acl_user(stored_msg, PCMK__XA_CRM_USER, NULL);
    crm_debug("Handling " CRM_OP_LRM_DELETE " for %s on %s locally%s%s "
              "(clearing CIB resource history only)", rsc_id, node,
              ((user_name != NULL)? " for user " : ""),
              ((user_name != NULL)? user_name : ""));

    // Try a dry run first, and do the real deletion only if that succeeds
    rc = controld_delete_resource_history(rsc_id, node, user_name,
                                          cib_dryrun|cib_sync_call);
    if (rc == pcmk_rc_ok) {
        rc = controld_delete_resource_history(rsc_id, node, user_name,
                                              crmd_cib_smart_opt());
    }

    /* Notify client. Also notify tengine if mode=PCMK__VALUE_CIB and
     * op=CRM_OP_LRM_DELETE.
     */
    if (from_sys == NULL) {
        return I_NULL;
    }

    from_host = crm_element_value(stored_msg, PCMK__XA_SRC);

    /* The transition key is read from the request itself when the request
     * came from the transition engine, otherwise from the message data
     */
    key_source = (strcmp(from_sys, CRM_SYSTEM_TENGINE) == 0)? stored_msg : msg_data;
    transition = crm_element_value(key_source, PCMK__XA_TRANSITION_KEY);

    crm_info("Notifying %s on %s that %s was%s deleted",
             from_sys, ((from_host != NULL)? from_host : "local node"), rsc_id,
             ((rc == pcmk_rc_ok)? "" : " not"));

    // Build a synthetic "delete completed" event to acknowledge the request
    op = lrmd_new_event(rsc_id, PCMK_ACTION_DELETE, 0);
    op->type = lrmd_event_exec_complete;
    op->user_data = pcmk__str_copy(pcmk__s(transition, FAKE_TE_ID));
    op->params = pcmk__strkey_table(free, free);
    pcmk__insert_dup(op->params, PCMK_XA_CRM_FEATURE_SET,
                     CRM_FEATURE_SET);
    controld_rc2event(op, rc);
    controld_ack_event_directly(from_host, from_sys, NULL, op, rsc_id);
    lrmd_free_event(op);
    controld_trigger_delete_refresh(from_sys, rsc_id);
    return I_NULL;
}
/*!
 * \brief Handle a CRM_OP_REMOTE_STATE message by updating remote peer cache
 *
 * The remote node's membership state is updated from the message, and its
 * recorded connection host is set to the one named in the message (or cleared
 * if the message names none).
 *
 * \param[in] msg  Message XML
 *
 * \return Next FSA input (always I_NULL)
 */
static enum crmd_fsa_input
handle_remote_state(const xmlNode *msg)
{
    const char *remote_uname = pcmk__xe_id(msg);
    const char *new_host = NULL;
    pcmk__node_status_t *remote_peer = NULL;
    bool remote_is_up = false;
    int rc = pcmk__xe_get_bool_attr(msg, PCMK__XA_IN_CCM, &remote_is_up);

    CRM_CHECK(remote_uname && rc == pcmk_rc_ok, return I_NULL);

    remote_peer = pcmk__cluster_lookup_remote_node(remote_uname);
    CRM_CHECK(remote_peer, return I_NULL);

    if (remote_is_up) {
        pcmk__update_peer_state(__func__, remote_peer, PCMK_VALUE_MEMBER, 0);
    } else {
        pcmk__update_peer_state(__func__, remote_peer, PCMK__VALUE_LOST, 0);
    }

    new_host = crm_element_value(msg, PCMK__XA_CONNECTION_HOST);
    if (new_host != NULL) {
        pcmk__str_update(&remote_peer->conn_host, new_host);
    } else {
        // free(NULL) is a no-op, so no need to test the old value first
        free(remote_peer->conn_host);
        remote_peer->conn_host = NULL;
    }
    return I_NULL;
}
/*!
 * \brief Create a reply to a CRM_OP_PING message
 *
 * The reply data names the subsystem the ping was addressed to, the current
 * controller FSA state, and a result of "ok".
 *
 * \param[in] msg  Ping request XML
 *
 * \return Newly created reply XML as returned by pcmk__new_reply() (the
 *         callers in this file free it with pcmk__xml_free())
 */
static xmlNode*
create_ping_reply(const xmlNode *msg)
{
    xmlNode *reply = NULL;
    xmlNode *ping_data = pcmk__xe_create(NULL, PCMK__XE_PING_RESPONSE);
    const char *subsystem = crm_element_value(msg, PCMK__XA_CRM_SYS_TO);
    const char *state = fsa_state2string(controld_globals.fsa_state);

    // Build reply
    crm_xml_add(ping_data, PCMK__XA_CRM_SUBSYSTEM, subsystem);

    // Add controller state
    crm_xml_add(ping_data, PCMK__XA_CRMD_STATE, state);
    crm_notice("Current ping state: %s", state); // CTS needs this

    // Add controller health
    // @TODO maybe do some checks to determine meaningful status
    crm_xml_add(ping_data, PCMK_XA_RESULT, "ok");

    reply = pcmk__new_reply(msg, ping_data);
    pcmk__xml_free(ping_data);
    return reply;
}
/*!
 * \brief Handle a CRM_OP_PING message by sending a reply to the requester
 *
 * \param[in] msg  Message XML
 *
 * \return Next FSA input (always I_NULL)
 */
static enum crmd_fsa_input
handle_ping(const xmlNode *msg)
{
    xmlNode *ping_reply = create_ping_reply(msg);

    if (ping_reply == NULL) {
        return I_NULL;
    }
    (void) relay_message(ping_reply, TRUE);
    pcmk__xml_free(ping_reply);

    // Nothing further to do
    return I_NULL;
}
/*!
 * \brief Handle a PCMK__CONTROLD_CMD_NODES message
 *
 * Replies with one node element (ID, name, and state) per entry in the peer
 * cache.
 *
 * \param[in] request  Message XML
 *
 * \return Next FSA input (always I_NULL)
 */
static enum crmd_fsa_input
handle_node_list(const xmlNode *request)
{
    GHashTableIter iter;
    pcmk__node_status_t *peer = NULL;
    xmlNode *reply = NULL;

    // Create message data for reply
    xmlNode *nodes_xml = pcmk__xe_create(NULL, PCMK_XE_NODES);

    g_hash_table_iter_init(&iter, pcmk__peer_cache);
    while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &peer)) {
        xmlNode *node_xml = pcmk__xe_create(nodes_xml, PCMK_XE_NODE);

        crm_xml_add_ll(node_xml, PCMK_XA_ID,
                       (long long) peer->cluster_layer_id); // uint32_t
        crm_xml_add(node_xml, PCMK_XA_UNAME, peer->name);
        crm_xml_add(node_xml, PCMK__XA_IN_CCM, peer->state);
    }

    // Create and send reply
    reply = pcmk__new_reply(request, nodes_xml);
    pcmk__xml_free(nodes_xml);
    if (reply != NULL) {
        (void) relay_message(reply, TRUE);
        pcmk__xml_free(reply);
    }

    // Nothing further to do
    return I_NULL;
}
/*!
 * \brief Handle a CRM_OP_NODE_INFO request
 *
 * Replies with whether the local partition has quorum and, if the requested
 * node is found in the node caches, that node's ID, name, state, and whether
 * it is a remote node.
 *
 * \param[in] msg  Message XML
 *
 * \return Next FSA input (always I_NULL)
 */
static enum crmd_fsa_input
handle_node_info_request(const xmlNode *msg)
{
    const char *wanted_name = NULL;
    pcmk__node_status_t *found = NULL;
    int wanted_id = 0;
    xmlNode *reply = NULL;

    // Build reply
    xmlNode *info_xml = pcmk__xe_create(NULL, PCMK_XE_NODE);

    crm_xml_add(info_xml, PCMK__XA_CRM_SUBSYSTEM, CRM_SYSTEM_CRMD);

    // Add whether current partition has quorum
    pcmk__xe_set_bool_attr(info_xml, PCMK_XA_HAVE_QUORUM,
                           pcmk_is_set(controld_globals.flags,
                                       controld_has_quorum));

    /* Check whether client requested node info by ID and/or name
     *
     * @TODO A Corosync-layer node ID is of type uint32_t. We should be able to
     * handle legitimate node IDs greater than INT_MAX, but currently we do not.
     */
    crm_element_value_int(msg, PCMK_XA_ID, &wanted_id);
    if (wanted_id < 0) {
        wanted_id = 0;
    }
    wanted_name = crm_element_value(msg, PCMK_XA_UNAME);

    // Default to local node if none given
    if ((wanted_id == 0) && (wanted_name == NULL)) {
        wanted_name = controld_globals.cluster->priv->node_name;
    }

    found = pcmk__search_node_caches(wanted_id, wanted_name,
                                     pcmk__node_search_any);
    if (found != NULL) {
        crm_xml_add(info_xml, PCMK_XA_ID, found->xml_id);
        crm_xml_add(info_xml, PCMK_XA_UNAME, found->name);
        crm_xml_add(info_xml, PCMK_XA_CRMD, found->state);
        pcmk__xe_set_bool_attr(info_xml, PCMK_XA_REMOTE_NODE,
                               pcmk_is_set(found->flags,
                                           pcmk__node_status_remote));
    }

    // Send reply
    reply = pcmk__new_reply(msg, info_xml);
    pcmk__xml_free(info_xml);
    if (reply != NULL) {
        (void) relay_message(reply, TRUE);
        pcmk__xml_free(reply);
    }

    // Nothing further to do
    return I_NULL;
}
/*!
 * \internal
 * \brief Check the DC's feature set against the local one
 *
 * If the feature sets are incompatible, flag the controller to stay down and
 * exit with CRM_EX_FATAL.
 *
 * \param[in] msg  Message XML from the DC (read for its feature set attribute)
 */
static void
verify_feature_set(xmlNode *msg)
{
    const char *dc_version = crm_element_value(msg, PCMK_XA_CRM_FEATURE_SET);

    if (dc_version == NULL) {
        /* All we really know is that the DC feature set is older than 3.1.0,
         * but that's also all that really matters.
         */
        dc_version = "3.0.14";
    }

    if (!feature_set_compatible(dc_version, CRM_FEATURE_SET)) {
        crm_err("Local feature set (%s) is incompatible with DC's (%s)",
                CRM_FEATURE_SET, dc_version);

        // Nothing is likely to improve without administrator involvement
        controld_set_fsa_input_flags(R_STAYDOWN);
        crmd_exit(CRM_EX_FATAL);
        return;
    }

    crm_trace("Local feature set (%s) is compatible with DC's (%s)",
              CRM_FEATURE_SET, dc_version);
}
/*!
 * \internal
 * \brief Handle a shutdown all-clear received while this node is the DC
 *
 * \param[in] stored_msg  Message XML
 *
 * \return Next FSA input: I_STOP if a shutdown was requested locally,
 *         I_TERMINATE if the DC is confirming a shutdown it never requested,
 *         I_ELECTION if another node is confirming one while this controller
 *         is not stopping, otherwise I_NULL
 */
static enum crmd_fsa_input
handle_shutdown_self_ack(xmlNode *stored_msg)
{
    enum crmd_fsa_input next_input = I_NULL;
    const char *host_from = crm_element_value(stored_msg, PCMK__XA_SRC);
    const char *sender = (host_from != NULL)? host_from : "another node";

    if (pcmk_is_set(controld_globals.fsa_input_register, R_SHUTDOWN)) {
        // The expected case -- we initiated own shutdown sequence
        crm_info("Shutting down controller");
        next_input = I_STOP;

    } else if (pcmk__str_eq(host_from, controld_globals.dc_name,
                            pcmk__str_casei)) {
        // Must be logic error -- DC confirming its own unrequested shutdown
        crm_err("Shutting down controller immediately due to "
                "unexpected shutdown confirmation");
        next_input = I_TERMINATE;

    } else if (controld_globals.fsa_state != S_STOPPING) {
        // Shouldn't happen -- non-DC confirming unrequested shutdown
        crm_err("Starting new DC election because %s is "
                "confirming shutdown we did not request",
                sender);
        next_input = I_ELECTION;

    } else {
        // Shouldn't happen, but we are already stopping anyway
        crm_debug("Ignoring unexpected shutdown confirmation from %s",
                  sender);
    }
    return next_input;
}
/*!
 * \internal
 * \brief Handle a shutdown all-clear received while this node is not the DC
 *
 * \param[in] stored_msg  Message XML
 *
 * \return I_STOP if the message came from the DC (or no DC is known),
 *         otherwise I_NULL
 */
static enum crmd_fsa_input
handle_shutdown_ack(xmlNode *stored_msg)
{
    const char *host_from = crm_element_value(stored_msg, PCMK__XA_SRC);

    if (host_from == NULL) {
        crm_warn("Ignoring shutdown request without origin specified");
        return I_NULL;
    }

    // Only the DC may tell us to shut down (an unknown DC matches any sender)
    if (!pcmk__str_eq(host_from, controld_globals.dc_name,
                      pcmk__str_null_matches|pcmk__str_casei)) {
        crm_warn("Ignoring shutdown request from %s because DC is %s",
                 host_from, controld_globals.dc_name);
        return I_NULL;
    }

    if (pcmk_is_set(controld_globals.fsa_input_register, R_SHUTDOWN)) {
        crm_info("Shutting down controller after confirmation from %s",
                 host_from);
    } else {
        // We did not ask to shut down, so do not come back up afterward
        crm_err("Shutting down controller after unexpected "
                "shutdown request from %s", host_from);
        controld_set_fsa_input_flags(R_STAYDOWN);
    }
    return I_STOP;
}
/*!
 * \internal
 * \brief Handle a controller request message
 *
 * \param[in,out] stored_msg  Request XML (its destination subsystem is
 *                            rewritten for requests routed to the executor)
 * \param[in]     cause       How the request reached this controller
 *
 * \return Next FSA input (I_NULL if nothing further is needed)
 */
static enum crmd_fsa_input
handle_request(xmlNode *stored_msg, enum crmd_fsa_cause cause)
{
    xmlNode *msg = NULL;
    const char *op = crm_element_value(stored_msg, PCMK__XA_CRM_TASK);

    /* Optimize this for the DC - it has the most to do */

    crm_log_xml_trace(stored_msg, "request");
    if (op == NULL) {
        crm_warn("Ignoring request without " PCMK__XA_CRM_TASK);
        return I_NULL;
    }

    if (strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0) {
        const char *from = crm_element_value(stored_msg, PCMK__XA_SRC);
        pcmk__node_status_t *node =
            pcmk__search_node_caches(0, from, pcmk__node_search_cluster_member);

        // Every node records that the sender is expected to go down
        pcmk__update_peer_expected(__func__, node, CRMD_JOINSTATE_DOWN);
        if(AM_I_DC == FALSE) {
            return I_NULL; /* Done */
        }
    }

    /*========== DC-Only Actions ==========*/
    if (AM_I_DC) {
        if (strcmp(op, CRM_OP_JOIN_ANNOUNCE) == 0) {
            return I_NODE_JOIN;

        } else if (strcmp(op, CRM_OP_JOIN_REQUEST) == 0) {
            return I_JOIN_REQUEST;

        } else if (strcmp(op, CRM_OP_JOIN_CONFIRM) == 0) {
            return I_JOIN_RESULT;

        } else if (strcmp(op, CRM_OP_SHUTDOWN) == 0) {
            return handle_shutdown_self_ack(stored_msg);

        } else if (strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0) {
            // Another controller wants to shut down its node
            return handle_shutdown_request(stored_msg);
        }
    }

    /*========== common actions ==========*/
    if (strcmp(op, CRM_OP_NOVOTE) == 0) {
        // Initialize every member, since the struct's address is passed on
        ha_msg_input_t fsa_input = { .msg = stored_msg };

        register_fsa_input_adv(C_HA_MESSAGE, I_NULL, &fsa_input,
                               A_ELECTION_COUNT | A_ELECTION_CHECK, FALSE,
                               __func__);

    } else if (strcmp(op, CRM_OP_REMOTE_STATE) == 0) {
        /* a remote connection host is letting us know the node state */
        return handle_remote_state(stored_msg);

    } else if (strcmp(op, CRM_OP_THROTTLE) == 0) {
        throttle_update(stored_msg);
        if (AM_I_DC && (controld_globals.transition_graph != NULL)
            && !controld_globals.transition_graph->complete) {

            crm_debug("The throttle changed. Trigger a graph.");
            trigger_graph();
        }
        return I_NULL;

    } else if (strcmp(op, CRM_OP_CLEAR_FAILCOUNT) == 0) {
        return handle_failcount_op(stored_msg);

    } else if (strcmp(op, CRM_OP_VOTE) == 0) {
        /* count the vote and decide what to do after that */
        ha_msg_input_t fsa_input = { .msg = stored_msg };

        register_fsa_input_adv(C_HA_MESSAGE, I_NULL, &fsa_input,
                               A_ELECTION_COUNT | A_ELECTION_CHECK, FALSE,
                               __func__);

        /* Sometimes we _must_ go into S_ELECTION */
        if (controld_globals.fsa_state == S_HALT) {
            crm_debug("Forcing an election from S_HALT");
            return I_ELECTION;
        }

    } else if (strcmp(op, CRM_OP_JOIN_OFFER) == 0) {
        verify_feature_set(stored_msg);
        crm_debug("Raising I_JOIN_OFFER: join-%s",
                  crm_element_value(stored_msg, PCMK__XA_JOIN_ID));
        return I_JOIN_OFFER;

    } else if (strcmp(op, CRM_OP_JOIN_ACKNAK) == 0) {
        crm_debug("Raising I_JOIN_RESULT: join-%s",
                  crm_element_value(stored_msg, PCMK__XA_JOIN_ID));
        return I_JOIN_RESULT;

    } else if (strcmp(op, CRM_OP_LRM_DELETE) == 0) {
        return handle_lrm_delete(stored_msg);

    } else if ((strcmp(op, CRM_OP_LRM_FAIL) == 0)
               || (strcmp(op, CRM_OP_REPROBE) == 0)) {

        crm_xml_add(stored_msg, PCMK__XA_CRM_SYS_TO, CRM_SYSTEM_LRMD);
        return I_ROUTER;

    } else if (strcmp(op, CRM_OP_NOOP) == 0) {
        return I_NULL;

    } else if (strcmp(op, CRM_OP_PING) == 0) {
        return handle_ping(stored_msg);

    } else if (strcmp(op, CRM_OP_NODE_INFO) == 0) {
        return handle_node_info_request(stored_msg);

    } else if (strcmp(op, CRM_OP_RM_NODE_CACHE) == 0) {
        int id = 0;
        const char *name = NULL;

        crm_element_value_int(stored_msg, PCMK_XA_ID, &id);
        name = crm_element_value(stored_msg, PCMK_XA_UNAME);

        if(cause == C_IPC_MESSAGE) {
            // A local client asked for the removal: tell every controller
            msg = pcmk__new_request(pcmk_ipc_controld, CRM_SYSTEM_CRMD, NULL,
                                    CRM_SYSTEM_CRMD, CRM_OP_RM_NODE_CACHE,
                                    NULL);

            /* Receivers take the node to forget from these attributes (see
             * the reads just above), so the relayed request must carry them
             */
            crm_xml_add_ll(msg, PCMK_XA_ID, (long long) id);
            crm_xml_add(msg, PCMK_XA_UNAME, name);

            if (!pcmk__cluster_send_message(NULL, pcmk_ipc_controld, msg)) {
                crm_err("Could not instruct peers to remove references to node %s/%d",
                        pcmk__s(name, "unknown"), id);
            } else {
                crm_notice("Instructing peers to remove references to node %s/%d",
                           pcmk__s(name, "unknown"), id);
            }
            pcmk__xml_free(msg);

        } else {
            pcmk__cluster_forget_cluster_node(id, name);

            /* If we're forgetting this node, also forget any failures to fence
             * it, so we don't carry that over to any node added later with the
             * same name.
             */
            st_fail_count_reset(name);
        }

    } else if (strcmp(op, CRM_OP_MAINTENANCE_NODES) == 0) {
        xmlNode *wrapper = pcmk__xe_first_child(stored_msg, PCMK__XE_CRM_XML,
                                                NULL, NULL);
        xmlNode *xml = pcmk__xe_first_child(wrapper, NULL, NULL, NULL);

        remote_ra_process_maintenance_nodes(xml);

    } else if (strcmp(op, PCMK__CONTROLD_CMD_NODES) == 0) {
        return handle_node_list(stored_msg);

        /*========== (NOT_DC)-Only Actions ==========*/
    } else if (!AM_I_DC) {

        if (strcmp(op, CRM_OP_SHUTDOWN) == 0) {
            return handle_shutdown_ack(stored_msg);
        }

    } else {
        /* NOTE(review): the previous branch takes every non-DC case, so this
         * is reached only on the DC; other requests to a non-DC node are
         * dropped without a log message.
         */
        crm_err("Unexpected request (%s) sent to %s", op, AM_I_DC ? "the DC" : "non-DC node");
        crm_log_xml_err(stored_msg, "Unexpected");
    }

    return I_NULL;
}
/*!
 * \internal
 * \brief Handle a controller response message
 *
 * A scheduler calculation result received on the DC is registered as an FSA
 * input if it matches the most recent scheduler request reference. Replies to
 * vote and shutdown operations need no action. Anything else is logged as
 * unexpected.
 *
 * \param[in] stored_msg  Response XML
 */
static void
handle_response(xmlNode *stored_msg)
{
    const char *op = crm_element_value(stored_msg, PCMK__XA_CRM_TASK);

    crm_log_xml_trace(stored_msg, "reply");
    if (op == NULL) {
        crm_warn("Ignoring reply without " PCMK__XA_CRM_TASK);

    } else if (AM_I_DC && strcmp(op, CRM_OP_PECALC) == 0) {
        // Check whether scheduler answer has been superseded by a later request
        const char *msg_ref = crm_element_value(stored_msg, PCMK_XA_REFERENCE);

        if (msg_ref == NULL) {
            crm_err("%s - Ignoring calculation with no reference", op);

        } else if (pcmk__str_eq(msg_ref, controld_globals.fsa_pe_ref,
                                pcmk__str_none)) {
            // Initialize every member, since the struct's address is passed on
            ha_msg_input_t fsa_input = { .msg = stored_msg };

            controld_stop_sched_timer();
            register_fsa_input_later(C_IPC_MESSAGE, I_PE_SUCCESS, &fsa_input);

        } else {
            crm_info("%s calculation %s is obsolete", op, msg_ref);
        }

    } else if (strcmp(op, CRM_OP_VOTE) == 0
               || strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0 || strcmp(op, CRM_OP_SHUTDOWN) == 0) {
        // Replies to these operations require no action

    } else {
        const char *host_from = crm_element_value(stored_msg, PCMK__XA_SRC);

        // The source may be absent; never pass NULL for a %s conversion
        crm_err("Unexpected response (op=%s, src=%s) sent to the %s",
                op, pcmk__s(host_from, "unknown"),
                AM_I_DC ? "DC" : "controller");
    }
}
/*!
 * \internal
 * \brief Handle a node's request to shut down
 *
 * Sets the shutdown node attribute for the requesting node (the local node if
 * the request names no source) to the current time.
 *
 * \param[in] stored_msg  Request XML
 *
 * \return Next FSA input (always I_NULL)
 */
static enum crmd_fsa_input
handle_shutdown_request(xmlNode * stored_msg)
{
    /* handle here to avoid potential version issues
     *   where the shutdown message/procedure may have
     *   been changed in later versions.
     *
     * This way the DC is always in control of the shutdown
     */
    char *timestamp = NULL;
    const char *requester = crm_element_value(stored_msg, PCMK__XA_SRC);

    if (requester == NULL) {
        /* we're shutting down and the DC */
        requester = controld_globals.cluster->priv->node_name;
    }

    crm_info("Creating shutdown request for %s (state=%s)", requester,
             fsa_state2string(controld_globals.fsa_state));
    crm_log_xml_trace(stored_msg, "message");

    timestamp = pcmk__ttoa(time(NULL));
    update_attrd(requester, PCMK__NODE_ATTR_SHUTDOWN, timestamp, NULL, FALSE);
    free(timestamp);

    /* will be picked up by the TE as long as it's running */
    return I_NULL;
}
/*!
 * \internal
 * \brief Deliver a message to a local subsystem or IPC client
 *
 * The destination is tried, in order, as a connected IPC client ID, the
 * transition engine, the executor, and a proxy session.
 *
 * \param[in,out] msg  Message XML (\p src is added as its source if it has
 *                     none)
 * \param[in]     sys  Destination subsystem or client ID
 * \param[in]     src  Source to record in \p msg if it has none
 */
static void
send_msg_via_ipc(xmlNode * msg, const char *sys, const char *src)
{
    pcmk__client_t *ipc_client = NULL;

    CRM_CHECK(sys != NULL, return);

    ipc_client = pcmk__find_client_by_id(sys);

    if (crm_element_value(msg, PCMK__XA_SRC) == NULL) {
        crm_xml_add(msg, PCMK__XA_SRC, src);
    }

    if (ipc_client != NULL) {
        /* Transient clients such as crmadmin */
        pcmk__ipc_send_xml(ipc_client, 0, msg, crm_ipc_server_event);
        return;
    }

    if (pcmk__str_eq(sys, CRM_SYSTEM_TENGINE, pcmk__str_none)) {
        xmlNode *wrapper = pcmk__xe_first_child(msg, PCMK__XE_CRM_XML, NULL,
                                                NULL);
        xmlNode *payload = pcmk__xe_first_child(wrapper, NULL, NULL, NULL);

        process_te_message(msg, payload);
        return;
    }

    if (pcmk__str_eq(sys, CRM_SYSTEM_LRMD, pcmk__str_none)) {
        // Invoke the executor action directly with a stack-built FSA input
        xmlNode *wrapper = pcmk__xe_first_child(msg, PCMK__XE_CRM_XML, NULL,
                                                NULL);
        ha_msg_input_t fsa_input = {
            .msg = msg,
            .xml = pcmk__xe_first_child(wrapper, NULL, NULL, NULL),
        };
        fsa_data_t fsa_data = {
            .id = 0,
            .actions = 0,
            .data = &fsa_input,
            .fsa_input = I_MESSAGE,
            .fsa_cause = C_IPC_MESSAGE,
            .origin = __func__,
            .data_type = fsa_dt_ha_msg,
        };

        do_lrm_invoke(A_LRM_INVOKE, C_IPC_MESSAGE, controld_globals.fsa_state,
                      I_MESSAGE, &fsa_data);
        return;
    }

    if (crmd_is_proxy_session(sys)) {
        crmd_proxy_send(sys, msg);
        return;
    }

    crm_info("Received invalid request: unknown subsystem '%s'", sys);
}
/*!
 * \brief Free a message input and the message XML it holds
 *
 * \param[in,out] orig  Message input to free (may be NULL)
 */
void
delete_ha_msg_input(ha_msg_input_t * orig)
{
    if (orig != NULL) {
        pcmk__xml_free(orig->msg);
        free(orig);
    }
}
/*!
 * \internal
 * \brief Notify the cluster of a remote node state change
 *
 * When the node is coming up, the local node is named in the message as its
 * connection host.
 *
 * \param[in] node_name  Node's name
 * \param[in] node_up    true if node is up, false if down
 */
void
broadcast_remote_state_message(const char *node_name, bool node_up)
{
    const char *direction = node_up? "coming up" : "going down";
    xmlNode *state_msg = pcmk__new_request(pcmk_ipc_controld, CRM_SYSTEM_CRMD,
                                           NULL, CRM_SYSTEM_CRMD,
                                           CRM_OP_REMOTE_STATE, NULL);

    crm_info("Notifying cluster of Pacemaker Remote node %s %s",
             node_name, direction);

    crm_xml_add(state_msg, PCMK_XA_ID, node_name);
    pcmk__xe_set_bool_attr(state_msg, PCMK__XA_IN_CCM, node_up);

    if (node_up) {
        crm_xml_add(state_msg, PCMK__XA_CONNECTION_HOST,
                    controld_globals.cluster->priv->node_name);
    }

    pcmk__cluster_send_message(NULL, pcmk_ipc_controld, state_msg);
    pcmk__xml_free(state_msg);
}
-
diff --git a/lib/cluster/cpg.c b/lib/cluster/cpg.c
index 559dd408e0..405551ecac 100644
--- a/lib/cluster/cpg.c
+++ b/lib/cluster/cpg.c
@@ -1,1053 +1,1049 @@
/*
- * Copyright 2004-2024 the Pacemaker project contributors
+ * Copyright 2004-2025 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU Lesser General Public License
* version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#include <arpa/inet.h>
#include <inttypes.h> // PRIu32
#include <netdb.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <stdint.h> // uint32_t
#include <sys/socket.h>
#include <sys/types.h> // size_t
#include <sys/utsname.h>
#include <bzlib.h>
#include <corosync/corodefs.h>
#include <corosync/corotypes.h>
#include <corosync/hdb.h>
#include <corosync/cpg.h>
#include <qb/qbipc_common.h>
#include <qb/qbipcc.h>
#include <qb/qbutil.h>
#include <crm/cluster/internal.h>
#include <crm/common/ipc.h>
#include <crm/common/ipc_internal.h> // PCMK__SPECIAL_PID
#include <crm/common/mainloop.h>
#include <crm/common/xml.h>
#include "crmcluster_private.h"
/* @TODO Once we can update the public API to require pcmk_cluster_t* in more
 * functions, we can ditch this in favor of cluster->cpg_handle.
 */
// File-wide CPG connection handle (initially 0)
static cpg_handle_t pcmk_cpg_handle = 0;
// @TODO These could be moved to pcmk_cluster_t* at that time as well
// NOTE(review): presumably set once this node is evicted from the CPG group -- confirm against the code that writes it
static bool cpg_evicted = false;
// NOTE(review): presumably messages waiting to be sent by crm_cs_flush() -- confirm
static GList *cs_message_queue = NULL;
// NOTE(review): presumably the ID of a timer used to retry sending (0 if none) -- confirm
static int cs_message_timer = 0;
/* @COMPAT Any changes to these structs (other than renames) will break all
 * rolling upgrades, and should be avoided if possible or done at a major
 * version bump if not
 */
// Description of one endpoint (destination or sender) of a CPG message
struct pcmk__cpg_host_s {
uint32_t id; // NOTE(review): presumably the cluster-layer node ID -- confirm
uint32_t pid; // NOTE(review): presumably the endpoint's process ID -- confirm
gboolean local; // Unused but needed for compatibility
enum pcmk_ipc_server type; // For logging only
uint32_t size; // NOTE(review): presumably the length of the name in uname -- confirm
char uname[MAX_NAME]; // NOTE(review): presumably the node name -- confirm
} __attribute__ ((packed)); // Packed so the layout has no padding
typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
// Layout of a Pacemaker message as sent over Corosync CPG
struct pcmk__cpg_msg_s {
// header.size holds the total message size (this struct plus payload)
struct qb_ipc_response_header header __attribute__ ((aligned(8)));
// Message number assigned by the sender
uint32_t id;
// Whether data holds a compressed payload
gboolean is_compressed;
// Destination of the message (host.size of 0 means no specific node name)
pcmk__cpg_host_t host;
// Origin of the message
pcmk__cpg_host_t sender;
// Uncompressed payload size in bytes, including the terminating null byte
uint32_t size;
// Size in bytes of the payload in data when is_compressed is set
uint32_t compressed_size;
/* 584 bytes */
// Payload (text, or compressed text); storage is allocated past the struct
char data[0];
} __attribute__ ((packed));
typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
static void crm_cs_flush(gpointer data);
/* Number of payload bytes actually stored in a CPG message's data field: the
 * compressed size if the message is compressed, otherwise the plain size.
 * The argument is parenthesized so that any pointer expression may be passed.
 */
#define msg_data_len(msg) \
    ((msg)->is_compressed? (msg)->compressed_size : (msg)->size)

/* Evaluate code (a Corosync API call) and store its result in rc. If the
 * result is CS_ERR_TRY_AGAIN or CS_ERR_QUEUE_FULL, increment counter, sleep
 * for that many seconds, and try again while counter is less than max. Any
 * other result ends the loop immediately.
 */
#define cs_repeat(rc, counter, max, code) do {                          \
        (rc) = (code);                                                  \
        if (((rc) == CS_ERR_TRY_AGAIN) || ((rc) == CS_ERR_QUEUE_FULL)) { \
            (counter)++;                                                \
            crm_debug("Retrying operation after %ds", (counter));       \
            sleep((counter));                                           \
        } else {                                                        \
            break;                                                      \
        }                                                               \
    } while ((counter) < (max))
/*!
 * \internal
 * \brief Look up the local node's Corosync ID via CPG
 *
 * Once a nonzero ID has been obtained, it is remembered and returned directly
 * by later calls.
 *
 * \param[in] handle  CPG connection to query, or 0 to open a connection just
 *                    for this lookup (it is closed again before returning)
 *
 * \return Corosync ID of local node (or 0 if not known)
 */
uint32_t
pcmk__cpg_local_nodeid(cpg_handle_t handle)
{
    static uint32_t local_nodeid = 0;

    const bool own_connection = (handle == 0);
    cpg_handle_t local_handle = handle;
    cs_error_t rc = CS_OK;
    int retries = 0;

    if (local_nodeid != 0) {
        return local_nodeid;
    }

    if (own_connection) {
        cpg_model_v1_data_t cpg_model_info = {
            CPG_MODEL_V1, NULL, NULL, NULL, 0
        };
        uid_t found_uid = 0;
        gid_t found_gid = 0;
        pid_t found_pid = 0;
        int fd = -1;
        int rv = 0;

        crm_trace("Creating connection");
        cs_repeat(rc, retries, 5,
                  cpg_model_initialize(&local_handle, CPG_MODEL_V1,
                                       (cpg_model_data_t *) &cpg_model_info,
                                       NULL));
        if (rc != CS_OK) {
            // Nothing was opened, so there is nothing to clean up
            crm_err("Could not connect to the CPG API: %s (%d)",
                    cs_strerror(rc), rc);
            return 0;
        }

        rc = cpg_fd_get(local_handle, &fd);
        if (rc != CS_OK) {
            crm_err("Could not obtain the CPG API connection: %s (%d)",
                    cs_strerror(rc), rc);
            goto bail;
        }

        // CPG provider run as root (at least in given user namespace)?
        rv = crm_ipc_is_authentic_process(fd, (uid_t) 0, (gid_t) 0, &found_pid,
                                          &found_uid, &found_gid);
        if (rv < 0) {
            crm_err("Could not verify authenticity of CPG provider: %s (%d)",
                    strerror(-rv), -rv);
            goto bail;
        }
        if (rv == 0) {
            crm_err("CPG provider is not authentic:"
                    " process %lld (uid: %lld, gid: %lld)",
                    (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
                    (long long) found_uid, (long long) found_gid);
            goto bail;
        }
    }

    // Every failure above has already returned or jumped to bail
    retries = 0;
    crm_trace("Performing lookup");
    cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
    if (rc != CS_OK) {
        crm_err("Could not get local node id from the CPG API: %s (%d)",
                pcmk__cs_err_str(rc), rc);
    }

bail:
    if (own_connection) {
        crm_trace("Closing connection");
        cpg_finalize(local_handle);
    }
    crm_debug("Local nodeid is %u", local_nodeid);
    return local_nodeid;
}
/*!
 * \internal
 * \brief Callback function for Corosync message queue timer
 *
 * Clears the record of the pending timer and then flushes the queue.
 * crm_cs_flush() returns early while a timer ID is recorded, so the ID must be
 * cleared first; it schedules a fresh timer itself if messages remain.
 *
 * \param[in] data  Pointer to CPG handle (passed through to crm_cs_flush())
 *
 * \return FALSE (which, by the GLib source callback convention, indicates that
 *         this timer should be removed and not fire again)
 */
static gboolean
crm_cs_flush_cb(gpointer data)
{
    cs_message_timer = 0;
    crm_cs_flush(data);
    return FALSE;
}
// Send no more than this many CPG messages in one flush
#define CS_SEND_MAX 200
/*!
 * \internal
 * \brief Send messages in Corosync CPG message queue
 *
 * Sends queued messages in order until the queue is empty, CS_SEND_MAX
 * messages have been sent, or a send fails. If messages remain afterward, a
 * timer is created to call crm_cs_flush_cb() later.
 *
 * \param[in] data  Pointer to the CPG handle to send with (nothing is sent if
 *                  the handle it points to is 0)
 */
static void
crm_cs_flush(gpointer data)
{
unsigned int sent = 0;
guint queue_len = 0;
cs_error_t rc = 0;
cpg_handle_t *handle = (cpg_handle_t *) data;
if (*handle == 0) {
crm_trace("Connection is dead");
return;
}
// Complain at every multiple of 1000 queued messages, or once at CS_SEND_MAX
queue_len = g_list_length(cs_message_queue);
if (((queue_len % 1000) == 0) && (queue_len > 1)) {
crm_err("CPG queue has grown to %d", queue_len);
} else if (queue_len == CS_SEND_MAX) {
crm_warn("CPG queue has grown to %d", queue_len);
}
if (cs_message_timer != 0) {
/* There is already a timer, wait until it goes off */
crm_trace("Timer active %d", cs_message_timer);
return;
}
// Send from the head of the queue; a failed message stays queued for retry
while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
struct iovec *iov = cs_message_queue->data;
rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
if (rc != CS_OK) {
break;
}
sent++;
- crm_trace("CPG message sent, size=%llu",
- (unsigned long long) iov->iov_len);
+ crm_trace("CPG message sent, size=%zu", iov->iov_len);
// The message was sent, so drop it from the queue and free its buffers
cs_message_queue = g_list_remove(cs_message_queue, iov);
free(iov->iov_base);
free(iov);
}
queue_len -= sent;
do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE,
"Sent %u CPG message%s (%d still queued): %s (rc=%d)",
sent, pcmk__plural_s(sent), queue_len, pcmk__cs_err_str(rc),
(int) rc);
// Anything left over is retried from a timer rather than by looping here
if (cs_message_queue) {
uint32_t delay_ms = 100;
if (rc != CS_OK) {
/* Proportionally more if sending failed but cap at 1s */
delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
}
cs_message_timer = pcmk__create_timer(delay_ms, crm_cs_flush_cb, data);
}
}
/*!
 * \internal
 * \brief Dispatch function for CPG handle
 *
 * Processes one pending CPG event. If dispatching fails, the cluster object's
 * CPG handle is finalized and reset to 0.
 *
 * \param[in,out] user_data  Cluster object
 *
 * \return 0 on success, -1 on error (per mainloop_io_t interface)
 */
static int
pcmk_cpg_dispatch(gpointer user_data)
{
    pcmk_cluster_t *cluster = (pcmk_cluster_t *) user_data;
    cs_error_t rc = cpg_dispatch(cluster->priv->cpg_handle, CS_DISPATCH_ONE);

    if (rc != CS_OK) {
        crm_err("Connection to the CPG API failed: %s (%d)",
                pcmk__cs_err_str(rc), rc);
        cpg_finalize(cluster->priv->cpg_handle);
        cluster->priv->cpg_handle = 0;
        return -1;
    }

    // Dispatching worked, but a callback may have noticed our eviction
    if (cpg_evicted) {
        crm_err("Evicted from CPG membership");
        return -1;
    }
    return 0;
}
/*!
 * \internal
 * \brief Get a printable name for a CPG message endpoint
 *
 * \param[in] host  Message endpoint to check
 *
 * \return Endpoint's node name if one is set, otherwise "<all>"
 */
static inline const char *
ais_dest(const pcmk__cpg_host_t *host)
{
    if (host->size > 0) {
        return host->uname;
    }
    return "<all>";
}
/*!
 * \internal
 * \brief Get a printable name for a message's server type
 *
 * \param[in] type  Server type to look up
 *
 * \return Message type name for \p type, or "unknown" if none is available
 */
static inline const char *
msg_type2text(enum pcmk_ipc_server type)
{
    const char *type_name = pcmk__server_message_type(type);

    return pcmk__s(type_name, "unknown");
}
/*!
 * \internal
 * \brief Check whether a Corosync CPG message is valid
 *
 * A message is rejected (and an error logged) if its claimed total size
 * leaves no room for a payload, if its header carries an error code, if the
 * claimed total size disagrees with the payload size fields, or if an
 * uncompressed payload is not terminated exactly at its last byte.
 *
 * \param[in] msg  Corosync CPG message to check
 *
 * \return true if \p msg is valid, otherwise false
 */
static bool
check_message_sanity(const pcmk__cpg_msg_t *msg)
{
// Bytes left for the payload once the fixed-size part is subtracted
int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
if (payload_size < 1) {
crm_err("%sCPG message %d from %s invalid: "
"Claimed size of %d bytes is too small "
QB_XS " from %s[%u] to %s@%s",
(msg->is_compressed? "Compressed " : ""),
msg->id, ais_dest(&(msg->sender)),
(int) msg->header.size,
msg_type2text(msg->sender.type), msg->sender.pid,
msg_type2text(msg->host.type), ais_dest(&(msg->host)));
return false;
}
if (msg->header.error != CS_OK) {
crm_err("%sCPG message %d from %s invalid: "
"Sender indicated error %d "
QB_XS " from %s[%u] to %s@%s",
(msg->is_compressed? "Compressed " : ""),
msg->id, ais_dest(&(msg->sender)),
msg->header.error,
msg_type2text(msg->sender.type), msg->sender.pid,
msg_type2text(msg->host.type), ais_dest(&(msg->host)));
return false;
}
// The relevant size field (compressed or not) must match the header's claim
if (msg_data_len(msg) != payload_size) {
crm_err("%sCPG message %d from %s invalid: "
"Total size %d inconsistent with payload size %d "
QB_XS " from %s[%u] to %s@%s",
(msg->is_compressed? "Compressed " : ""),
msg->id, ais_dest(&(msg->sender)),
(int) msg->header.size, (int) msg_data_len(msg),
msg_type2text(msg->sender.type), msg->sender.pid,
msg_type2text(msg->host.type), ais_dest(&(msg->host)));
return false;
}
// Uncompressed text: last byte must be null, and the one before it must not
if (!msg->is_compressed &&
/* msg->size != (strlen(msg->data) + 1) would be a stronger check,
* but checking the last byte or two should be quick
*/
(((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
|| (msg->data[msg->size - 1] != '\0'))) {
crm_err("CPG message %d from %s invalid: "
- "Payload does not end at byte %llu "
+ "Payload does not end at byte %" PRIu32 " "
QB_XS " from %s[%u] to %s@%s",
- msg->id, ais_dest(&(msg->sender)),
- (unsigned long long) msg->size,
+ msg->id, ais_dest(&(msg->sender)), msg->size,
msg_type2text(msg->sender.type), msg->sender.pid,
msg_type2text(msg->host.type), ais_dest(&(msg->host)));
return false;
}
crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
(int) msg->header.size, (msg->is_compressed? "compressed " : ""),
msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
ais_dest(&(msg->sender)),
msg_type2text(msg->host.type), ais_dest(&(msg->host)));
return true;
}
/*!
 * \internal
 * \brief Extract text data from a Corosync CPG message
 *
 * If \p handle is nonzero, the message's sender ID is filled in or validated
 * against \p sender_id, messages addressed to a different node ID or node
 * name are ignored, and a missing sender name is filled in from the peer
 * cache. The message is then sanity-checked, and its payload is decompressed
 * if necessary.
 *
 * \param[in]     handle     CPG connection (to get local node ID if not known)
 * \param[in]     sender_id  Corosync ID of node that sent message
 * \param[in]     pid        Process ID of message sender (for logging only)
 * \param[in,out] content    CPG message
 * \param[out]    from       If not \c NULL, will be set to sender uname
 *                           (valid for the lifetime of \p content)
 *
 * \return Newly allocated string with message data, or NULL for errors and
 *         messages not intended for the local node
 *
 * \note The caller is responsible for freeing the return value using \c free().
 */
char *
pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid,
                       void *content, const char **from)
{
    char *data = NULL;
    pcmk__cpg_msg_t *msg = content;

    if (from != NULL) {
        *from = NULL;
    }

    if (handle != 0) {
        uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
        const char *local_name = pcmk__cluster_local_node_name();

        // Update or validate message sender ID
        if (msg->sender.id == 0) {
            msg->sender.id = sender_id;
        } else if (msg->sender.id != sender_id) {
            crm_warn("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
                     ": claimed ID %" PRIu32,
                     sender_id, pid, msg->sender.id);
            return NULL;
        }

        // Ignore messages that aren't for the local node
        if ((msg->host.id != 0) && (local_nodeid != msg->host.id)) {
            crm_trace("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
                      ": for ID %" PRIu32 " not %" PRIu32,
                      sender_id, pid, msg->host.id, local_nodeid);
            return NULL;
        }
        if ((msg->host.size > 0)
            && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {

            crm_trace("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
                      ": for name %s not %s",
                      sender_id, pid, msg->host.uname, local_name);
            return NULL;
        }

        // Add sender name if not in original message
        if (msg->sender.size == 0) {
            const pcmk__node_status_t *peer =
                pcmk__get_node(sender_id, NULL, NULL,
                               pcmk__node_search_cluster_member);

            if ((peer == NULL) || (peer->name == NULL)) {
                crm_debug("Received CPG message from node with ID %" PRIu32
                          " but its name is unknown", sender_id);
            } else {
                size_t name_len = strlen(peer->name);

                /* The name buffer has a fixed size and is read back as a
                 * string, so never copy more than fits along with a
                 * terminating null byte
                 */
                if (name_len >= MAX_NAME) {
                    name_len = MAX_NAME - 1;
                }
                crm_debug("Updating name of CPG message sender with ID %" PRIu32
                          " to %s", sender_id, peer->name);
                memset(msg->sender.uname, 0, MAX_NAME);
                memcpy(msg->sender.uname, peer->name, name_len);
                msg->sender.size = (uint32_t) name_len;
            }
        }
    }

    // Ensure sender is in peer cache (though it should already be)
    pcmk__get_node(msg->sender.id, msg->sender.uname, NULL,
                   pcmk__node_search_cluster_member);

    if (from != NULL) {
        *from = msg->sender.uname;
    }

    if (!check_message_sanity(msg)) {
        return NULL;
    }

    if (msg->is_compressed && (msg->size > 0)) {
        int rc = BZ_OK;
        unsigned int new_size = msg->size + 1;
        char *uncompressed = pcmk__assert_alloc(1, new_size);

        rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data,
                                        msg->compressed_size, 1, 0);
        rc = pcmk__bzlib2rc(rc);
        if ((rc == pcmk_rc_ok) && (msg->size != new_size)) { // libbz2 bug?
            rc = pcmk_rc_compression;
        }
        if (rc != pcmk_rc_ok) {
            free(uncompressed);
            crm_warn("Ignoring compressed CPG message %d from %s (ID %" PRIu32
                     " PID %" PRIu32 "): %s",
                     msg->id, ais_dest(&(msg->sender)), sender_id, pid,
                     pcmk_rc_str(rc));
            return NULL;
        }
        data = uncompressed;

    } else {
        data = pcmk__str_copy(msg->data);
    }

    /* Log the extracted text rather than msg->data, which holds raw compressed
     * bytes when the message was compressed
     */
    crm_trace("Received %sCPG message %d from %s (ID %" PRIu32
              " PID %" PRIu32 "): %.40s...",
              (msg->is_compressed? "compressed " : ""),
              msg->id, ais_dest(&(msg->sender)), sender_id, pid, data);
    return data;
}
/*!
 * \internal
 * \brief Compare cpg_address objects by node ID
 *
 * Both arguments point to array elements that are themselves pointers to
 * \c cpg_address structures, as passed by qsort() and bsearch().
 *
 * \param[in] first   First cpg_address structure to compare
 * \param[in] second  Second cpg_address structure to compare
 *
 * \return -1 if first's node ID is lower, 1 if first's node ID is greater,
 *         or 0 if both node IDs are equal
 */
static int
cmp_member_list_nodeid(const void *first, const void *second)
{
    const struct cpg_address *const lhs =
        *((const struct cpg_address **) first);
    const struct cpg_address *const rhs =
        *((const struct cpg_address **) second);

    /* Only the node ID takes part in the ordering; "reason" and "pid" are
     * deliberately ignored
     */
    return (lhs->nodeid > rhs->nodeid) - (lhs->nodeid < rhs->nodeid);
}
/*!
 * \internal
 * \brief Get a readable string equivalent of a cpg_reason_t value
 *
 * \param[in] reason  CPG reason value
 *
 * \return Readable string suitable for appending to a log message (with a
 *         leading space), or an empty string for any other value
 */
static const char *
cpgreason2str(cpg_reason_t reason)
{
    const char *text = "";

    switch (reason) {
        case CPG_REASON_JOIN:
            text = " via cpg_join";
            break;
        case CPG_REASON_LEAVE:
            text = " via cpg_leave";
            break;
        case CPG_REASON_NODEDOWN:
            text = " via cluster exit";
            break;
        case CPG_REASON_NODEUP:
            text = " via cluster join";
            break;
        case CPG_REASON_PROCDOWN:
            text = " for unknown reason";
            break;
        default:
            break;
    }
    return text;
}
/*!
 * \internal
 * \brief Get a log-friendly node name
 *
 * \param[in] peer  Node to check (may be NULL)
 *
 * \return "unknown node" if \p peer is NULL, otherwise the node's name, or
 *         "peer node" if it has none
 */
static inline const char *
peer_name(const pcmk__node_status_t *peer)
{
    if (peer == NULL) {
        return "unknown node";
    }
    return pcmk__s(peer->name, "peer node");
}
/*!
 * \internal
 * \brief Process a CPG peer's leaving the cluster
 *
 * \param[in] cpg_group_name       CPG group name (for logging)
 * \param[in] event_counter        Event number (for logging)
 * \param[in] local_nodeid         Node ID of local node
 * \param[in] cpg_peer             CPG peer that left
 * \param[in] sorted_member_list   List of remaining members, qsort()-ed by ID
 * \param[in] member_list_entries  Number of entries in \p sorted_member_list
 */
static void
node_left(const char *cpg_group_name, int event_counter,
          uint32_t local_nodeid, const struct cpg_address *cpg_peer,
          const struct cpg_address **sorted_member_list,
          size_t member_list_entries)
{
    const struct cpg_address **rival = NULL;
    pcmk__node_status_t *peer =
        pcmk__search_node_caches(cpg_peer->nodeid, NULL,
                                 pcmk__node_search_cluster_member);

    /* Most CPG-related Pacemaker code assumes that only one process on a node
     * can be in the process group, but Corosync does not impose this
     * limitation, and more than one can be a member in practice due to a
     * daemon attempting to start while another instance is already running.
     *
     * Check for any such duplicate instances, because we don't want to process
     * their leaving as if our actual peer left. If the peer that left still has
     * an entry in sorted_member_list (with a different PID), we will ignore the
     * leaving.
     *
     * @TODO Track CPG members' PIDs so we can tell exactly who left.
     */
    if (peer != NULL) {
        rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
                        sizeof(const struct cpg_address *),
                        cmp_member_list_nodeid);
    }

    if (rival != NULL) {
        // Another process with the same node ID is still a member
        if (cpg_peer->nodeid == local_nodeid) {
            crm_warn("Group %s event %d: duplicate local pid %u left%s",
                     cpg_group_name, event_counter,
                     cpg_peer->pid, cpgreason2str(cpg_peer->reason));
        } else {
            crm_warn("Group %s event %d: "
                     "%s (node %u) duplicate pid %u left%s (%u remains)",
                     cpg_group_name, event_counter, peer_name(peer),
                     cpg_peer->nodeid, cpg_peer->pid,
                     cpgreason2str(cpg_peer->reason), (*rival)->pid);
        }
        return;
    }

    // No duplicate remains, so treat this as the peer itself leaving
    crm_info("Group %s event %d: %s (node %u pid %u) left%s",
             cpg_group_name, event_counter, peer_name(peer),
             cpg_peer->nodeid, cpg_peer->pid,
             cpgreason2str(cpg_peer->reason));
    if (peer != NULL) {
        crm_update_peer_proc(__func__, peer, crm_proc_cpg,
                             PCMK_VALUE_OFFLINE);
    }
}
/*!
 * \internal
 * \brief Handle a CPG configuration change event
 *
 * Logs every member that left or joined, updates the peer cache for members
 * that left and for current members, and records whether the local node is
 * still listed as a member.
 *
 * \param[in] handle               CPG connection
 * \param[in] group_name           CPG group name
 * \param[in] member_list          List of current CPG members
 * \param[in] member_list_entries  Number of entries in \p member_list
 * \param[in] left_list            List of CPG members that left
 * \param[in] left_list_entries    Number of entries in \p left_list
 * \param[in] joined_list          List of CPG members that joined
 * \param[in] joined_list_entries  Number of entries in \p joined_list
 *
 * \note This is of type \c cpg_confchg_fn_t, intended to be used in a
 *       \c cpg_callbacks_t object.
 */
void
pcmk__cpg_confchg_cb(cpg_handle_t handle,
                     const struct cpg_name *group_name,
                     const struct cpg_address *member_list,
                     size_t member_list_entries,
                     const struct cpg_address *left_list,
                     size_t left_list_entries,
                     const struct cpg_address *joined_list,
                     size_t joined_list_entries)
{
    static int counter = 0;

    bool found = false;
    uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
    const struct cpg_address **sorted = NULL;

    sorted = pcmk__assert_alloc(member_list_entries,
                                sizeof(const struct cpg_address *));

    for (size_t iter = 0; iter < member_list_entries; iter++) {
        sorted[iter] = member_list + iter;
    }

    // So that the cross-matching of multiply-subscribed nodes is then cheap
    qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
          cmp_member_list_nodeid);

    // All list counts are size_t, so index with size_t throughout
    for (size_t i = 0; i < left_list_entries; i++) {
        node_left(group_name->value, counter, local_nodeid, &left_list[i],
                  sorted, member_list_entries);
    }
    free(sorted);
    sorted = NULL;

    for (size_t i = 0; i < joined_list_entries; i++) {
        crm_info("Group %s event %d: node %u pid %u joined%s",
                 group_name->value, counter, joined_list[i].nodeid,
                 joined_list[i].pid, cpgreason2str(joined_list[i].reason));
    }

    for (size_t i = 0; i < member_list_entries; i++) {
        pcmk__node_status_t *peer =
            pcmk__get_node(member_list[i].nodeid, NULL, NULL,
                           pcmk__node_search_cluster_member);

        if (member_list[i].nodeid == local_nodeid
                && member_list[i].pid != getpid()) {
            // See the note in node_left()
            crm_warn("Group %s event %d: detected duplicate local pid %u",
                     group_name->value, counter, member_list[i].pid);
            continue;
        }
        crm_info("Group %s event %d: %s (node %u pid %u) is member",
                 group_name->value, counter, peer_name(peer),
                 member_list[i].nodeid, member_list[i].pid);

        /* If the caller left auto-reaping enabled, this will also update the
         * state to member.
         */
        peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
                                    PCMK_VALUE_ONLINE);

        if (peer && peer->state && strcmp(peer->state, PCMK_VALUE_MEMBER)) {
            /* The node is a CPG member, but we currently think it's not a
             * cluster member. This is possible only if auto-reaping was
             * disabled. The node may be joining, and we happened to get the CPG
             * notification before the quorum notification; or the node may have
             * just died, and we are processing its final messages; or a bug
             * has affected the peer cache.
             */
            time_t now = time(NULL);

            if (peer->when_lost == 0) {
                // Track when we first got into this contradictory state
                peer->when_lost = now;

            } else if (now > (peer->when_lost + 60)) {
                // If it persists for more than a minute, update the state
                crm_warn("Node %u is member of group %s but was believed "
                         "offline",
                         member_list[i].nodeid, group_name->value);
                pcmk__update_peer_state(__func__, peer, PCMK_VALUE_MEMBER, 0);
            }
        }

        if (local_nodeid == member_list[i].nodeid) {
            found = true;
        }
    }

    if (!found) {
        crm_err("Local node was evicted from group %s", group_name->value);
        cpg_evicted = true;
    }

    counter++;
}
/*!
 * \brief Set the CPG deliver callback function for a cluster object
 *
 * \param[in,out] cluster  Cluster object
 * \param[in]     fn       Deliver callback function to set
 *
 * \return Standard Pacemaker return code (\c EINVAL if \p cluster is NULL)
 */
int
pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
{
    if (cluster != NULL) {
        cluster->cpg.cpg_deliver_fn = fn;
        return pcmk_rc_ok;
    }
    return EINVAL;
}
/*!
 * \brief Set the CPG config change callback function for a cluster object
 *
 * \param[in,out] cluster  Cluster object
 * \param[in]     fn       Configuration change callback function to set
 *
 * \return Standard Pacemaker return code (\c EINVAL if \p cluster is NULL)
 */
int
pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
{
    if (cluster != NULL) {
        cluster->cpg.cpg_confchg_fn = fn;
        return pcmk_rc_ok;
    }
    return EINVAL;
}
/*!
 * \brief Connect to Corosync CPG
 *
 * Opens a CPG connection, verifies that the CPG provider is authentic, looks
 * up the local node ID, joins the CPG group named after the cluster object's
 * server type (or the system name if that has no message type), and registers
 * the connection's file descriptor with the main loop.
 *
 * \param[in,out] cluster  Initialized cluster object to connect
 *
 * \return Standard Pacemaker return code (\c ENOTCONN on any failure)
 */
int
pcmk__cpg_connect(pcmk_cluster_t *cluster)
{
    cs_error_t rc;
    int fd = -1;
    int retries = 0;
    uint32_t id = 0;
    pcmk__node_status_t *peer = NULL;
    cpg_handle_t handle = 0;
    const char *cpg_group_name = NULL;
    uid_t found_uid = 0;
    gid_t found_gid = 0;
    pid_t found_pid = 0;
    int rv;

    struct mainloop_fd_callbacks cpg_fd_callbacks = {
        .dispatch = pcmk_cpg_dispatch,
        .destroy = cluster->destroy,
    };

    cpg_model_v1_data_t cpg_model_info = {
        .model = CPG_MODEL_V1,
        .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
        .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
        .cpg_totem_confchg_fn = NULL,
        .flags = 0,
    };

    cpg_evicted = false;

    cpg_group_name = pcmk__server_message_type(cluster->priv->server);
    if (cpg_group_name == NULL) {
        /* The name will already be non-NULL for Pacemaker servers. If a
         * command-line tool or external caller connects to the cluster,
         * they will join this CPG group.
         */
        cpg_group_name = pcmk__s(crm_system_name, "unknown");
    }

    // Zeroing first guarantees the copied name is null-terminated
    memset(cluster->priv->group.value, 0, 128);
    strncpy(cluster->priv->group.value, cpg_group_name, 127);
    cluster->priv->group.length = strlen(cluster->priv->group.value) + 1;

    cs_repeat(rc, retries, 30,
              cpg_model_initialize(&handle, CPG_MODEL_V1,
                                   (cpg_model_data_t *) &cpg_model_info,
                                   NULL));
    if (rc != CS_OK) {
        crm_err("Could not connect to the CPG API: %s (%d)",
                cs_strerror(rc), rc);
        goto bail;
    }

    rc = cpg_fd_get(handle, &fd);
    if (rc != CS_OK) {
        crm_err("Could not obtain the CPG API connection: %s (%d)",
                cs_strerror(rc), rc);
        goto bail;
    }

    /* CPG provider run as root (in given user namespace, anyway)? */
    rv = crm_ipc_is_authentic_process(fd, (uid_t) 0, (gid_t) 0, &found_pid,
                                      &found_uid, &found_gid);
    if (rv == 0) {
        crm_err("CPG provider is not authentic:"
                " process %lld (uid: %lld, gid: %lld)",
                (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
                (long long) found_uid, (long long) found_gid);
        rc = CS_ERR_ACCESS;
        goto bail;

    } else if (rv < 0) {
        crm_err("Could not verify authenticity of CPG provider: %s (%d)",
                strerror(-rv), -rv);
        rc = CS_ERR_ACCESS;
        goto bail;
    }

    id = pcmk__cpg_local_nodeid(handle);
    if (id == 0) {
        crm_err("Could not get local node id from the CPG API");

        /* rc is still CS_OK from cpg_fd_get() here. Mark the failure so that
         * the cleanup below closes the connection and reports an error
         * instead of falling through to the success path.
         */
        rc = CS_ERR_LIBRARY;
        goto bail;
    }
    cluster->priv->node_id = id;

    retries = 0;
    cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->priv->group));
    if (rc != CS_OK) {
        crm_err("Could not join the CPG group '%s': %d", cpg_group_name, rc);
        goto bail;
    }

    pcmk_cpg_handle = handle;
    cluster->priv->cpg_handle = handle;
    mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster,
                    &cpg_fd_callbacks);

bail:
    if (rc != CS_OK) {
        cpg_finalize(handle);
        // @TODO Map rc to more specific Pacemaker return code
        return ENOTCONN;
    }

    peer = pcmk__get_node(id, NULL, NULL, pcmk__node_search_cluster_member);
    crm_update_peer_proc(__func__, peer, crm_proc_cpg, PCMK_VALUE_ONLINE);
    return pcmk_rc_ok;
}
/*!
 * \internal
 * \brief Disconnect from Corosync CPG
 *
 * Resets the file-wide CPG handle and, if the cluster object has a CPG
 * connection, leaves the group and closes that connection.
 *
 * \param[in,out] cluster  Cluster object to disconnect
 */
void
pcmk__cpg_disconnect(pcmk_cluster_t *cluster)
{
    pcmk_cpg_handle = 0;

    if (cluster->priv->cpg_handle == 0) {
        crm_info("No CPG connection");
        return;
    }

    crm_trace("Disconnecting CPG");
    cpg_leave(cluster->priv->cpg_handle, &cluster->priv->group);
    cpg_finalize(cluster->priv->cpg_handle);
    cluster->priv->cpg_handle = 0;
}
/*!
 * \internal
 * \brief Send string data via Corosync CPG
 *
 * Builds a CPG message around \p data (compressing the payload when it is at
 * least \c CRM_BZ2_THRESHOLD bytes and compression succeeds), appends it to
 * the outgoing message queue, and triggers a flush of that queue.
 *
 * \param[in] data  Data to send (NULL is treated as an empty string)
 * \param[in] node  Cluster node to send message to (NULL for all nodes)
 * \param[in] dest  Type of message to send
 *
 * \return \c true (the message is always queued; no failure is reported here)
 */
static bool
send_cpg_text(const char *data, const pcmk__node_status_t *node,
enum pcmk_ipc_server dest)
{
// These are looked up once and then reused by every later call
static int msg_id = 0;
static int local_pid = 0;
static int local_name_len = 0;
static const char *local_name = NULL;
char *target = NULL;
struct iovec *iov;
pcmk__cpg_msg_t *msg = NULL;
if (local_name == NULL) {
local_name = pcmk__cluster_local_node_name();
}
if ((local_name_len == 0) && (local_name != NULL)) {
local_name_len = strlen(local_name);
}
if (data == NULL) {
data = "";
}
if (local_pid == 0) {
local_pid = getpid();
}
// Start with just the fixed-size part; it is grown once the payload is known
msg = pcmk__assert_alloc(1, sizeof(pcmk__cpg_msg_t));
msg_id++;
msg->id = msg_id;
msg->header.error = CS_OK;
msg->host.type = dest;
// target is only a description of the destination for the log message below
if (node != NULL) {
if (node->name != NULL) {
target = pcmk__str_copy(node->name);
// NOTE(review): name length is not checked against MAX_NAME here -- confirm
// that node names are bounded elsewhere
msg->host.size = strlen(node->name);
memset(msg->host.uname, 0, MAX_NAME);
memcpy(msg->host.uname, node->name, msg->host.size);
} else {
target = crm_strdup_printf("%" PRIu32, node->cluster_layer_id);
}
msg->host.id = node->cluster_layer_id;
} else {
target = pcmk__str_copy("all");
}
// Sender ID is left as 0; pcmk__cpg_message_data() fills it in on receipt
msg->sender.id = 0;
msg->sender.type = pcmk__parse_server(crm_system_name);
msg->sender.pid = local_pid;
msg->sender.size = local_name_len;
memset(msg->sender.uname, 0, MAX_NAME);
if ((local_name != NULL) && (msg->sender.size != 0)) {
// NOTE(review): local name length is likewise not checked against MAX_NAME
memcpy(msg->sender.uname, local_name, msg->sender.size);
}
// Payload size includes the terminating null byte
msg->size = 1 + strlen(data);
msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
if (msg->size < CRM_BZ2_THRESHOLD) {
// Small payload: send as plain text
msg = pcmk__realloc(msg, msg->header.size);
memcpy(msg->data, data, msg->size);
} else {
char *compressed = NULL;
unsigned int new_size = 0;
if (pcmk__compress(data, (unsigned int) msg->size, 0, &compressed,
&new_size) == pcmk_rc_ok) {
// Total size now reflects the compressed payload; msg->size stays as is
msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
msg = pcmk__realloc(msg, msg->header.size);
memcpy(msg->data, compressed, new_size);
msg->is_compressed = TRUE;
msg->compressed_size = new_size;
} else {
// Compression failed, so fall back to sending the plain text
// cppcheck seems not to understand the abort logic in pcmk__realloc
// cppcheck-suppress memleak
msg = pcmk__realloc(msg, msg->header.size);
memcpy(msg->data, data, msg->size);
}
free(compressed);
}
// The queue owns iov and msg from here on; crm_cs_flush() frees them once sent
iov = pcmk__assert_alloc(1, sizeof(struct iovec));
iov->iov_base = msg;
iov->iov_len = msg->header.size;
if (msg->compressed_size > 0) {
- crm_trace("Queueing CPG message %u to %s "
- "(%llu bytes, %d bytes compressed payload): %.200s",
- msg->id, target, (unsigned long long) iov->iov_len,
- msg->compressed_size, data);
+ crm_trace("Queueing CPG message %" PRIu32 " to %s "
+ "(%zu bytes, %" PRIu32 " bytes compressed payload): %.200s",
+ msg->id, target, iov->iov_len, msg->compressed_size, data);
} else {
- crm_trace("Queueing CPG message %u to %s "
- "(%llu bytes, %d bytes payload): %.200s",
- msg->id, target, (unsigned long long) iov->iov_len,
- msg->size, data);
+ crm_trace("Queueing CPG message %" PRIu32 " to %s "
+ "(%zu bytes, %" PRIu32 " bytes payload): %.200s",
+ msg->id, target, iov->iov_len, msg->size, data);
}
free(target);
cs_message_queue = g_list_append(cs_message_queue, iov);
crm_cs_flush(&pcmk_cpg_handle);
return true;
}
/*!
 * \internal
 * \brief Send an XML message via Corosync CPG
 *
 * The XML is converted to text and handed to send_cpg_text().
 *
 * \param[in] msg   XML message to send
 * \param[in] node  Cluster node to send message to
 * \param[in] dest  Type of message to send
 *
 * \return \c true on success, otherwise \c false
 */
bool
pcmk__cpg_send_xml(const xmlNode *msg, const pcmk__node_status_t *node,
                   enum pcmk_ipc_server dest)
{
    GString *buffer = g_string_sized_new(1024);
    bool sent = true;

    pcmk__xml_string(msg, 0, buffer, 0);
    sent = send_cpg_text(buffer->str, node, dest);

    g_string_free(buffer, TRUE);
    return sent;
}
diff --git a/lib/common/ipc_server.c b/lib/common/ipc_server.c
index 1912fadd93..2a2d791053 100644
--- a/lib/common/ipc_server.c
+++ b/lib/common/ipc_server.c
@@ -1,1010 +1,1009 @@
/*
- * Copyright 2004-2024 the Pacemaker project contributors
+ * Copyright 2004-2025 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU Lesser General Public License
* version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#include <stdio.h>
#include <errno.h>
#include <bzlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <crm/crm.h>
#include <crm/common/xml.h>
#include <crm/common/ipc.h>
#include <crm/common/ipc_internal.h>
#include "crmcommon_private.h"
/* Evict clients whose event queue grows this large (by default) */
#define PCMK_IPC_DEFAULT_QUEUE_MAX 500
static GHashTable *client_connections = NULL;
/*!
 * \internal
 * \brief Count IPC clients
 *
 * \return Number of entries in the client table (0 if the table has not been
 *         created)
 */
guint
pcmk__ipc_client_count(void)
{
    if (client_connections == NULL) {
        return 0;
    }
    return g_hash_table_size(client_connections);
}
/*!
 * \internal
 * \brief Execute a function for each active IPC client connection
 *
 * Nothing is done if \p func is NULL or no client table exists.
 *
 * \param[in]     func       Function to call
 * \param[in,out] user_data  Pointer to pass to function
 *
 * \note The parameters are the same as for g_hash_table_foreach().
 */
void
pcmk__foreach_ipc_client(GHFunc func, gpointer user_data)
{
    if ((func == NULL) || (client_connections == NULL)) {
        return;
    }
    g_hash_table_foreach(client_connections, func, user_data);
}
/*!
 * \internal
 * \brief Find the client object for an IPC connection
 *
 * \param[in] c  IPC connection to look up in the client table
 *
 * \return Client stored under \p c, or NULL if there is none or no client
 *         table exists
 */
pcmk__client_t *
pcmk__find_client(const qb_ipcs_connection_t *c)
{
    if (client_connections == NULL) {
        crm_trace("No client found for %p", c);
        return NULL;
    }
    return g_hash_table_lookup(client_connections, c);
}
/*!
 * \internal
 * \brief Find a client object by its ID
 *
 * Every entry in the client table is checked in turn, since the table is not
 * keyed by client ID.
 *
 * \param[in] id  Client ID to search for
 *
 * \return First client whose ID equals \p id, or NULL if none is found
 */
pcmk__client_t *
pcmk__find_client_by_id(const char *id)
{
    if ((client_connections != NULL) && (id != NULL)) {
        GHashTableIter iter;
        gpointer key;
        pcmk__client_t *candidate = NULL;

        g_hash_table_iter_init(&iter, client_connections);
        while (g_hash_table_iter_next(&iter, &key, (gpointer *) &candidate)) {
            if (strcmp(candidate->id, id) != 0) {
                continue;
            }
            return candidate;
        }
    }

    crm_trace("No client found with id='%s'", pcmk__s(id, ""));
    return NULL;
}
/*!
 * \internal
 * \brief Get a client identifier for use in log messages
 *
 * \param[in] c  Client
 *
 * \return Client's name if set, otherwise client's ID if set, otherwise a
 *         string literal ("(unspecified)" if \p c itself is NULL)
 * \note This is intended to be used in format strings like "client %s".
 */
const char *
pcmk__client_name(const pcmk__client_t *c)
{
    if (c == NULL) {
        return "(unspecified)";
    }
    if (c->name != NULL) {
        return c->name;
    }
    if (c->id != NULL) {
        return c->id;
    }
    return "(unidentified)";
}
/*!
 * \internal
 * \brief Destroy the IPC client table
 *
 * A warning is logged if the table still has entries when it is destroyed.
 */
void
pcmk__client_cleanup(void)
{
    int active = 0;

    if (client_connections == NULL) {
        return;
    }

    active = g_hash_table_size(client_connections);
    if (active > 0) {
        crm_warn("Exiting with %d active IPC client%s",
                 active, pcmk__plural_s(active));
    }

    g_hash_table_destroy(client_connections);
    client_connections = NULL;
}
/*!
 * \internal
 * \brief Disconnect every client connected to an IPC service
 *
 * \param[in,out] service  IPC service whose connections should be dropped
 *                         (nothing is done if NULL)
 */
void
pcmk__drop_all_clients(qb_ipcs_service_t *service)
{
    qb_ipcs_connection_t *current = NULL;

    if (service == NULL) {
        return;
    }

    current = qb_ipcs_connection_first_get(service);
    while (current != NULL) {
        // Fetch the successor before the current connection is torn down
        qb_ipcs_connection_t *next = qb_ipcs_connection_next_get(service,
                                                                 current);

        /* There really shouldn't be anyone connected at this point */
        crm_notice("Disconnecting client %p, pid=%d...",
                   current, pcmk__client_pid(current));
        qb_ipcs_disconnect(current);
        qb_ipcs_connection_unref(current);

        current = next;
    }
}
/*!
 * \internal
 * \brief Allocate a new pcmk__client_t object based on an IPC connection
 *
 * The new client is given a generated ID and is added to the client table
 * (which is created if it does not exist yet).
 *
 * \param[in] c           IPC connection (NULL to allocate generic client)
 * \param[in] key         Connection table key (NULL to use \p c if given,
 *                        otherwise the new client's ID)
 * \param[in] uid_client  UID corresponding to c (ignored if c is NULL)
 *
 * \return Pointer to new pcmk__client_t (guaranteed not to be \c NULL)
 */
static pcmk__client_t *
client_from_connection(qb_ipcs_connection_t *c, void *key, uid_t uid_client)
{
    pcmk__client_t *new_client = pcmk__assert_alloc(1, sizeof(pcmk__client_t));

    if (c != NULL) {
        new_client->user = pcmk__uid2username(uid_client);
        if (new_client->user == NULL) {
            new_client->user = pcmk__str_copy("#unprivileged");
            crm_err("Unable to enforce ACLs for user ID %d, assuming unprivileged",
                    uid_client);
        }

        new_client->ipcs = c;
        pcmk__set_client_flags(new_client, pcmk__client_ipc);
        new_client->pid = pcmk__client_pid(c);

        // An IPC client is keyed by its connection unless told otherwise
        if (key == NULL) {
            key = c;
        }
    }

    new_client->id = crm_generate_uuid();

    // With neither a key nor a connection, fall back to the client's own ID
    if (key == NULL) {
        key = new_client->id;
    }

    if (client_connections == NULL) {
        crm_trace("Creating IPC client table");
        client_connections = g_hash_table_new(g_direct_hash, g_direct_equal);
    }
    g_hash_table_insert(client_connections, key, new_client);
    return new_client;
}
/*!
 * \brief Allocate a new pcmk__client_t object and generate its ID
 *
 * The client is created without an IPC connection.
 *
 * \param[in] key  What to use as connections hash table key (NULL to use ID)
 *
 * \return Pointer to new pcmk__client_t (asserts on failure)
 */
pcmk__client_t *
pcmk__new_unauth_client(void *key)
{
    pcmk__client_t *generic_client = client_from_connection(NULL, key, 0);

    return generic_client;
}
/*!
 * \internal
 * \brief Allocate a new client object for an IPC connection
 *
 * For connections from a non-root user, the cluster daemon group is given
 * access to the connection. Clients whose user ID is 0 or matches the cluster
 * daemon user are flagged as privileged.
 *
 * \param[in,out] c           IPC connection (must not be NULL)
 * \param[in]     uid_client  User ID of the connecting process
 * \param[in]     gid_client  Group ID of the connecting process (for logging)
 *
 * \return Newly allocated client, or NULL if \p c is NULL
 */
pcmk__client_t *
pcmk__new_client(qb_ipcs_connection_t *c, uid_t uid_client, gid_t gid_client)
{
    // Must be uid_t (not gid_t) to match what it holds and is compared with
    uid_t uid_cluster = 0;
    gid_t gid_cluster = 0;

    pcmk__client_t *client = NULL;

    CRM_CHECK(c != NULL, return NULL);

    if (pcmk_daemon_user(&uid_cluster, &gid_cluster) < 0) {
        // Warn only the first time the lookup fails
        static bool need_log = true;

        if (need_log) {
            crm_warn("Could not find user and group IDs for user %s",
                     CRM_DAEMON_USER);
            need_log = false;
        }
    }

    if (uid_client != 0) {
        crm_trace("Giving group %u access to new IPC connection", gid_cluster);
        /* Passing -1 to chown(2) means don't change */
        qb_ipcs_connection_auth_set(c, -1, gid_cluster,
                                    S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
    }

    /* TODO: Do our own auth checking, return NULL if unauthorized */
    client = client_from_connection(c, NULL, uid_client);

    if ((uid_client == 0) || (uid_client == uid_cluster)) {
        /* Remember when a connection came from root or hacluster */
        pcmk__set_client_flags(client, pcmk__client_privileged);
    }

    crm_debug("New IPC client %s for PID %u with uid %d and gid %d",
              client->id, client->pid, uid_client, gid_client);
    return client;
}
/*!
 * \internal
 * \brief Allocate a two-element I/O vector for an IPC event
 *
 * \return Newly allocated array of two struct iovec
 */
static struct iovec *
pcmk__new_ipc_event(void)
{
    struct iovec *event = pcmk__assert_alloc(2, sizeof(struct iovec));

    return event;
}
/*!
 * \brief Free an I/O vector created by pcmk__ipc_prepare_iov()
 *
 * Frees the buffers of both elements and then the vector itself. Does nothing
 * if \p event is NULL.
 *
 * \param[in,out] event  I/O vector to free
 */
void
pcmk_free_ipc_event(struct iovec *event)
{
    if (event == NULL) {
        return;
    }

    for (int i = 0; i < 2; i++) {
        free(event[i].iov_base);
    }
    free(event);
}
/*!
 * \internal
 * \brief Destroy-notify wrapper that frees one queued IPC event
 *
 * \param[in,out] data  Queued event (a struct iovec array)
 */
static void
free_event(gpointer data)
{
    struct iovec *event = data;

    pcmk_free_ipc_event(event);
}
/*!
 * \internal
 * \brief Append an event to a client's event queue, creating the queue if
 *        the client does not have one yet
 *
 * \param[in,out] c    Client whose queue should receive the event
 * \param[in]     iov  Event to queue
 */
static void
add_event(pcmk__client_t *c, struct iovec *iov)
{
    GQueue *queue = c->event_queue;

    if (queue == NULL) {
        queue = g_queue_new();
        c->event_queue = queue;
    }
    g_queue_push_tail(queue, iov);
}
/*!
 * \internal
 * \brief Remove a client from the client table and free it
 *
 * The client is removed from the table (keyed by its IPC connection if it has
 * one, otherwise by its ID). Its flush timer, queued events, strings, and any
 * remote connection data are then released.
 *
 * \param[in,out] c  Client to free (NULL is ignored)
 */
void
pcmk__free_client(pcmk__client_t *c)
{
    if (c == NULL) {
        return;
    }

    if (client_connections != NULL) {
        if (c->ipcs != NULL) {
            crm_trace("Destroying %p/%p (%d remaining)",
                      c, c->ipcs, g_hash_table_size(client_connections) - 1);
            g_hash_table_remove(client_connections, c->ipcs);

        } else {
            crm_trace("Destroying remote connection %p (%d remaining)",
                      c, g_hash_table_size(client_connections) - 1);
            g_hash_table_remove(client_connections, c->id);
        }
    }

    if (c->event_timer != 0) {
        g_source_remove(c->event_timer);
    }

    if (c->event_queue != NULL) {
        crm_debug("Destroying %d events", g_queue_get_length(c->event_queue));
        g_queue_free_full(c->event_queue, free_event);
    }

    free(c->id);
    free(c->name);
    free(c->user);

    if (c->remote != NULL) {
        if (c->remote->auth_timeout != 0) {
            g_source_remove(c->remote->auth_timeout);
        }

        if (c->remote->tls_session != NULL) {
            /* @TODO Reduce duplication at callers. Put here everything
             * necessary to tear down and free tls_session.
             */
            gnutls_deinit(c->remote->tls_session);
        }

        free(c->remote->buffer);
        free(c->remote);
    }

    free(c);
}
/*!
 * \internal
 * \brief Set the IPC eviction threshold for a client, if allowed
 *
 * Only clients flagged as privileged may change the threshold, and the new
 * value must parse as an integer in the range 1 to UINT_MAX. Failures are
 * logged and leave the threshold unchanged.
 *
 * \param[in,out] client  Client to modify
 * \param[in]     qmax    New threshold, as a string
 */
void
pcmk__set_client_queue_max(pcmk__client_t *client, const char *qmax)
{
    int rc = EACCES;    // Result if the client is not privileged
    unsigned int orig_value = 0U;

    CRM_CHECK(client != NULL, return);
    orig_value = client->queue_max;

    if (pcmk_is_set(client->flags, pcmk__client_privileged)) {
        long long requested = 0LL;

        rc = pcmk__scan_ll(qmax, &requested, 0LL);
        if (rc == pcmk_rc_ok) {
            if ((requested > 0LL) && (requested <= UINT_MAX)) {
                client->queue_max = (unsigned int) requested;
            } else {
                rc = ERANGE;
            }
        }
    }

    if (rc != pcmk_rc_ok) {
        crm_info("Could not set IPC threshold for client %s[%u] to %s: %s",
                 pcmk__client_name(client), client->pid,
                 pcmk__s(qmax, "default"), pcmk_rc_str(rc));
        return;
    }

    if (client->queue_max != orig_value) {
        crm_debug("IPC threshold for client %s[%u] is now %u (was %u)",
                  pcmk__client_name(client), client->pid,
                  client->queue_max, orig_value);
    }
}
/*!
 * \internal
 * \brief Get the process ID that libqb reports for an IPC connection
 *
 * \param[in] c  IPC connection to query
 *
 * \return Client PID from the connection statistics (the field is preset to
 *         0 before the statistics are requested)
 */
int
pcmk__client_pid(qb_ipcs_connection_t *c)
{
    struct qb_ipcs_connection_stats conn_stats;

    conn_stats.client_pid = 0;
    qb_ipcs_connection_stats_get(c, &conn_stats, 0);
    return conn_stats.client_pid;
}
/*!
 * \internal
 * \brief Parse the XML message contained in data read from a client
 *
 * The data starts with a pcmk__ipc_header_t followed by the message text,
 * which is decompressed first if the header gives a compressed size.
 *
 * \param[in,out] c      IPC client connection (flagged as proxied if the
 *                       header carries crm_ipc_proxied)
 * \param[in]     data   Data read from client connection
 * \param[out]    id     Where to store message ID from libqb header
 *                       (may be NULL)
 * \param[out]    flags  Where to store flags from header (may be NULL)
 *
 * \return Message XML on success, NULL otherwise
 */
xmlNode *
pcmk__client_data2xml(pcmk__client_t *c, void *data, uint32_t *id,
                      uint32_t *flags)
{
    pcmk__ipc_header_t *header = data;
    char *text = ((char *) data) + sizeof(pcmk__ipc_header_t);
    char *uncompressed = NULL;
    xmlNode *parsed = NULL;

    if (!pcmk__valid_ipc_header(header)) {
        return NULL;
    }

    if (id != NULL) {
        *id = ((struct qb_ipc_response_header *) data)->id;
    }

    if (flags != NULL) {
        *flags = header->flags;
    }

    if (pcmk_is_set(header->flags, crm_ipc_proxied)) {
        /* Mark this client as being the endpoint of a proxy connection.
         * Proxy connections responses are sent on the event channel, to avoid
         * blocking the controller serving as proxy.
         */
        pcmk__set_client_flags(c, pcmk__client_proxied);
    }

    if (header->size_compressed != 0) {
        int rc = 0;
        // One extra byte beyond the advertised uncompressed size
        unsigned int size_u = 1 + header->size_uncompressed;

        uncompressed = pcmk__assert_alloc(1, size_u);
        crm_trace("Decompressing message data %u bytes into %u bytes",
                  header->size_compressed, size_u);

        rc = BZ2_bzBuffToBuffDecompress(uncompressed, &size_u, text,
                                        header->size_compressed, 1, 0);
        rc = pcmk__bzlib2rc(rc);
        if (rc != pcmk_rc_ok) {
            crm_err("Decompression failed: %s " QB_XS " rc=%d",
                    pcmk_rc_str(rc), rc);
            free(uncompressed);
            return NULL;
        }

        // From here on, parse the decompressed copy
        text = uncompressed;
    }

    pcmk__assert(text[header->size_uncompressed - 1] == 0);

    parsed = pcmk__xml_parse(text);
    crm_log_xml_trace(parsed, "[IPC received]");

    free(uncompressed);
    return parsed;
}
static int crm_ipcs_flush_events(pcmk__client_t *c);

/*!
 * \internal
 * \brief Timer callback that flushes a client's event queue
 *
 * \param[in,out] data  Client (pcmk__client_t) whose timer fired
 *
 * \return FALSE always
 */
static gboolean
crm_ipcs_flush_events_cb(gpointer data)
{
    pcmk__client_t *client = data;

    // Clear the timer ID first; the flush returns early while one is set
    client->event_timer = 0;
    crm_ipcs_flush_events(client);
    return FALSE;
}
/*!
 * \internal
 * \brief Schedule the next event queue flush after a progressive delay
 *
 * The delay is 1000ms plus 100ms per queued event for queues shorter than
 * five events, and 1500ms otherwise.
 *
 * \param[in,out] c          Client connection to add delay to
 * \param[in]     queue_len  Current event queue length
 */
static inline void
delay_next_flush(pcmk__client_t *c, unsigned int queue_len)
{
    /* Delay a maximum of 1.5 seconds */
    guint delay_ms = 1500;

    if (queue_len < 5) {
        delay_ms = 1000 + 100 * queue_len;
    }
    c->event_timer = pcmk__create_timer(delay_ms, crm_ipcs_flush_events_cb, c);
}
/*!
* \internal
* \brief Send client any messages in its queue
*
* At most 100 queued events are sent per call. An event is removed from the
* queue only after it has been sent successfully. If events remain afterward,
* another flush is scheduled via delay_next_flush(), unless the client is
* evicted for a backlog that exceeds its threshold and is not shrinking.
*
* \param[in,out] c Client to flush
*
* \return Standard Pacemaker return value
*/
static int
crm_ipcs_flush_events(pcmk__client_t *c)
{
int rc = pcmk_rc_ok;
ssize_t qb_rc = 0;
unsigned int sent = 0;
unsigned int queue_len = 0;
if (c == NULL) {
return rc;
} else if (c->event_timer) {
/* There is already a timer, wait until it goes off */
crm_trace("Timer active for %p - %d", c->ipcs, c->event_timer);
return rc;
}
if (c->event_queue) {
queue_len = g_queue_get_length(c->event_queue);
}
while (sent < 100) {
pcmk__ipc_header_t *header = NULL;
struct iovec *event = NULL;
if (c->event_queue) {
// We don't pop unless send is successful
event = g_queue_peek_head(c->event_queue);
}
if (event == NULL) { // Queue is empty
break;
}
qb_rc = qb_ipcs_event_sendv(c->ipcs, event, 2);
if (qb_rc < 0) {
rc = (int) -qb_rc;
break;
}
event = g_queue_pop_head(c->event_queue);
sent++;
header = event[0].iov_base;
if (header->size_compressed) {
- crm_trace("Event %d to %p[%d] (%lld compressed bytes) sent",
- header->qb.id, c->ipcs, c->pid, (long long) qb_rc);
+ crm_trace("Event %" PRId32 " to %p[%u] (%zd compressed bytes) sent",
+ header->qb.id, c->ipcs, c->pid, qb_rc);
} else {
- crm_trace("Event %d to %p[%d] (%lld bytes) sent: %.120s",
- header->qb.id, c->ipcs, c->pid, (long long) qb_rc,
+ crm_trace("Event %" PRId32 " to %p[%u] (%zd bytes) sent: %.120s",
+ header->qb.id, c->ipcs, c->pid, qb_rc,
(char *) (event[1].iov_base));
}
pcmk_free_ipc_event(event);
}
queue_len -= sent;
if (sent > 0 || queue_len) {
- crm_trace("Sent %d events (%d remaining) for %p[%d]: %s (%lld)",
- sent, queue_len, c->ipcs, c->pid,
- pcmk_rc_str(rc), (long long) qb_rc);
+ crm_trace("Sent %u events (%u remaining) for %p[%u]: %s (%zd)",
+ sent, queue_len, c->ipcs, c->pid, pcmk_rc_str(rc), qb_rc);
}
if (queue_len) {
/* Allow clients to briefly fall behind on processing incoming messages,
* but drop completely unresponsive clients so the connection doesn't
* consume resources indefinitely.
*/
if (queue_len > QB_MAX(c->queue_max, PCMK_IPC_DEFAULT_QUEUE_MAX)) {
if ((c->queue_backlog <= 1) || (queue_len < c->queue_backlog)) {
/* Don't evict for a new or shrinking backlog */
crm_warn("Client with process ID %u has a backlog of %u messages "
QB_XS " %p", c->pid, queue_len, c->ipcs);
} else {
crm_err("Evicting client with process ID %u due to backlog of %u messages "
QB_XS " %p", c->pid, queue_len, c->ipcs);
c->queue_backlog = 0;
qb_ipcs_disconnect(c->ipcs);
return rc;
}
}
c->queue_backlog = queue_len;
delay_next_flush(c, queue_len);
} else {
/* Event queue is empty, there is no backlog */
c->queue_backlog = 0;
}
return rc;
}
/*!
 * \internal
 * \brief Build the two-element I/O vector for an IPC XML message
 *
 * Element 0 holds a pcmk__ipc_header_t and element 1 holds the message text.
 * The text is sent as-is when header plus text is smaller than the send
 * limit, and compressed otherwise.
 *
 * \param[in]  request        Identifier for libqb response header
 * \param[in]  message        XML message to send
 * \param[in]  max_send_size  If 0, default IPC buffer size is used
 * \param[out] result         Where to store prepared I/O vector
 * \param[out] bytes          Size of prepared data in bytes (may be NULL)
 *
 * \return Standard Pacemaker return code (EINVAL for a NULL \p message or
 *         \p result, ENOMEM if the header cannot be allocated, EMSGSIZE if
 *         the message cannot be compressed below the limit)
 */
int
pcmk__ipc_prepare_iov(uint32_t request, const xmlNode *message,
                      uint32_t max_send_size, struct iovec **result,
                      ssize_t *bytes)
{
    struct iovec *iov = NULL;
    GString *buffer = NULL;
    pcmk__ipc_header_t *header = NULL;
    unsigned int total = 0;

    if ((message == NULL) || (result == NULL)) {
        return EINVAL;
    }

    header = calloc(1, sizeof(pcmk__ipc_header_t));
    if (header == NULL) {
        return ENOMEM;
    }

    buffer = g_string_sized_new(1024);
    pcmk__xml_string(message, 0, buffer, 0);

    if (max_send_size == 0) {
        max_send_size = crm_ipc_default_buffer_size();
    }
    CRM_LOG_ASSERT(max_send_size != 0);

    *result = NULL;

    iov = pcmk__new_ipc_event();
    iov[0].iov_base = header;
    iov[0].iov_len = sizeof(pcmk__ipc_header_t);

    header->version = PCMK__IPC_VERSION;
    header->size_uncompressed = 1 + buffer->len;    // Text plus trailing NUL
    total = iov[0].iov_len + header->size_uncompressed;

    if (total >= max_send_size) {
        // Largest size seen so far, used for the suggested buffer size
        static unsigned int biggest = 0;

        char *compressed = NULL;
        unsigned int new_size = 0;
        int compress_rc = pcmk__compress(buffer->str,
                                         (unsigned int) header->size_uncompressed,
                                         (unsigned int) max_send_size,
                                         &compressed, &new_size);

        if (compress_rc != pcmk_rc_ok) {
            crm_log_xml_trace(message, "EMSGSIZE");
            biggest = QB_MAX(header->size_uncompressed, biggest);

            crm_err("Could not compress %u-byte message into less than IPC "
                    "limit of %u bytes; set PCMK_ipc_buffer to higher value "
                    "(%u bytes suggested)",
                    header->size_uncompressed, max_send_size, 4 * biggest);

            free(compressed);
            pcmk_free_ipc_event(iov);   // Also frees the header in iov[0]
            g_string_free(buffer, TRUE);
            return EMSGSIZE;
        }

        pcmk__set_ipc_flags(header->flags, "send data", crm_ipc_compressed);
        header->size_compressed = new_size;

        iov[1].iov_len = header->size_compressed;
        iov[1].iov_base = compressed;

        biggest = QB_MAX(header->size_compressed, biggest);

    } else {
        iov[1].iov_base = pcmk__str_copy(buffer->str);
        iov[1].iov_len = header->size_uncompressed;
    }

    header->qb.size = iov[0].iov_len + iov[1].iov_len;
    header->qb.id = (int32_t) request;  /* Replying to a specific request */

    *result = iov;
    pcmk__assert(header->qb.size > 0);

    if (bytes != NULL) {
        *bytes = header->qb.size;
    }

    g_string_free(buffer, TRUE);
    return pcmk_rc_ok;
}
/*!
* \internal
* \brief Send a prepared I/O vector to a client as an event or a response
*
* With crm_ipc_server_event in \p flags (forced for proxied clients), the
* vector is queued as an event: the original if crm_ipc_server_free is set,
* otherwise a copy. Without it, the vector is sent as a direct response and
* freed afterward if crm_ipc_server_free is set. The client's event queue is
* flushed in both cases.
*
* \param[in,out] c Client to send to
* \param[in,out] iov Two-element I/O vector (header, then payload)
* \param[in] flags IPC flags to use when sending
*
* \return Standard Pacemaker return code
*/
int
pcmk__ipc_send_iov(pcmk__client_t *c, struct iovec *iov, uint32_t flags)
{
int rc = pcmk_rc_ok;
static uint32_t id = 1;
pcmk__ipc_header_t *header = iov[0].iov_base;
if (c->flags & pcmk__client_proxied) {
/* _ALL_ replies to proxied connections need to be sent as events */
if (!pcmk_is_set(flags, crm_ipc_server_event)) {
/* The proxied flag lets us know this was originally meant to be a
* response, even though we're sending it over the event channel.
*/
pcmk__set_ipc_flags(flags, "server event",
crm_ipc_server_event
|crm_ipc_proxied_relay_response);
}
}
pcmk__set_ipc_flags(header->flags, "server event", flags);
if (flags & crm_ipc_server_event) {
header->qb.id = id++; /* We don't really use it, but doesn't hurt to set one */
if (flags & crm_ipc_server_free) {
crm_trace("Sending the original to %p[%u]", c->ipcs, c->pid);
add_event(c, iov);
} else {
struct iovec *iov_copy = pcmk__new_ipc_event();
crm_trace("Sending a copy to %p[%u]", c->ipcs, c->pid);
/* Use the asserting allocator so a failed allocation is never passed
* to memcpy()
*/
iov_copy[0].iov_len = iov[0].iov_len;
iov_copy[0].iov_base = pcmk__assert_alloc(1, iov[0].iov_len);
memcpy(iov_copy[0].iov_base, iov[0].iov_base, iov[0].iov_len);
iov_copy[1].iov_len = iov[1].iov_len;
iov_copy[1].iov_base = pcmk__assert_alloc(1, iov[1].iov_len);
memcpy(iov_copy[1].iov_base, iov[1].iov_base, iov[1].iov_len);
add_event(c, iov_copy);
}
} else {
ssize_t qb_rc;
CRM_LOG_ASSERT(header->qb.id != 0); /* Replying to a specific request */
qb_rc = qb_ipcs_response_sendv(c->ipcs, iov, 2);
if (qb_rc < header->qb.size) {
if (qb_rc < 0) {
rc = (int) -qb_rc;
}
- crm_notice("Response %d to pid %d failed: %s "
- QB_XS " bytes=%u rc=%lld ipcs=%p",
+ crm_notice("Response %" PRId32 " to pid %u failed: %s "
+ QB_XS " bytes=%" PRId32 " rc=%zd ipcs=%p",
header->qb.id, c->pid, pcmk_rc_str(rc),
- header->qb.size, (long long) qb_rc, c->ipcs);
+ header->qb.size, qb_rc, c->ipcs);
} else {
- crm_trace("Response %d sent, %lld bytes to %p[%d]",
- header->qb.id, (long long) qb_rc, c->ipcs, c->pid);
+ crm_trace("Response %" PRId32 " sent, %zd bytes to %p[%u]",
+ header->qb.id, qb_rc, c->ipcs, c->pid);
}
if (flags & crm_ipc_server_free) {
pcmk_free_ipc_event(iov);
}
}
/* Only an event send reports the flush result; after a direct response the
* flush result is ignored
*/
if (flags & crm_ipc_server_event) {
rc = crm_ipcs_flush_events(c);
} else {
crm_ipcs_flush_events(c);
}
if ((rc == EPIPE) || (rc == ENOTCONN)) {
crm_trace("Client %p disconnected", c->ipcs);
}
return rc;
}
/*!
 * \internal
 * \brief Send an XML message to an IPC client
 *
 * The message is converted to an I/O vector using the default IPC buffer
 * size, then sent with crm_ipc_server_free added to \p flags.
 *
 * \param[in,out] c        Client to send to
 * \param[in]     request  Identifier for libqb response header
 * \param[in]     message  XML message to send
 * \param[in]     flags    IPC flags to use when sending
 *
 * \return Standard Pacemaker return code (EINVAL if \p c is NULL)
 */
int
pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message,
                   uint32_t flags)
{
    struct iovec *iov = NULL;
    int rc = pcmk_rc_ok;

    if (c == NULL) {
        return EINVAL;
    }

    rc = pcmk__ipc_prepare_iov(request, message, crm_ipc_default_buffer_size(),
                               &iov, NULL);
    if (rc != pcmk_rc_ok) {
        pcmk_free_ipc_event(iov);
        crm_notice("IPC message to pid %d failed: %s " QB_XS " rc=%d",
                   c->pid, pcmk_rc_str(rc), rc);
        return rc;
    }

    pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free);
    return pcmk__ipc_send_iov(c, iov, flags);
}
/*!
 * \internal
 * \brief Create an acknowledgement with a status code to send to a client
 *
 * An ack is created only when \p flags contains crm_ipc_client_response.
 *
 * \param[in] function  Calling function
 * \param[in] line      Source file line within calling function
 * \param[in] flags     IPC flags to use when sending
 * \param[in] tag       Element name to use for acknowledgement
 * \param[in] ver       IPC protocol version (can be NULL)
 * \param[in] status    Exit status code to add to ack
 *
 * \return Newly created XML for ack, or NULL if no ack was requested
 *
 * \note The caller is responsible for freeing the return value with
 *       \c pcmk__xml_free().
 */
xmlNode *
pcmk__ipc_create_ack_as(const char *function, int line, uint32_t flags,
                        const char *tag, const char *ver, crm_exit_t status)
{
    xmlNode *ack = NULL;

    if (!pcmk_is_set(flags, crm_ipc_client_response)) {
        return NULL;
    }

    ack = pcmk__xe_create(NULL, tag);
    crm_xml_add(ack, PCMK_XA_FUNCTION, function);
    crm_xml_add_int(ack, PCMK__XA_LINE, line);
    crm_xml_add_int(ack, PCMK_XA_STATUS, (int) status);
    crm_xml_add(ack, PCMK__XA_IPC_PROTO_VERSION, ver);
    return ack;
}
/*!
 * \internal
 * \brief Send an acknowledgement with a status code to a client
 *
 * Nothing is sent when pcmk__ipc_create_ack_as() produces no ack for
 * \p flags.
 *
 * \param[in]     function  Calling function
 * \param[in]     line      Source file line within calling function
 * \param[in,out] c         Client to send ack to
 * \param[in]     request   Request ID being replied to
 * \param[in]     flags     IPC flags to use when sending
 * \param[in]     tag       Element name to use for acknowledgement
 * \param[in]     ver       IPC protocol version (can be NULL)
 * \param[in]     status    Status code to send with acknowledgement
 *
 * \return Standard Pacemaker return code
 */
int
pcmk__ipc_send_ack_as(const char *function, int line, pcmk__client_t *c,
                      uint32_t request, uint32_t flags, const char *tag,
                      const char *ver, crm_exit_t status)
{
    int rc = pcmk_rc_ok;
    xmlNode *ack = pcmk__ipc_create_ack_as(function, line, flags, tag, ver,
                                           status);

    if (ack == NULL) {
        return rc;
    }

    crm_trace("Ack'ing IPC message from client %s as <%s status=%d>",
              pcmk__client_name(c), tag, status);
    crm_log_xml_trace(ack, "sent-ack");

    c->request_id = 0;
    rc = pcmk__ipc_send_xml(c, request, ack, flags);

    pcmk__xml_free(ack);
    return rc;
}
/*!
 * \internal
 * \brief Add the CIB manager's three IPC servers to the main loop
 *
 * \param[out] ipcs_ro   New IPC server for read-only CIB manager API
 * \param[out] ipcs_rw   New IPC server for read/write CIB manager API
 * \param[out] ipcs_shm  New IPC server for shared-memory CIB manager API
 * \param[in]  ro_cb     IPC callbacks for read-only API
 * \param[in]  rw_cb     IPC callbacks for read/write and shared-memory APIs
 *
 * \note This function exits fatally if unable to create the servers.
 * \note The read-only and read/write servers are created with QB_IPC_NATIVE,
 *       and the shared-memory server with QB_IPC_SHM.
 */
void
pcmk__serve_based_ipc(qb_ipcs_service_t **ipcs_ro,
                      qb_ipcs_service_t **ipcs_rw,
                      qb_ipcs_service_t **ipcs_shm,
                      struct qb_ipcs_service_handlers *ro_cb,
                      struct qb_ipcs_service_handlers *rw_cb)
{
    *ipcs_ro = mainloop_add_ipc_server(PCMK__SERVER_BASED_RO,
                                       QB_IPC_NATIVE, ro_cb);

    *ipcs_rw = mainloop_add_ipc_server(PCMK__SERVER_BASED_RW,
                                       QB_IPC_NATIVE, rw_cb);

    *ipcs_shm = mainloop_add_ipc_server(PCMK__SERVER_BASED_SHM,
                                        QB_IPC_SHM, rw_cb);

    if ((*ipcs_ro != NULL) && (*ipcs_rw != NULL) && (*ipcs_shm != NULL)) {
        return;
    }

    crm_err("Failed to create the CIB manager: exiting and inhibiting respawn");
    crm_warn("Verify pacemaker and pacemaker_remote are not both enabled");
    crm_exit(CRM_EX_FATAL);
}
/*!
 * \internal
 * \brief Destroy the CIB manager's three IPC servers
 *
 * \param[in,out] ipcs_ro   IPC server for the read-only CIB manager API
 * \param[in,out] ipcs_rw   IPC server for the read/write CIB manager API
 * \param[in,out] ipcs_shm  IPC server for the shared-memory CIB manager API
 *
 * \note This is a convenience function for calling qb_ipcs_destroy() for each
 *       argument.
 */
void
pcmk__stop_based_ipc(qb_ipcs_service_t *ipcs_ro,
                     qb_ipcs_service_t *ipcs_rw,
                     qb_ipcs_service_t *ipcs_shm)
{
    qb_ipcs_service_t *servers[] = { ipcs_ro, ipcs_rw, ipcs_shm };

    for (int i = 0; i < 3; i++) {
        qb_ipcs_destroy(servers[i]);
    }
}
/*!
 * \internal
 * \brief Add the controller API's IPC server to the main loop
 *
 * \param[in] cb  IPC callbacks
 *
 * \return Result of mainloop_add_ipc_server() for the controller's server name
 */
qb_ipcs_service_t *
pcmk__serve_controld_ipc(struct qb_ipcs_service_handlers *cb)
{
    qb_ipcs_service_t *server = mainloop_add_ipc_server(CRM_SYSTEM_CRMD,
                                                        QB_IPC_NATIVE, cb);

    return server;
}
/*!
 * \internal
 * \brief Add the attribute manager API's IPC server to the main loop
 *
 * \param[out] ipcs  Where to store newly created IPC server
 * \param[in]  cb    IPC callbacks
 *
 * \note This function exits fatally if unable to create the server.
 */
void
pcmk__serve_attrd_ipc(qb_ipcs_service_t **ipcs,
                      struct qb_ipcs_service_handlers *cb)
{
    *ipcs = mainloop_add_ipc_server(PCMK__VALUE_ATTRD, QB_IPC_NATIVE, cb);
    if (*ipcs != NULL) {
        return;
    }

    crm_crit("Exiting fatally because unable to serve " PCMK__SERVER_ATTRD
             " IPC (verify pacemaker and pacemaker_remote are not both "
             "enabled)");
    crm_exit(CRM_EX_FATAL);
}
/*!
 * \internal
 * \brief Add the fencer API's IPC server to the main loop
 *
 * The server is added with QB_LOOP_HIGH priority.
 *
 * \param[out] ipcs  Where to store newly created IPC server
 * \param[in]  cb    IPC callbacks
 *
 * \note This function exits fatally if unable to create the server.
 */
void
pcmk__serve_fenced_ipc(qb_ipcs_service_t **ipcs,
                       struct qb_ipcs_service_handlers *cb)
{
    *ipcs = mainloop_add_ipc_server_with_prio("stonith-ng", QB_IPC_NATIVE, cb,
                                              QB_LOOP_HIGH);
    if (*ipcs != NULL) {
        return;
    }

    crm_err("Failed to create fencer: exiting and inhibiting respawn.");
    crm_warn("Verify pacemaker and pacemaker_remote are not both enabled.");
    crm_exit(CRM_EX_FATAL);
}
/*!
 * \internal
 * \brief Add the pacemakerd API's IPC server to the main loop
 *
 * \param[out] ipcs  Where to store newly created IPC server
 * \param[in]  cb    IPC callbacks
 *
 * \note This function exits with CRM_EX_OSERR if unable to create the server.
 */
void
pcmk__serve_pacemakerd_ipc(qb_ipcs_service_t **ipcs,
                           struct qb_ipcs_service_handlers *cb)
{
    *ipcs = mainloop_add_ipc_server(CRM_SYSTEM_MCP, QB_IPC_NATIVE, cb);
    if (*ipcs != NULL) {
        return;
    }

    crm_err("Couldn't start pacemakerd IPC server");
    crm_warn("Verify pacemaker and pacemaker_remote are not both enabled.");

    /* sub-daemons are observed by pacemakerd. Thus we exit CRM_EX_FATAL
     * if we want to prevent pacemakerd from restarting them.
     * With pacemakerd we leave the exit-code shown to e.g. systemd
     * to what it was prior to moving the code here from pacemakerd.c
     */
    crm_exit(CRM_EX_OSERR);
}
/*!
 * \internal
 * \brief Add the scheduler API's IPC server to the main loop
 *
 * \param[in] cb  IPC callbacks
 *
 * \return Result of mainloop_add_ipc_server() for the scheduler's server name
 *         (this function does not check it)
 */
qb_ipcs_service_t *
pcmk__serve_schedulerd_ipc(struct qb_ipcs_service_handlers *cb)
{
    qb_ipcs_service_t *server = mainloop_add_ipc_server(CRM_SYSTEM_PENGINE,
                                                        QB_IPC_NATIVE, cb);

    return server;
}
diff --git a/lib/common/remote.c b/lib/common/remote.c
index fd0894f885..bb29ad938c 100644
--- a/lib/common/remote.c
+++ b/lib/common/remote.c
@@ -1,1025 +1,1023 @@
/*
* Copyright 2008-2025 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU Lesser General Public License
* version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#include <crm/crm.h>
#include <sys/param.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <stdlib.h>
#include <errno.h>
#include <inttypes.h> // PRIx32
#include <glib.h>
#include <bzlib.h>
#include <crm/common/ipc_internal.h>
#include <crm/common/xml.h>
#include <crm/common/mainloop.h>
#include <crm/common/remote_internal.h>
#include <crm/common/tls_internal.h>
#include <gnutls/gnutls.h>
/* Swab macros from linux/swab.h
*
* Use the system header when the build defines HAVE_LINUX_SWAB_H; otherwise
* define byte-swapping macros for 16-, 32- and 64-bit values locally.
*/
#ifdef HAVE_LINUX_SWAB_H
# include <linux/swab.h>
#else
/*
* Casts are necessary for constants, because we never know for sure how
* U/UL/ULL map to __u16, __u32, __u64 (at least not in a portable way).
*/
#define __swab16(x) ((uint16_t)( \
(((uint16_t)(x) & (uint16_t)0x00ffU) << 8) | \
(((uint16_t)(x) & (uint16_t)0xff00U) >> 8)))
#define __swab32(x) ((uint32_t)( \
(((uint32_t)(x) & (uint32_t)0x000000ffUL) << 24) | \
(((uint32_t)(x) & (uint32_t)0x0000ff00UL) << 8) | \
(((uint32_t)(x) & (uint32_t)0x00ff0000UL) >> 8) | \
(((uint32_t)(x) & (uint32_t)0xff000000UL) >> 24)))
#define __swab64(x) ((uint64_t)( \
(((uint64_t)(x) & (uint64_t)0x00000000000000ffULL) << 56) | \
(((uint64_t)(x) & (uint64_t)0x000000000000ff00ULL) << 40) | \
(((uint64_t)(x) & (uint64_t)0x0000000000ff0000ULL) << 24) | \
(((uint64_t)(x) & (uint64_t)0x00000000ff000000ULL) << 8) | \
(((uint64_t)(x) & (uint64_t)0x000000ff00000000ULL) >> 8) | \
(((uint64_t)(x) & (uint64_t)0x0000ff0000000000ULL) >> 24) | \
(((uint64_t)(x) & (uint64_t)0x00ff000000000000ULL) >> 40) | \
(((uint64_t)(x) & (uint64_t)0xff00000000000000ULL) >> 56)))
#endif
// Version number that pcmk__remote_send_xml() writes into each header
#define REMOTE_MSG_VERSION 1
// Marker written into the header's endian field; a receiver that reads a
// different value byte-swaps it to check for an opposite-endian sender
#define ENDIAN_LOCAL 0xBADADBBD
/* Header placed in front of the payload of every remote message. All fields
* are byte-swapped by localized_remote_header() when the sender's endianness
* differs from the local host's.
*/
struct remote_header_v0 {
uint32_t endian; /* Detect messages from hosts with different endian-ness */
uint32_t version; // Set to REMOTE_MSG_VERSION by the sender
uint64_t id; // Sender increments this for each message it sends
uint64_t flags;
uint32_t size_total; // Header length plus payload length
uint32_t payload_offset; // Where the payload starts (sender uses header length)
uint32_t payload_compressed; // Nonzero if payload is compressed (its compressed size)
uint32_t payload_uncompressed; // Uncompressed payload length (sender counts the trailing NUL)
/* New fields get added here */
} __attribute__ ((packed));
/*!
 * \internal
 * \brief Get a remote message's header, converted to local endianness
 *
 * The header is read from the start of the connection's message buffer. If
 * its endian marker is the byte-swapped form of the local marker, every
 * header field is byte-swapped in place.
 *
 * \param[in,out] remote  Remote connection with new message
 *
 * \return Pointer to message header, localized if necessary, or NULL if the
 *         buffer does not yet hold a full header or the endian marker is
 *         invalid
 */
static struct remote_header_v0 *
localized_remote_header(pcmk__remote_t *remote)
{
    struct remote_header_v0 *header = (struct remote_header_v0 *) remote->buffer;
    uint32_t endian = 0;

    if (remote->buffer_offset < sizeof(struct remote_header_v0)) {
        return NULL;
    }

    if (header->endian == ENDIAN_LOCAL) {
        // Same byte order as the local host, so nothing to convert
        return header;
    }

    endian = __swab32(header->endian);
    CRM_LOG_ASSERT(endian == ENDIAN_LOCAL);
    if (endian != ENDIAN_LOCAL) {
        crm_err("Invalid message detected, endian mismatch: %" PRIx32
                " is neither %" PRIx32 " nor the swab'd %" PRIx32,
                ENDIAN_LOCAL, header->endian, endian);
        return NULL;
    }

    // 64-bit fields
    header->id = __swab64(header->id);
    header->flags = __swab64(header->flags);

    // 32-bit fields
    header->endian = __swab32(header->endian);
    header->version = __swab32(header->version);
    header->size_total = __swab32(header->size_total);
    header->payload_offset = __swab32(header->payload_offset);
    header->payload_compressed = __swab32(header->payload_compressed);
    header->payload_uncompressed = __swab32(header->payload_uncompressed);

    return header;
}
/*!
* \internal
* \brief Send one I/O vector element over a TLS session
*
* Calls gnutls_record_send() repeatedly until the whole buffer has been sent,
* retrying on GNUTLS_E_INTERRUPTED or GNUTLS_E_AGAIN and continuing after
* partial sends.
*
* \param[in] session TLS session to send over
* \param[in] iov Buffer and length to send
*
* \return Standard Pacemaker return code (EINVAL if the buffer is NULL,
* ECONNABORTED on any other GnuTLS error)
*/
static int
send_tls(gnutls_session_t session, struct iovec *iov)
{
const char *unsent = iov->iov_base;
size_t unsent_len = iov->iov_len;
ssize_t gnutls_rc;
if (unsent == NULL) {
return EINVAL;
}
- crm_trace("Sending TLS message of %llu bytes",
- (unsigned long long) unsent_len);
+ crm_trace("Sending TLS message of %zu bytes", unsent_len);
+
while (true) {
gnutls_rc = gnutls_record_send(session, unsent, unsent_len);
if (gnutls_rc == GNUTLS_E_INTERRUPTED || gnutls_rc == GNUTLS_E_AGAIN) {
- crm_trace("Retrying to send %llu bytes remaining",
- (unsigned long long) unsent_len);
+ crm_trace("Retrying to send %zu bytes remaining", unsent_len);
} else if (gnutls_rc < 0) {
// Caller can log as error if necessary
- crm_info("TLS connection terminated: %s " QB_XS " rc=%lld",
- gnutls_strerror((int) gnutls_rc),
- (long long) gnutls_rc);
+ crm_info("TLS connection terminated: %s " QB_XS " rc=%zd",
+ gnutls_strerror((int) gnutls_rc), gnutls_rc);
return ECONNABORTED;
} else if (gnutls_rc < unsent_len) {
// Partial send: advance past what was sent and try again
- crm_trace("Sent %lld of %llu bytes remaining",
- (long long) gnutls_rc, (unsigned long long) unsent_len);
+ crm_trace("Sent %zd of %zu bytes remaining", gnutls_rc, unsent_len);
unsent_len -= gnutls_rc;
unsent += gnutls_rc;
} else {
- crm_trace("Sent all %lld bytes remaining", (long long) gnutls_rc);
+ crm_trace("Sent all %zd bytes remaining", gnutls_rc);
break;
}
}
return pcmk_rc_ok;
}
/*!
* \internal
* \brief Send one I/O vector element over a plain socket
*
* Calls write() repeatedly until the whole buffer has been written, retrying
* when the call is interrupted or would block, and continuing after partial
* writes.
*
* \param[in] sock Socket file descriptor to write to
* \param[in] iov Buffer and length to send
*
* \return Standard Pacemaker return code (EINVAL if the buffer is NULL, or the
* errno of a failed write() that is not retried)
*/
static int
send_plaintext(int sock, struct iovec *iov)
{
const char *unsent = iov->iov_base;
size_t unsent_len = iov->iov_len;
- ssize_t write_rc;
if (unsent == NULL) {
return EINVAL;
}
- crm_debug("Sending plaintext message of %llu bytes to socket %d",
- (unsigned long long) unsent_len, sock);
+ crm_debug("Sending plaintext message of %zu bytes to socket %d",
+ unsent_len, sock);
while (true) {
- write_rc = write(sock, unsent, unsent_len);
+ ssize_t write_rc = write(sock, unsent, unsent_len);
+
if (write_rc < 0) {
// Save errno right away, before any later call can change it
int rc = errno;
- if ((errno == EINTR) || (errno == EAGAIN)) {
- crm_trace("Retrying to send %llu bytes remaining to socket %d",
- (unsigned long long) unsent_len, sock);
+ if ((rc == EINTR) || (rc == EAGAIN) || (rc == EWOULDBLOCK)) {
+ crm_trace("Retrying to send %zu bytes remaining to socket %d",
+ unsent_len, sock);
continue;
}
// Caller can log as error if necessary
crm_info("Could not send message: %s " QB_XS " rc=%d socket=%d",
pcmk_rc_str(rc), rc, sock);
return rc;
} else if (write_rc < unsent_len) {
// Partial write: advance past what was written and try again
- crm_trace("Sent %lld of %llu bytes remaining",
- (long long) write_rc, (unsigned long long) unsent_len);
+ crm_trace("Sent %zd of %zu bytes remaining", write_rc, unsent_len);
unsent += write_rc;
unsent_len -= write_rc;
- continue;
} else {
- crm_trace("Sent all %lld bytes remaining: %.100s",
- (long long) write_rc, (char *) (iov->iov_base));
- break;
+ crm_trace("Sent all %zd bytes remaining: %.100s",
+ write_rc, (char *) (iov->iov_base));
+ return pcmk_rc_ok;
}
}
- return pcmk_rc_ok;
}
/*!
 * \internal
 * \brief Send an array of I/O vector elements over a remote connection
 *
 * Each element is sent over TLS if the connection has a TLS session,
 * otherwise over the plain TCP socket. Sending stops at the first failure.
 *
 * \param[in,out] remote  Remote connection to send over
 * \param[in]     iov     Array of elements to send
 * \param[in]     iovs    Number of elements in \p iov
 *
 * \return Standard Pacemaker return code (ESOCKTNOSUPPORT if the connection
 *         has neither a TLS session nor a TCP socket)
 */
static int
remote_send_iovs(pcmk__remote_t *remote, struct iovec *iov, int iovs)
{
    for (int i = 0; i < iovs; i++) {
        int rc = pcmk_rc_ok;

        if (remote->tls_session) {
            rc = send_tls(remote->tls_session, &(iov[i]));

        } else if (remote->tcp_socket >= 0) {
            rc = send_plaintext(remote->tcp_socket, &(iov[i]));

        } else {
            rc = ESOCKTNOSUPPORT;
        }

        if (rc != pcmk_rc_ok) {
            return rc;
        }
    }
    return pcmk_rc_ok;
}
/*!
 * \internal
 * \brief Send an XML message over a Pacemaker Remote connection
 *
 * The message is sent as a remote_header_v0 followed by the XML text,
 * including its terminating NUL.
 *
 * \param[in,out] remote  Pacemaker Remote connection to use
 * \param[in]     msg     XML to send
 *
 * \return Standard Pacemaker return code
 */
int
pcmk__remote_send_xml(pcmk__remote_t *remote, const xmlNode *msg)
{
    static uint64_t id = 0;     // Counter for outgoing message IDs

    int rc = pcmk_rc_ok;
    GString *xml_text = NULL;
    struct iovec iov[2];
    struct remote_header_v0 *header = NULL;

    CRM_CHECK((remote != NULL) && (msg != NULL), return EINVAL);

    xml_text = g_string_sized_new(1024);
    pcmk__xml_string(msg, 0, xml_text, 0);
    CRM_CHECK(xml_text->len > 0,
              g_string_free(xml_text, TRUE); return EINVAL);

    header = pcmk__assert_alloc(1, sizeof(struct remote_header_v0));

    iov[0].iov_base = header;
    iov[0].iov_len = sizeof(struct remote_header_v0);

    // Record the length before the GString wrapper is released
    iov[1].iov_len = 1 + xml_text->len;
    iov[1].iov_base = g_string_free(xml_text, FALSE);

    header->id = ++id;
    header->endian = ENDIAN_LOCAL;
    header->version = REMOTE_MSG_VERSION;
    header->payload_offset = iov[0].iov_len;
    header->payload_uncompressed = iov[1].iov_len;
    header->size_total = iov[0].iov_len + iov[1].iov_len;

    rc = remote_send_iovs(remote, iov, 2);
    if (rc != pcmk_rc_ok) {
        crm_err("Could not send remote message: %s " QB_XS " rc=%d",
                pcmk_rc_str(rc), rc);
    }

    free(iov[0].iov_base);
    g_free((gchar *) iov[1].iov_base);
    return rc;
}
/*!
 * \internal
 * \brief Obtain the XML from the currently buffered remote connection message
 *
 * If the header gives a compressed payload size, the payload is decompressed
 * into a new buffer (with the header copied in front of it) that replaces the
 * connection's buffer.
 *
 * \param[in,out] remote  Remote connection possibly with message available
 *
 * \return Newly allocated XML object corresponding to message data, or NULL
 * \note This effectively removes the message from the connection buffer.
 */
xmlNode *
pcmk__remote_message_xml(pcmk__remote_t *remote)
{
    xmlNode *xml = NULL;
    struct remote_header_v0 *header = localized_remote_header(remote);

    if (header == NULL) {
        return NULL;
    }

    /* Support compression on the receiving end now, in case we ever want to add it later */
    if (header->payload_compressed) {
        int rc = 0;
        unsigned int size_u = 1 + header->payload_uncompressed;
        char *uncompressed =
            pcmk__assert_alloc(1, header->payload_offset + size_u);

        crm_trace("Decompressing message data %d bytes into %d bytes",
                  header->payload_compressed, size_u);

        rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
                                        remote->buffer + header->payload_offset,
                                        header->payload_compressed, 1, 0);
        rc = pcmk__bzlib2rc(rc);

        if (rc != pcmk_rc_ok && header->version > REMOTE_MSG_VERSION) {
            crm_warn("Couldn't decompress v%d message, we only understand v%d",
                     header->version, REMOTE_MSG_VERSION);
            free(uncompressed);
            return NULL;

        } else if (rc != pcmk_rc_ok) {
            crm_err("Decompression failed: %s " QB_XS " rc=%d",
                    pcmk_rc_str(rc), rc);
            free(uncompressed);
            return NULL;
        }

        pcmk__assert(size_u == header->payload_uncompressed);

        memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */
        remote->buffer_size = header->payload_offset + size_u;

        free(remote->buffer);
        remote->buffer = uncompressed;

        // The old buffer is gone, so re-read the header from the new one
        header = localized_remote_header(remote);
    }

    /* take ownership of the buffer */
    remote->buffer_offset = 0;

    /* The payload is parsed from payload_offset, so check for its terminating
     * NUL relative to that offset rather than to the v0 header size (the two
     * are equal for headers built by pcmk__remote_send_xml())
     */
    CRM_LOG_ASSERT(remote->buffer[header->payload_offset + header->payload_uncompressed - 1] == 0);

    xml = pcmk__xml_parse(remote->buffer + header->payload_offset);
    if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
        crm_warn("Couldn't parse v%d message, we only understand v%d",
                 header->version, REMOTE_MSG_VERSION);

    } else if (xml == NULL) {
        crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
    }

    crm_log_xml_trace(xml, "[remote msg]");
    return xml;
}
/*!
 * \internal
 * \brief Get the socket file descriptor underlying a remote connection
 *
 * \param[in] remote  Remote connection to check
 *
 * \return The TLS client socket if the connection has a TLS session,
 *         otherwise the TCP socket if it is nonnegative, otherwise -1
 */
static int
get_remote_socket(const pcmk__remote_t *remote)
{
    int sock = -1;

    if (remote->tls_session != NULL) {
        sock = pcmk__tls_get_client_sock(remote);

    } else if (remote->tcp_socket >= 0) {
        sock = remote->tcp_socket;

    } else {
        crm_err("Remote connection type undetermined (bug?)");
    }
    return sock;
}
/*!
 * \internal
 * \brief Wait for a remote session to have data to read
 *
 * Polls the connection's socket for input. If the poll is interrupted and a
 * positive timeout is in effect, the remaining timeout is recalculated (with
 * a minimum of one second) before polling again.
 *
 * \param[in] remote      Connection to check
 * \param[in] timeout_ms  Maximum time (in ms) to wait
 *
 * \return Standard Pacemaker return code (of particular interest, pcmk_rc_ok if
 *         there is data ready to be read, and ETIME if there is no data within
 *         the specified timeout)
 */
int
pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
{
    struct pollfd fds = { 0, };
    int remaining_ms = timeout_ms;
    int rc = 0;
    time_t start;
    int sock = get_remote_socket(remote);

    if (sock < 0) {
        crm_trace("No longer connected");
        return ENOTCONN;
    }

    start = time(NULL);
    errno = 0;

    fds.fd = sock;
    fds.events = POLLIN;
    rc = poll(&fds, 1, remaining_ms);

    while ((rc < 0) && (errno == EINTR)) {
        /* If we got an EINTR while polling, and we have a
         * specific timeout we are trying to honor, attempt
         * to adjust the timeout to the closest second. */
        if (remaining_ms > 0) {
            remaining_ms = timeout_ms - ((time(NULL) - start) * 1000);
            if (remaining_ms < 1000) {
                remaining_ms = 1000;
            }
        }

        fds.fd = sock;
        fds.events = POLLIN;
        rc = poll(&fds, 1, remaining_ms);
    }

    if (rc < 0) {
        return errno;
    }
    if (rc == 0) {
        return ETIME;
    }
    return pcmk_rc_ok;
}
/*!
* \internal
* \brief Read bytes from non-blocking remote connection
*
* Appends whatever one read call returns to the connection's buffer (growing
* the buffer first if it is smaller than the amount wanted), then reports
* whether the buffer now holds a complete message according to its header.
*
* \param[in,out] remote Remote connection to read
*
* \return Standard Pacemaker return code (of particular interest, pcmk_rc_ok if
* a full message has been received, or EAGAIN for a partial message;
* ENOTCONN is returned at end of data or on a read error, and
* ESOCKTNOSUPPORT if the connection has neither a TLS session nor a
* TCP socket)
* \note Use only with non-blocking sockets after polling the socket.
* \note This function will return when the socket read buffer is empty or an
* error is encountered.
*/
int
pcmk__read_available_remote_data(pcmk__remote_t *remote)
{
int rc = pcmk_rc_ok;
/* Until a header can be obtained, aim for just enough bytes to hold one */
size_t read_len = sizeof(struct remote_header_v0);
struct remote_header_v0 *header = localized_remote_header(remote);
ssize_t read_rc;
if(header) {
/* Stop at the end of the current message */
read_len = header->size_total;
}
/* automatically grow the buffer when needed */
if(remote->buffer_size < read_len) {
remote->buffer_size = 2 * read_len;
- crm_trace("Expanding buffer to %llu bytes",
- (unsigned long long) remote->buffer_size);
+ crm_trace("Expanding buffer to %zu bytes", remote->buffer_size);
/* One extra byte is allocated so the data can always be null-terminated */
remote->buffer = pcmk__realloc(remote->buffer, remote->buffer_size + 1);
}
/* Read from whichever transport is in use, mapping failures to an errno-style
 * code in rc (read_rc keeps the raw result of the read call)
 */
if (remote->tls_session) {
read_rc = gnutls_record_recv(remote->tls_session,
remote->buffer + remote->buffer_offset,
remote->buffer_size - remote->buffer_offset);
if (read_rc == GNUTLS_E_INTERRUPTED) {
rc = EINTR;
} else if (read_rc == GNUTLS_E_AGAIN) {
rc = EAGAIN;
} else if (read_rc < 0) {
- crm_debug("TLS receive failed: %s (%lld)",
- gnutls_strerror(read_rc), (long long) read_rc);
+ crm_debug("TLS receive failed: %s (%zd)",
+ gnutls_strerror((int) read_rc), read_rc);
rc = EIO;
}
} else if (remote->tcp_socket >= 0) {
read_rc = read(remote->tcp_socket,
remote->buffer + remote->buffer_offset,
remote->buffer_size - remote->buffer_offset);
if (read_rc < 0) {
rc = errno;
}
} else {
crm_err("Remote connection type undetermined (bug?)");
return ESOCKTNOSUPPORT;
}
/* Act on the read result: record new data, or handle end of data and errors */
if (read_rc > 0) {
remote->buffer_offset += read_rc;
/* always null terminate buffer, the +1 to alloc always allows for this. */
remote->buffer[remote->buffer_offset] = '\0';
- crm_trace("Received %lld more bytes (%llu total)",
- (long long) read_rc,
- (unsigned long long) remote->buffer_offset);
-
- } else if ((rc == EINTR) || (rc == EAGAIN)) {
- crm_trace("No data available for non-blocking remote read: %s (%d)",
- pcmk_rc_str(rc), rc);
+ crm_trace("Received %zd more bytes (%zu total)",
+ read_rc, remote->buffer_offset);
} else if (read_rc == 0) {
- crm_debug("End of remote data encountered after %llu bytes",
- (unsigned long long) remote->buffer_offset);
+ crm_debug("End of remote data encountered after %zu bytes",
+ remote->buffer_offset);
return ENOTCONN;
- } else {
- crm_debug("Error receiving remote data after %llu bytes: %s (%d)",
- (unsigned long long) remote->buffer_offset,
+ } else if ((rc == EINTR) || (rc == EAGAIN) || (rc == EWOULDBLOCK)) {
+ crm_trace("No data available for non-blocking remote read: %s (%d)",
pcmk_rc_str(rc), rc);
+
+ } else {
+ crm_debug("Error receiving remote data after %zu bytes: %s (%d)",
+ remote->buffer_offset, pcmk_rc_str(rc), rc);
return ENOTCONN;
}
/* Re-fetch the header, since this read may have supplied it */
header = localized_remote_header(remote);
if(header) {
if(remote->buffer_offset < header->size_total) {
- crm_trace("Read partial remote message (%llu of %u bytes)",
- (unsigned long long) remote->buffer_offset,
- header->size_total);
+ crm_trace("Read partial remote message (%zu of %" PRIu32 " bytes)",
+ remote->buffer_offset, header->size_total);
} else {
- crm_trace("Read full remote message of %llu bytes",
- (unsigned long long) remote->buffer_offset);
+ crm_trace("Read full remote message of %zu bytes",
+ remote->buffer_offset);
return pcmk_rc_ok;
}
}
/* No header yet, or fewer bytes buffered than the header's total size */
return EAGAIN;
}
/*!
 * \internal
 * \brief Read one complete message from a remote connection
 *
 * Alternate between waiting for the connection to become readable and reading
 * what is available, until a full message is buffered, a fatal error occurs,
 * or the timeout runs out.
 *
 * \param[in,out] remote      Remote connection to read
 * \param[in]     timeout_ms  Fail if message not read in this many milliseconds
 *                            (10s will be used if 0, and 60s if negative)
 *
 * \return Standard Pacemaker return code
 */
int
pcmk__read_remote_message(pcmk__remote_t *remote, int timeout_ms)
{
    const time_t began = time(NULL);
    int rc = pcmk_rc_ok;

    // Substitute defaults for the special timeout values
    if (timeout_ms < 0) {
        timeout_ms = 60000;
    } else if (timeout_ms == 0) {
        timeout_ms = 10000;
    }

    for (int left_ms = timeout_ms; left_ms > 0;
         left_ms = timeout_ms - ((time(NULL) - began) * 1000)) {

        crm_trace("Waiting for remote data (%d ms of %d ms timeout remaining)",
                  left_ms, timeout_ms);
        rc = pcmk__remote_ready(remote, left_ms);

        if (rc == ETIME) {
            crm_err("Timed out (%d ms) while waiting for remote data",
                    left_ms);
            return rc;
        }

        if (rc == pcmk_rc_ok) {
            // Something is readable, so pull in whatever is available
            rc = pcmk__read_available_remote_data(remote);
            if (rc == pcmk_rc_ok) {
                return rc;
            }
            if (rc == EAGAIN) {
                crm_trace("Waiting for more remote data");
            } else {
                crm_debug("Could not receive remote data: %s " QB_XS " rc=%d",
                          pcmk_rc_str(rc), rc);
            }

        } else {
            crm_debug("Wait for remote data aborted (will retry): %s "
                      QB_XS " rc=%d", pcmk_rc_str(rc), rc);
        }

        // Don't waste time retrying after fatal errors
        if ((rc == ENOTCONN) || (rc == ESOCKTNOSUPPORT)) {
            return rc;
        }
    }
    return ETIME;
}
// Bookkeeping for one asynchronous connect attempt; allocated by
// connect_socket_retry() and freed by check_connect_finished()
struct tcp_async_cb_data {
int sock; // Socket being connected (set to -1 once closed after a failure)
int timeout_ms; // Give up when this much time has passed since start
time_t start; // When connect() was attempted, or 0 if it succeeded immediately
void *userdata; // Opaque pointer passed through to callback
void (*callback) (void *userdata, int rc, int sock); // Called with the final result
};
/*!
 * \internal
 * \brief Timer callback that checks whether a pending connect has finished
 *
 * Does a zero-timeout select() on the socket. If the attempt is still pending
 * and time remains, asks to be called again. Otherwise closes the socket on
 * failure, invokes the user callback (if any) with the result, and frees the
 * callback data.
 *
 * \param[in,out] userdata  struct tcp_async_cb_data for this attempt
 */
// \return TRUE if timer should be rescheduled, FALSE otherwise
static gboolean
check_connect_finished(gpointer userdata)
{
struct tcp_async_cb_data *cb_data = userdata;
int rc;
fd_set rset, wset;
struct timeval ts = { 0, }; // Zero timeout, so select() returns immediately
if (cb_data->start == 0) {
// Last connect() returned success immediately
rc = pcmk_rc_ok;
goto dispatch_done;
}
// If the socket is ready for reading or writing, the connect succeeded
FD_ZERO(&rset);
FD_SET(cb_data->sock, &rset);
wset = rset;
rc = select(cb_data->sock + 1, &rset, &wset, NULL, &ts);
if (rc < 0) { // select() error
rc = errno;
- if ((rc == EINPROGRESS) || (rc == EAGAIN)) {
+ if ((rc == EINTR) || (rc == EAGAIN)) {
if ((time(NULL) - cb_data->start) < pcmk__timeout_ms2s(cb_data->timeout_ms)) {
return TRUE; // There is time left, so reschedule timer
} else {
rc = ETIMEDOUT;
}
}
crm_trace("Could not check socket %d for connection success: %s (%d)",
cb_data->sock, pcmk_rc_str(rc), rc);
} else if (rc == 0) { // select() timeout
if ((time(NULL) - cb_data->start) < pcmk__timeout_ms2s(cb_data->timeout_ms)) {
return TRUE; // There is time left, so reschedule timer
}
crm_debug("Timed out while waiting for socket %d connection success",
cb_data->sock);
rc = ETIMEDOUT;
// select() returned number of file descriptors that are ready
} else if (FD_ISSET(cb_data->sock, &rset)
|| FD_ISSET(cb_data->sock, &wset)) {
// The socket is ready; check it for connection errors
int error = 0;
socklen_t len = sizeof(error);
if (getsockopt(cb_data->sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
rc = errno;
crm_trace("Couldn't check socket %d for connection errors: %s (%d)",
cb_data->sock, pcmk_rc_str(rc), rc);
} else if (error != 0) {
rc = error;
crm_trace("Socket %d connected with error: %s (%d)",
cb_data->sock, pcmk_rc_str(rc), rc);
} else {
rc = pcmk_rc_ok;
}
} else { // Should not be possible
crm_trace("select() succeeded, but socket %d not in resulting "
"read/write sets", cb_data->sock);
rc = EAGAIN;
}
// Final outcome is known: report it and release the callback data
dispatch_done:
if (rc == pcmk_rc_ok) {
crm_trace("Socket %d is connected", cb_data->sock);
} else {
// On any failure the socket is closed here, and the callback is given -1
close(cb_data->sock);
cb_data->sock = -1;
}
if (cb_data->callback) {
cb_data->callback(cb_data->userdata, rc, cb_data->sock);
}
free(cb_data);
return FALSE; // Do not reschedule timer
}
/*!
* \internal
* \brief Attempt to connect socket, calling callback when done
*
* Set a given socket non-blocking, then attempt to connect to it,
* retrying periodically until success or a timeout is reached.
* Call a caller-supplied callback function when completed.
*
* \param[in] sock Newly created socket
* \param[in] addr Socket address information for connect
* \param[in] addrlen Size of socket address information in bytes
* \param[in] timeout_ms Fail if not connected within this much time
* \param[out] timer_id If not NULL, store retry timer ID here
* \param[in] userdata User data to pass to callback
* \param[in] callback Function to call when connection attempt completes
*
* \return Standard Pacemaker return code
* \note A return of pcmk_rc_ok means only that a check was scheduled; the
* outcome of the connection attempt is passed to \p callback later.
*/
static int
connect_socket_retry(int sock, const struct sockaddr *addr, socklen_t addrlen,
int timeout_ms, int *timer_id, void *userdata,
void (*callback) (void *userdata, int rc, int sock))
{
int rc = 0;
int interval = 500; // Milliseconds until the first completion check
int timer;
struct tcp_async_cb_data *cb_data = NULL;
rc = pcmk__set_nonblocking(sock);
if (rc != pcmk_rc_ok) {
crm_warn("Could not set socket non-blocking: %s " QB_XS " rc=%d",
pcmk_rc_str(rc), rc);
return rc;
}
rc = connect(sock, addr, addrlen);
- if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
+ if (rc < 0) {
rc = errno;
- crm_warn("Could not connect socket: %s " QB_XS " rc=%d",
- pcmk_rc_str(rc), rc);
- return rc;
+ switch (rc) {
+ case EINTR:
+ case EINPROGRESS:
+ case EAGAIN:
+ break;
+
+ default:
+ crm_warn("Could not connect socket: %s " QB_XS " rc=%d",
+ pcmk_rc_str(rc), rc);
+ return rc;
+ }
}
// From here on, rc is 0 only if connect() succeeded immediately
cb_data = pcmk__assert_alloc(1, sizeof(struct tcp_async_cb_data));
cb_data->userdata = userdata;
cb_data->callback = callback;
cb_data->sock = sock;
cb_data->timeout_ms = timeout_ms;
if (rc == 0) {
/* The connect was successful immediately, we still return to mainloop
* and let this callback get called later. This avoids the user of this api
* to have to account for the fact the callback could be invoked within this
* function before returning. */
cb_data->start = 0; // Tells check_connect_finished() to report success
interval = 1;
} else {
cb_data->start = time(NULL);
}
/* This timer function does a non-blocking poll on the socket to see if we
* can use it. Once we can, the connect has completed. This method allows us
* to connect without blocking the mainloop.
*
* @TODO Use a mainloop fd callback for this instead of polling. Something
* about the way mainloop is currently polling prevents this from
* working at the moment though. (See connect(2) regarding EINPROGRESS
* for possible new handling needed.)
*/
crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
interval, sock);
timer = pcmk__create_timer(interval, check_connect_finished, cb_data);
if (timer_id) {
*timer_id = timer;
}
// timer callback should be taking care of cb_data
// cppcheck-suppress memleak
return pcmk_rc_ok;
}
/*!
 * \internal
 * \brief Attempt once to connect socket and set it non-blocking
 *
 * The connect() call itself is made while the socket is still in whatever
 * blocking mode the caller left it in; only after it succeeds is the socket
 * switched to non-blocking.
 *
 * \param[in] sock     Newly created socket
 * \param[in] addr     Socket address information for connect
 * \param[in] addrlen  Size of socket address information in bytes
 *
 * \return Standard Pacemaker return code
 */
static int
connect_socket_once(int sock, const struct sockaddr *addr, socklen_t addrlen)
{
    int rc = connect(sock, addr, addrlen);

    if (rc < 0) {
        rc = errno;
        crm_warn("Could not connect socket: %s " QB_XS " rc=%d",
                 pcmk_rc_str(rc), rc);
        return rc;
    }

    rc = pcmk__set_nonblocking(sock);
    if (rc != pcmk_rc_ok) {
        crm_warn("Could not set socket non-blocking: %s " QB_XS " rc=%d",
                 pcmk_rc_str(rc), rc);
        return rc;
    }

    // Standard (not legacy) return code, matching what callers compare against
    return pcmk_rc_ok;
}
/*!
 * \internal
 * \brief Connect to server at specified TCP port
 *
 * Resolve \p host and try each returned address in order until a connection
 * attempt succeeds (synchronous case) or has been scheduled (asynchronous
 * case).
 *
 * \param[in]  host      Name of server to connect to
 * \param[in]  port      Server port to connect to
 * \param[in]  timeout   If asynchronous, fail if not connected within this
 *                       many milliseconds
 * \param[out] timer_id  If asynchronous and this is non-NULL, retry timer ID
 *                       will be put here (for ease of cancelling by caller)
 * \param[out] sock_fd   Where to store socket file descriptor (-1 if no
 *                       connection could be made or scheduled)
 * \param[in]  userdata  If asynchronous, data to pass to callback
 * \param[in]  callback  If NULL, attempt a single synchronous connection,
 *                       otherwise retry asynchronously then call this
 *
 * \return Standard Pacemaker return code
 */
int
pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id,
                     int *sock_fd, void *userdata,
                     void (*callback) (void *userdata, int rc, int sock))
{
    char buffer[INET6_ADDRSTRLEN];
    struct addrinfo *res = NULL;
    struct addrinfo *rp = NULL;
    struct addrinfo hints;
    const char *server = host;
    int rc;
    int sock = -1;

    CRM_CHECK((host != NULL) && (sock_fd != NULL), return EINVAL);

    // Get host's IP address(es)
    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;    /* Allow IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_CANONNAME;

    rc = getaddrinfo(server, NULL, &hints, &res);
    rc = pcmk__gaierror2rc(rc);

    if (rc != pcmk_rc_ok) {
        crm_err("Unable to get IP address info for %s: %s",
                server, pcmk_rc_str(rc));
        goto async_cleanup;
    }

    if (!res || !res->ai_addr) {
        crm_err("Unable to get IP address info for %s: no result", server);
        rc = ENOTCONN;
        goto async_cleanup;
    }

    // getaddrinfo() returns a list of host's addresses, try them in order
    for (rp = res; rp != NULL; rp = rp->ai_next) {
        struct sockaddr *addr = rp->ai_addr;

        if (!addr) {
            continue;
        }

        // Use the canonical name from the entry that was actually checked
        if (rp->ai_canonname) {
            server = rp->ai_canonname;
        }
        crm_debug("Got canonical name %s for %s", server, host);

        sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
        if (sock == -1) {
            rc = errno;
            crm_warn("Could not create socket for remote connection to %s:%d: "
                     "%s " QB_XS " rc=%d", server, port, pcmk_rc_str(rc), rc);
            continue;
        }

        /* Set port appropriately for address family */
        /* (void*) casts avoid false-positive compiler alignment warnings */
        if (addr->sa_family == AF_INET6) {
            ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
        } else {
            ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
        }

        memset(buffer, 0, PCMK__NELEM(buffer));
        pcmk__sockaddr2str(addr, buffer);
        crm_info("Attempting remote connection to %s:%d", buffer, port);

        if (callback) {
            if (connect_socket_retry(sock, rp->ai_addr, rp->ai_addrlen, timeout,
                                     timer_id, userdata, callback) == pcmk_rc_ok) {
                /* Success for now, we'll hear back later in the callback.
                 * Clear any error left over from an earlier address.
                 */
                rc = pcmk_rc_ok;
                goto async_cleanup;
            }

        } else if (connect_socket_once(sock, rp->ai_addr,
                                       rp->ai_addrlen) == pcmk_rc_ok) {
            // Success; clear any error left over from an earlier address
            rc = pcmk_rc_ok;
            break;
        }

        // Connect failed
        close(sock);
        sock = -1;
        rc = ENOTCONN;
    }

    // Never report success without a socket (e.g. no entry had an address)
    if ((sock < 0) && (rc == pcmk_rc_ok)) {
        rc = ENOTCONN;
    }

async_cleanup:

    if (res) {
        freeaddrinfo(res);
    }
    *sock_fd = sock;
    return rc;
}
/*!
 * \internal
 * \brief Render the IP address in a socket address as text, for logging
 *
 * \param[in]  sa  Socket address holding an IPv4 or IPv6 address
 * \param[out] s   Where to write the result (must have room for at least
 *                 INET6_ADDRSTRLEN bytes)
 *
 * \note \p sa may point to a struct sockaddr_in (IPv4), struct sockaddr_in6
 *       (IPv6), or struct sockaddr_storage (either), as long as its family
 *       member is set correctly. For any other family, "<invalid>" is written.
 */
void
pcmk__sockaddr2str(const void *sa, char *s)
{
    const struct sockaddr *generic = sa;
    const int family = generic->sa_family;

    if (family == AF_INET) {
        const struct sockaddr_in *v4 = sa;

        inet_ntop(AF_INET, &(v4->sin_addr), s, INET6_ADDRSTRLEN);

    } else if (family == AF_INET6) {
        const struct sockaddr_in6 *v6 = sa;

        inet_ntop(AF_INET6, &(v6->sin6_addr), s, INET6_ADDRSTRLEN);

    } else {
        strcpy(s, "<invalid>");
    }
}
/*!
 * \internal
 * \brief Accept a client connection on a remote server socket
 *
 * The accepted socket is set non-blocking. Where TCP_USER_TIMEOUT is
 * available and an SBD watchdog timeout is configured, the socket's TCP user
 * timeout is set to half the watchdog timeout.
 *
 * \param[in]  ssock  Server socket file descriptor being listened on
 * \param[out] csock  Where to put new client socket's file descriptor (set to
 *                    -1 on failure)
 *
 * \return Standard Pacemaker return code
 */
int
pcmk__accept_remote_connection(int ssock, int *csock)
{
    int rc;
    struct sockaddr_storage addr;
    socklen_t laddr = sizeof(addr);
    char addr_str[INET6_ADDRSTRLEN];
#ifdef TCP_USER_TIMEOUT
    long sbd_timeout = 0;
#endif

    /* accept the connection */
    memset(&addr, 0, sizeof(addr));
    *csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
    if (*csock == -1) {
        rc = errno;
        crm_err("Could not accept remote client connection: %s "
                QB_XS " rc=%d", pcmk_rc_str(rc), rc);
        return rc;
    }
    pcmk__sockaddr2str(&addr, addr_str);
    crm_info("Accepted new remote client connection from %s", addr_str);

    rc = pcmk__set_nonblocking(*csock);
    if (rc != pcmk_rc_ok) {
        crm_err("Could not set socket non-blocking: %s " QB_XS " rc=%d",
                pcmk_rc_str(rc), rc);
        close(*csock);
        *csock = -1;
        return rc;
    }

#ifdef TCP_USER_TIMEOUT
    sbd_timeout = pcmk__get_sbd_watchdog_timeout();
    if (sbd_timeout > 0) {
        // Time to fail and retry before watchdog
        long half = sbd_timeout / 2;
        unsigned int optval = (half <= UINT_MAX)? half : UINT_MAX;

        rc = setsockopt(*csock, SOL_TCP, TCP_USER_TIMEOUT,
                        &optval, sizeof(optval));
        if (rc < 0) {
            rc = errno;
            // optval is unsigned, so it must be formatted with %u
            crm_err("Could not set TCP timeout to %u ms on remote connection: "
                    "%s " QB_XS " rc=%d", optval, pcmk_rc_str(rc), rc);
            close(*csock);
            *csock = -1;
            return rc;
        }
    }
#endif

    return rc;
}
/*!
 * \brief Get the default remote connection TCP port on this host
 *
 * The port is taken from the environment option on the first call (falling
 * back to DEFAULT_REMOTE_PORT if it is unset or not in the range 1-65535) and
 * cached for subsequent calls.
 *
 * \return Remote connection TCP port number
 */
int
crm_default_remote_port(void)
{
    static int port = 0;

    if (port == 0) {
        const char *env = pcmk__env_option(PCMK__ENV_REMOTE_PORT);

        port = DEFAULT_REMOTE_PORT;
        if (env != NULL) {
            long value = 0;

            /* Range-check the full long result before narrowing, so that a
             * huge value cannot wrap around into the valid port range
             */
            errno = 0;
            value = strtol(env, NULL, 10);
            if ((errno != 0) || (value < 1) || (value > 65535)) {
                crm_warn("Environment variable PCMK_" PCMK__ENV_REMOTE_PORT
                         " has invalid value '%s', using %d instead",
                         env, DEFAULT_REMOTE_PORT);
            } else {
                port = (int) value;
            }
        }
    }
    return port;
}
diff --git a/lib/services/services_linux.c b/lib/services/services_linux.c
index 2f8a46d195..d7b9b97b3c 100644
--- a/lib/services/services_linux.c
+++ b/lib/services/services_linux.c
@@ -1,1482 +1,1479 @@
/*
- * Copyright 2010-2024 the Pacemaker project contributors
+ * Copyright 2010-2025 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
* This source code is licensed under the GNU Lesser General Public License
* version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
*/
#include <crm_internal.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <errno.h>
#include <unistd.h>
#include <dirent.h>
#include <grp.h>
#include <string.h>
#include <sys/time.h>
#include <sys/resource.h>
#include "crm/crm.h"
#include "crm/common/mainloop.h"
#include "crm/services.h"
#include "crm/services_internal.h"
#include "services_private.h"
static void close_pipe(int fildes[]);
/* We have two alternative ways of handling SIGCHLD when synchronously waiting
* for spawned processes to complete. Both rely on polling a file descriptor to
* discover SIGCHLD events.
*
* If sys/signalfd.h is available (e.g. on Linux), we call signalfd() to
* generate the file descriptor. Otherwise, we use the "self-pipe trick"
* (opening a pipe and writing a byte to it when SIGCHLD is received).
*/
#ifdef HAVE_SYS_SIGNALFD_H
// signalfd() implementation
#include <sys/signalfd.h>
// Everything needed to manage SIGCHLD handling (signalfd() implementation):
// filled in by sigchld_setup() and consumed by sigchld_open(),
// sigchld_received() and sigchld_cleanup()
struct sigchld_data_s {
sigset_t mask; // Signals to block now (including SIGCHLD)
sigset_t old_mask; // Previous set of blocked signals
bool ignored; // If SIGCHLD for another child has been ignored
};
// Initialize SIGCHLD data and prepare for use
static bool
sigchld_setup(struct sigchld_data_s *data)
{
sigemptyset(&(data->mask));
sigaddset(&(data->mask), SIGCHLD);
sigemptyset(&(data->old_mask));
// Block SIGCHLD (saving previous set of blocked signals to restore later)
if (sigprocmask(SIG_BLOCK, &(data->mask), &(data->old_mask)) < 0) {
crm_info("Wait for child process completion failed: %s "
QB_XS " source=sigprocmask", pcmk_rc_str(errno));
return false;
}
data->ignored = false;
return true;
}
// Create a non-blocking signalfd() descriptor that can be polled for the
// signals in data->mask (returns a negative value on failure)
static int
sigchld_open(struct sigchld_data_s *data)
{
    int sfd = -1;

    CRM_CHECK(data != NULL, return -1);

    sfd = signalfd(-1, &(data->mask), SFD_NONBLOCK);
    if (sfd >= 0) {
        return sfd;
    }

    crm_info("Wait for child process completion failed: %s "
             QB_XS " source=signalfd", pcmk_rc_str(errno));
    return sfd;
}
// Close a file descriptor returned by sigchld_open()
// (negative values, which indicate that opening failed, are ignored)
static void
sigchld_close(int fd)
{
    // 0 is a valid descriptor, so only negative values mean "nothing to close"
    if (fd >= 0) {
        close(fd);
    }
}
// Return true if SIGCHLD was received from polled fd
static bool
sigchld_received(int fd, int pid, struct sigchld_data_s *data)
{
struct signalfd_siginfo fdsi;
ssize_t s;
if (fd < 0) {
return false;
}
s = read(fd, &fdsi, sizeof(struct signalfd_siginfo));
if (s != sizeof(struct signalfd_siginfo)) {
crm_info("Wait for child process completion failed: %s "
QB_XS " source=read", pcmk_rc_str(errno));
} else if (fdsi.ssi_signo == SIGCHLD) {
if (fdsi.ssi_pid == pid) {
return true;
} else {
/* This SIGCHLD is for another child. We have to ignore it here but
* will still need to resend it after this synchronous action has
* completed and SIGCHLD has been restored to be handled by the
* previous SIGCHLD handler, so that it will be handled.
*/
data->ignored = true;
return false;
}
}
return false;
}
// Undo sigchld_setup() once waiting is finished, and re-raise SIGCHLD if one
// for another child was ignored in the meantime
static void
sigchld_cleanup(struct sigchld_data_s *data)
{
    // Unblock our signals only if SIGCHLD wasn't already blocked beforehand
    if (sigismember(&(data->old_mask), SIGCHLD) == 0) {
        if (sigprocmask(SIG_UNBLOCK, &(data->mask), NULL) < 0) {
            crm_warn("Could not clean up after child process completion: %s",
                     pcmk_rc_str(errno));
        }
    }

    // Resend any ignored SIGCHLD for other children so that they'll be handled.
    if (data->ignored) {
        if (kill(getpid(), SIGCHLD) != 0) {
            crm_warn("Could not resend ignored SIGCHLD to ourselves: %s",
                     pcmk_rc_str(errno));
        }
    }
}
#else // HAVE_SYS_SIGNALFD_H not defined
// Self-pipe implementation (see above for function descriptions)
// Everything needed to manage SIGCHLD handling (self-pipe implementation):
// the signal handler writes a byte to pipe_fd[1], and pipe_fd[0] is polled
struct sigchld_data_s {
int pipe_fd[2]; // Pipe file descriptors
struct sigaction sa; // Signal handling info (with SIGCHLD)
struct sigaction old_sa; // Previous signal handling info
bool ignored; // If SIGCHLD for another child has been ignored
};
// We need a global to use in the signal handler
volatile struct sigchld_data_s *last_sigchld_data = NULL;

/*!
 * \internal
 * \brief SIGCHLD handler that wakes up whoever is polling the self-pipe
 *
 * Writes a single byte to the write end of the pipe in the most recently set
 * up SIGCHLD data, if there is one.
 *
 * \note errno is saved and restored so that code interrupted by the signal
 *       does not see a value set by this handler.
 * \note NOTE(review): sigchld_setup() installs this via a cast to
 *       sighandler_t although it takes no signal-number argument, and the
 *       logging call below is presumably not async-signal-safe -- confirm
 *       both are acceptable.
 */
static void
sigchld_handler(void)
{
    const int saved_errno = errno;

    // We received a SIGCHLD, so trigger pipe polling
    if ((last_sigchld_data != NULL)
        && (last_sigchld_data->pipe_fd[1] >= 0)
        && (write(last_sigchld_data->pipe_fd[1], "", 1) == -1)) {
        crm_info("Wait for child process completion failed: %s "
                 QB_XS " source=write", pcmk_rc_str(errno));
    }
    errno = saved_errno;
}
/*!
 * \internal
 * \brief Initialize SIGCHLD data and prepare for use (self-pipe version)
 *
 * Opens the pipe, tries to make both ends non-blocking (failure to do so is
 * only logged), and installs the SIGCHLD handler.
 *
 * \param[out] data  SIGCHLD data to initialize
 *
 * \return true on success, or false (with the pipe closed again) if the pipe
 *         could not be created or the handler could not be installed
 */
static bool
sigchld_setup(struct sigchld_data_s *data)
{
    int rc;

    data->pipe_fd[0] = data->pipe_fd[1] = -1;

    if (pipe(data->pipe_fd) == -1) {
        crm_info("Wait for child process completion failed: %s "
                 QB_XS " source=pipe", pcmk_rc_str(errno));
        return false;
    }

    rc = pcmk__set_nonblocking(data->pipe_fd[0]);
    if (rc != pcmk_rc_ok) {
        crm_info("Could not set pipe input non-blocking: %s " QB_XS " rc=%d",
                 pcmk_rc_str(rc), rc);
    }
    rc = pcmk__set_nonblocking(data->pipe_fd[1]);
    if (rc != pcmk_rc_ok) {
        crm_info("Could not set pipe output non-blocking: %s " QB_XS " rc=%d",
                 pcmk_rc_str(rc), rc);
    }

    // Set SIGCHLD handler
    data->sa.sa_handler = (sighandler_t) sigchld_handler;
    data->sa.sa_flags = 0;
    sigemptyset(&(data->sa.sa_mask));
    if (sigaction(SIGCHLD, &(data->sa), &(data->old_sa)) < 0) {
        crm_info("Wait for child process completion failed: %s "
                 QB_XS " source=sigaction", pcmk_rc_str(errno));

        /* Without a handler nothing would ever write to the pipe, and old_sa
         * has not been filled in for a later restore, so give up now rather
         * than claim success.
         */
        close_pipe(data->pipe_fd);
        return false;
    }

    data->ignored = false;

    // Remember data for use in signal handler
    last_sigchld_data = data;
    return true;
}
// Return the read end of the self-pipe, which is what gets polled for SIGCHLD
static int
sigchld_open(struct sigchld_data_s *data)
{
    const int *fds = NULL;

    CRM_CHECK(data != NULL, return -1);

    fds = data->pipe_fd;
    return fds[0];
}
// Counterpart of sigchld_open(); intentionally does nothing in this variant
static void
sigchld_close(int fd)
{
    // Pipe will be closed in sigchld_cleanup()
    (void) fd;
}
// Drain the self-pipe and report that a SIGCHLD arrived (the pid and data
// arguments are not consulted in this variant)
static bool
sigchld_received(int fd, int pid, struct sigchld_data_s *data)
{
    char byte;

    if (fd < 0) {
        return false;
    }

    // Clear out the self-pipe
    for (;;) {
        if (read(fd, &byte, 1) != 1) {
            break;
        }
    }
    return true;
}
// Reinstall the previous SIGCHLD handler, close the self-pipe, and re-raise
// SIGCHLD if one for another child was marked as ignored
static void
sigchld_cleanup(struct sigchld_data_s *data)
{
    // Restore the previous SIGCHLD handler
    const int restore_rc = sigaction(SIGCHLD, &(data->old_sa), NULL);

    if (restore_rc < 0) {
        crm_warn("Could not clean up after child process completion: %s",
                 pcmk_rc_str(errno));
    }

    close_pipe(data->pipe_fd);

    if (!data->ignored) {
        return;
    }

    // Resend any ignored SIGCHLD for other children so that they'll be handled.
    if (kill(getpid(), SIGCHLD) != 0) {
        crm_warn("Could not resend ignored SIGCHLD to ourselves: %s",
                 pcmk_rc_str(errno));
    }
}
#endif
/*!
 * \internal
 * \brief Close both ends of a pipe
 *
 * Each descriptor that is nonnegative is closed and then set to -1, so
 * calling this again on the same array is harmless.
 *
 * \param[in,out] fildes  Array of file descriptors opened by pipe()
 */
static void
close_pipe(int fildes[])
{
    for (int end = 0; end < 2; end++) {
        if (fildes[end] < 0) {
            continue;   // This end is not open
        }
        close(fildes[end]);
        fildes[end] = -1;
    }
}
#define out_type(is_stderr) ((is_stderr)? "stderr" : "stdout")
// Maximum number of bytes of stdout or stderr we'll accept
#define MAX_OUTPUT (10 * 1024 * 1024)
/*!
 * \internal
 * \brief Read available stdout or stderr output of an action into the action
 *
 * Output is appended to any text already stored in the action. Once the stored
 * length reaches MAX_OUTPUT, further data is still read but discarded (and a
 * warning with the discarded byte count is logged).
 *
 * \param[in]     fd         File descriptor to read from
 * \param[in,out] op         Action whose stdout_data or stderr_data is updated
 * \param[in]     is_stderr  Whether \p fd carries stderr (otherwise stdout)
 *
 * \return FALSE if \p fd is negative or reading stopped at end-of-file or an
 *         error other than EINTR, otherwise TRUE
 */
static gboolean
svc_read_output(int fd, svc_action_t * op, bool is_stderr)
{
char *data = NULL;
ssize_t rc = 0;
size_t len = 0;
size_t discarded = 0;
char buf[500];
// One byte of buf is reserved for the terminator added after each read
static const size_t buf_read_len = sizeof(buf) - 1;
if (fd < 0) {
crm_trace("No fd for %s", op->id);
return FALSE;
}
// Continue from any output previously collected for this stream
if (is_stderr && op->stderr_data) {
len = strlen(op->stderr_data);
data = op->stderr_data;
- crm_trace("Reading %s stderr into offset %lld",
- op->id, (long long) len);
+ crm_trace("Reading %s stderr into offset %zu", op->id, len);
} else if (is_stderr == FALSE && op->stdout_data) {
len = strlen(op->stdout_data);
data = op->stdout_data;
- crm_trace("Reading %s stdout into offset %lld",
- op->id, (long long) len);
+ crm_trace("Reading %s stdout into offset %zu", op->id, len);
} else {
crm_trace("Reading %s %s", op->id, out_type(is_stderr));
}
// Keep reading while reads fill the buffer completely (more may be pending)
do {
errno = 0;
rc = read(fd, buf, buf_read_len);
if (rc > 0) {
if (len < MAX_OUTPUT) {
buf[rc] = 0;
- crm_trace("Received %lld bytes of %s %s: %.80s",
- (long long) rc, op->id, out_type(is_stderr), buf);
+ crm_trace("Received %zd bytes of %s %s: %.80s",
+ rc, op->id, out_type(is_stderr), buf);
data = pcmk__realloc(data, len + rc + 1);
// NOTE(review): strcpy() stops at the first NUL byte in buf while len still
// advances by rc -- confirm output is never expected to contain NUL bytes
strcpy(data + len, buf);
len += rc;
} else {
discarded += rc;
}
} else if (errno != EINTR) { // Fatal error or EOF
rc = 0;
break;
}
} while ((rc == buf_read_len) || (rc < 0));
if (discarded > 0) {
- crm_warn("Truncated %s %s to %lld bytes (discarded %lld)",
- op->id, out_type(is_stderr), (long long) len,
- (long long) discarded);
+ crm_warn("Truncated %s %s to %zu bytes (discarded %zu)",
+ op->id, out_type(is_stderr), len, discarded);
}
// data may have been reallocated, so store it back in the action
if (is_stderr) {
op->stderr_data = data;
} else {
op->stdout_data = data;
}
return rc != 0;
}
// Dispatch callback for an action's stdout: collect whatever can be read
// (userdata is the action)
static int
dispatch_stdout(gpointer userdata)
{
    svc_action_t *action = userdata;
    const int out_fd = action->opaque->stdout_fd;

    return svc_read_output(out_fd, action, FALSE);
}
// Dispatch callback for an action's stderr: collect whatever can be read
// (userdata is the action)
static int
dispatch_stderr(gpointer userdata)
{
    svc_action_t *action = userdata;
    const int err_fd = action->opaque->stderr_fd;

    return svc_read_output(err_fd, action, TRUE);
}
// Destroy callback for an action's stdout: forget the source and close the
// descriptor unless it is a standard stream (user_data is the action)
static void
pipe_out_done(gpointer user_data)
{
    svc_action_t *action = user_data;
    int out_fd;

    crm_trace("%p", action);

    out_fd = action->opaque->stdout_fd;
    action->opaque->stdout_gsource = NULL;
    action->opaque->stdout_fd = -1;

    if (out_fd > STDOUT_FILENO) {
        close(out_fd);
    }
}
// Destroy callback for an action's stderr: forget the source and close the
// descriptor unless it is a standard stream (user_data is the action)
static void
pipe_err_done(gpointer user_data)
{
    svc_action_t *action = user_data;
    const int err_fd = action->opaque->stderr_fd;

    action->opaque->stderr_gsource = NULL;
    action->opaque->stderr_fd = -1;

    if (err_fd > STDERR_FILENO) {
        close(err_fd);
    }
}
// Callbacks for an action's stdout descriptor: dispatch reads the output into
// the action, destroy closes the descriptor and clears the source pointer
static struct mainloop_fd_callbacks stdout_callbacks = {
.dispatch = dispatch_stdout,
.destroy = pipe_out_done,
};
// Callbacks for an action's stderr descriptor: dispatch reads the output into
// the action, destroy closes the descriptor and clears the source pointer
static struct mainloop_fd_callbacks stderr_callbacks = {
.dispatch = dispatch_stderr,
.destroy = pipe_err_done,
};
// Set one environment variable (replacing any existing value), logging an
// error if that fails; user_data is unused
static void
set_ocf_env(const char *key, const char *value, gpointer user_data)
{
    const int rc = setenv(key, value, 1);

    if (rc == 0) {
        return;
    }
    crm_perror(LOG_ERR, "setenv failed for key:%s and value:%s", key, value);
}
// Hash table iterator: export a parameter to the environment as
// OCF_RESKEY_<name>, except OCF_CHECK_LEVEL, which keeps its own name
static void
set_ocf_env_with_prefix(gpointer key, gpointer value, gpointer user_data)
{
    char name[500];
    const char *param = key;

    if (strcmp(param, "OCF_CHECK_LEVEL") == 0) {
        snprintf(name, sizeof(name), "%s", param);
    } else {
        snprintf(name, sizeof(name), "OCF_RESKEY_%s", param);
    }
    set_ocf_env(name, value, user_data);
}
// Hash table iterator: export one alert variable to the environment, or
// remove it from the environment if its value is NULL; user_data is unused
static void
set_alert_env(gpointer key, gpointer value, gpointer user_data)
{
    const char *name = key;
    const char *setting = value;
    const char *shown = (setting != NULL)? setting : "";
    int rc;

    if (setting == NULL) {
        rc = unsetenv(name);
    } else {
        rc = setenv(name, setting, 1);
    }

    if (rc >= 0) {
        crm_trace("setenv %s=%s", name, shown);
        return;
    }
    crm_perror(LOG_ERR, "setenv %s=%s", name, shown);
}
/*!
 * \internal
 * \brief Export the environment variables appropriate for an action
 *
 * Actions without an agent are treated as alerts and get their parameters
 * exported as-is. OCF resource actions get prefixed parameters plus the
 * standard OCF_* variables. Any other action gets nothing.
 *
 * \param[in] op  Action to use
 */
static void
add_action_env_vars(const svc_action_t *op)
{
    if (op->agent == NULL) {
        /* we deal with alert handler */
        if (op->params != NULL) {
            g_hash_table_foreach(op->params, set_alert_env, NULL);
        }
        return;
    }

    if (!pcmk__str_eq(op->standard, PCMK_RESOURCE_CLASS_OCF, pcmk__str_casei)) {
        return;     // Only OCF agents take their input from the environment
    }

    if (op->params != NULL) {
        g_hash_table_foreach(op->params, set_ocf_env_with_prefix, NULL);
    }

    set_ocf_env("OCF_RA_VERSION_MAJOR", PCMK_OCF_MAJOR_VERSION, NULL);
    set_ocf_env("OCF_RA_VERSION_MINOR", PCMK_OCF_MINOR_VERSION, NULL);
    set_ocf_env("OCF_ROOT", PCMK_OCF_ROOT, NULL);
    set_ocf_env("OCF_EXIT_REASON_PREFIX", PCMK_OCF_REASON_PREFIX, NULL);

    if (op->rsc) {
        set_ocf_env("OCF_RESOURCE_INSTANCE", op->rsc, NULL);
    }

    // The agent is known to be non-NULL at this point
    set_ocf_env("OCF_RESOURCE_TYPE", op->agent, NULL);

    /* Notes: this is not added to specification yet. Sept 10,2004 */
    if (op->provider != NULL) {
        set_ocf_env("OCF_RESOURCE_PROVIDER", op->provider, NULL);
    }
}
/*!
 * \internal
 * \brief Write one "name=value" line to an action's stdin (hash table iterator)
 *
 * \param[in] key        Parameter name
 * \param[in] value      Parameter value
 * \param[in] user_data  Action (svc_action_t) whose stdin descriptor is used
 *
 * \note Writing is retried after an interruption and after a short write, and
 *       stops at the first other failure; failures are not reported.
 */
static void
pipe_in_single_parameter(gpointer key, gpointer value, gpointer user_data)
{
    svc_action_t *op = user_data;
    char *buffer = crm_strdup_printf("%s=%s\n", (char *)key, (char *) value);
    size_t len = strlen(buffer);
    size_t total = 0;
    ssize_t ret = 0;

    do {
        errno = 0;
        ret = write(op->opaque->stdin_fd, buffer + total, len - total);
        if (ret > 0) {
            total += ret;
        }

        /* Keep going while data remains and the last attempt either made
         * progress (short write) or was merely interrupted
         */
    } while ((total < len) && ((ret > 0) || (errno == EINTR)));

    free(buffer);
}
/*!
* \internal
* \brief Pipe parameters in via stdin for action
*
* \param[in] op Action to use
*/
static void
pipe_in_action_stdin_parameters(const svc_action_t *op)
{
if (op->params) {
g_hash_table_foreach(op->params, pipe_in_single_parameter, (gpointer) op);
}
}
/*!
 * \brief Timer callback that runs a recurring action again
 *
 * Discards the output of the previous run, clears the action's repeat timer
 * ID, and starts the action asynchronously.
 *
 * \param[in,out] data  Action (svc_action_t) to run again
 *
 * \return FALSE always (so this timer is not rescheduled)
 */
gboolean
recurring_action_timer(gpointer data)
{
    svc_action_t *action = data;

    crm_debug("Scheduling another invocation of %s", action->id);

    /* Clean out the old result */
    free(action->stderr_data);
    free(action->stdout_data);
    action->stderr_data = NULL;
    action->stdout_data = NULL;

    action->opaque->repeat_timer = 0;

    services_action_async(action, NULL);
    return FALSE;
}
/*!
 * \internal
 * \brief Wrap up a completed asynchronous operation
 *
 * For a recurring operation, either cancel it (if cancellation was requested)
 * or schedule its next run. Then invoke the operation's callback if it has
 * one, stop tracking the operation, and either clean it up (recurring and not
 * cancelled) or free it.
 *
 * \param[in,out] op  Operation to finalize
 *
 * \return Standard Pacemaker return code
 * \retval EINVAL      Caller supplied NULL or invalid \p op
 * \retval EBUSY       Uncanceled recurring action has only been cleaned up
 * \retval pcmk_rc_ok  Action has been freed
 *
 * \note If the return value is not pcmk_rc_ok, the caller is responsible for
 *       freeing the action.
 */
int
services__finalize_async_op(svc_action_t *op)
{
    CRM_CHECK((op != NULL) && !(op->synchronous), return EINVAL);

    // Recurring operations must be either cancelled or rescheduled
    if (op->interval_ms != 0) {
        if (!(op->cancel)) {
            op->opaque->repeat_timer = pcmk__create_timer(op->interval_ms,
                                                          recurring_action_timer,
                                                          op);
        } else {
            services__set_cancelled(op);
            cancel_recurring_action(op);
        }
    }

    if (op->opaque->callback != NULL) {
        op->opaque->callback(op);
    }

    // Stop tracking the operation (as in-flight or blocked)
    op->pid = 0;
    services_untrack_op(op);

    /* The fields are read again here (rather than cached above) because the
     * callback has run in between
     */
    if ((op->interval_ms == 0) || op->cancel) {
        services_action_free(op);
        return pcmk_rc_ok;
    }

    // Do not free recurring actions (they will get freed when cancelled)
    services_action_cleanup(op);
    return EBUSY;
}
// Close an action's stdin descriptor if it has one (the stored descriptor
// value is left unchanged)
static void
close_op_input(svc_action_t *op)
{
    const int in_fd = op->opaque->stdin_fd;

    if (in_fd < 0) {
        return;
    }
    close(in_fd);
}
/*!
 * \internal
 * \brief Read any remaining output of an action and stop watching the stream
 *
 * Nothing is done for an asynchronous action whose source for the stream is
 * already gone. Otherwise the remaining output is read, and then the
 * descriptor is closed (synchronous actions) or the source is removed
 * (asynchronous actions).
 *
 * \param[in,out] op         Action to finish
 * \param[in]     is_stderr  Whether to handle stderr (otherwise stdout)
 */
static void
finish_op_output(svc_action_t *op, bool is_stderr)
{
    mainloop_io_t **io = is_stderr? &(op->opaque->stderr_gsource)
                                  : &(op->opaque->stdout_gsource);
    const int out_fd = is_stderr? op->opaque->stderr_fd
                                : op->opaque->stdout_fd;

    if (!(op->synchronous) && (*io == NULL)) {
        return;
    }

    crm_trace("Finish reading %s[%d] %s",
              op->id, op->pid, (is_stderr? "stderr" : "stdout"));
    svc_read_output(out_fd, op, is_stderr);

    if (op->synchronous) {
        close(out_fd);
        return;
    }
    mainloop_del_fd(*io);
    *io = NULL;
}
/*!
 * \internal
 * \brief Log an operation's stderr (at info level) and stdout (at debug level)
 *
 * \param[in] op  Operation whose captured output should be logged
 */
static void
log_op_output(svc_action_t *op)
{
    char *label = crm_strdup_printf("%s[%d] error output", op->id, op->pid);

    // Where the stream description begins within the label
    char *stream_desc = label + strlen(label) - strlen("error output");

    /* The library caller has better context to know how important the output
     * is, so log it at info and debug severity here. They can log it again at
     * higher severity if appropriate.
     */
    crm_log_output(LOG_INFO, label, op->stderr_data);

    // Reuse the same buffer for stdout (the replacement text is shorter)
    strcpy(stream_desc, "output");
    crm_log_output(LOG_DEBUG, label, op->stdout_data);

    free(label);
}
// Truncate exit reasons at this many characters
#define EXIT_REASON_MAX_LEN 128

/*!
 * \internal
 * \brief Extract an OCF agent's exit reason from its captured stderr
 *
 * The exit reason is the text following the last occurrence of
 * PCMK_OCF_REASON_PREFIX in the action's stderr, up to the end of that line
 * (or the end of the output), truncated to EXIT_REASON_MAX_LEN characters.
 * If a non-empty reason is found, it replaces any previously stored one.
 *
 * \param[in,out] op  Action to parse stderr of and store exit reason in
 */
static void
parse_exit_reason_from_stderr(svc_action_t *op)
{
    const int prefix_len = strlen(PCMK_OCF_REASON_PREFIX);
    const char *reason = NULL;
    const char *match = NULL;
    size_t reason_len = 0;

    if (op->stderr_data == NULL) {
        return;
    }

    // Only OCF agents have exit reasons in stderr
    if (!pcmk__str_eq(op->standard, PCMK_RESOURCE_CLASS_OCF, pcmk__str_none)) {
        return;
    }

    // The last occurrence of the magic string wins
    match = strstr(op->stderr_data, PCMK_OCF_REASON_PREFIX);
    while (match != NULL) {
        reason = match + prefix_len; // Text just past the magic string
        match = strstr(reason, PCMK_OCF_REASON_PREFIX);
    }

    if (reason == NULL) {
        return; // No exit reason
    }
    if ((*reason == '\n') || (*reason == '\0')) {
        return; // Empty exit reason
    }

    // Exit reason goes to end of line (or end of output)
    reason_len = strcspn(reason, "\n");

    // Limit size of exit reason to something reasonable
    if (reason_len > EXIT_REASON_MAX_LEN) {
        reason_len = EXIT_REASON_MAX_LEN;
    }

    free(op->opaque->exit_reason);
    op->opaque->exit_reason = strndup(reason, reason_len);
}
/*!
* \internal
* \brief Process the completion of an asynchronous child process
*
* Reads any remaining output from the child, sets the action's result
* according to how the child ended (normal exit, timeout, cancellation, or
* other signal), and then finalizes the action.
*
* \param[in,out] p Child process that completed
* \param[in] pid Process ID of child
* \param[in] core (Unused)
* \param[in] signo Signal that interrupted child, if any
* \param[in] exitcode Exit status of child process
*/
static void
async_action_complete(mainloop_child_t *p, pid_t pid, int core, int signo,
int exitcode)
{
// The action is the child's user data; fetch it and clear the stored reference
svc_action_t *op = mainloop_child_userdata(p);
mainloop_clear_child_userdata(p);
/* Sanity check: if the PID doesn't match the action's, record an error result
 * and return immediately (the action is not finalized in that case)
 */
CRM_CHECK(op->pid == pid,
services__set_result(op, services__generic_error(op),
PCMK_EXEC_ERROR, "Bug in mainloop handling");
return);
/* Depending on the priority the mainloop gives the stdout and stderr
* file descriptors, this function could be called before everything has
* been read from them, so force a final read now.
*/
finish_op_output(op, true);
finish_op_output(op, false);
close_op_input(op);
if (signo == 0) {
// Child was not ended by a signal: its exit status is the result
crm_debug("%s[%d] exited with status %d", op->id, op->pid, exitcode);
services__set_result(op, exitcode, PCMK_EXEC_DONE, NULL);
log_op_output(op);
parse_exit_reason_from_stderr(op);
} else if (mainloop_child_timeout(p)) {
// Child was signalled and the mainloop reports that it timed out
const char *kind = services__action_kind(op);
crm_info("%s %s[%d] timed out after %s",
kind, op->id, op->pid, pcmk__readable_interval(op->timeout));
services__format_result(op, services__generic_error(op),
PCMK_EXEC_TIMEOUT,
"%s did not complete within %s",
kind, pcmk__readable_interval(op->timeout));
} else if (op->cancel) {
/* If an in-flight recurring operation was killed because it was
* cancelled, don't treat that as a failure.
*/
crm_info("%s[%d] terminated with signal %d (%s)",
op->id, op->pid, signo, strsignal(signo));
services__set_result(op, PCMK_OCF_OK, PCMK_EXEC_CANCELLED, NULL);
} else {
// Child was ended by a signal for some other reason: treat as an error
crm_info("%s[%d] terminated with signal %d (%s)",
op->id, op->pid, signo, strsignal(signo));
services__format_result(op, PCMK_OCF_UNKNOWN_ERROR, PCMK_EXEC_ERROR,
"%s interrupted by %s signal",
services__action_kind(op), strsignal(signo));
}
// Return value is intentionally not checked here
services__finalize_async_op(op);
}
/*!
 * \internal
 * \brief Get the "generic error" exit status for an action's agent standard
 *
 * Internal errors must be reported using an exit status that makes sense for
 * the agent standard of the affected action. This function supplies the value
 * to use for errors in general.
 *
 * \param[in] op  Action that error is for
 *
 * \return Exit status appropriate to agent standard
 * \note Actions without a standard will get PCMK_OCF_UNKNOWN_ERROR.
 */
int
services__generic_error(const svc_action_t *op)
{
    int status = PCMK_OCF_UNKNOWN_ERROR;

    if ((op != NULL) && (op->standard != NULL)) {
#if PCMK__ENABLE_LSB
        // Only LSB status actions use a different value
        if (pcmk__str_eq(op->standard, PCMK_RESOURCE_CLASS_LSB,
                         pcmk__str_casei)
            && pcmk__str_eq(op->action, PCMK_ACTION_STATUS,
                            pcmk__str_casei)) {
            status = PCMK_LSB_STATUS_UNKNOWN;
        }
#endif
    }
    return status;
}
/*!
 * \internal
 * \brief Get the "not installed" exit status for an action's agent standard
 *
 * Internal errors must be reported using an exit status that makes sense for
 * the agent standard of the affected action. This function supplies the value
 * to use for "not installed" errors.
 *
 * \param[in] op  Action that error is for
 *
 * \return Exit status appropriate to agent standard
 * \note Actions without a standard will get PCMK_OCF_UNKNOWN_ERROR.
 */
int
services__not_installed_error(const svc_action_t *op)
{
    int status = PCMK_OCF_UNKNOWN_ERROR;

    if ((op != NULL) && (op->standard != NULL)) {
        status = PCMK_OCF_NOT_INSTALLED;
#if PCMK__ENABLE_LSB
        // Only LSB status actions use a different value
        if (pcmk__str_eq(op->standard, PCMK_RESOURCE_CLASS_LSB,
                         pcmk__str_casei)
            && pcmk__str_eq(op->action, PCMK_ACTION_STATUS,
                            pcmk__str_casei)) {
            status = PCMK_LSB_STATUS_NOT_INSTALLED;
        }
#endif
    }
    return status;
}
/*!
 * \internal
 * \brief Get the "insufficient privileges" exit status for an agent standard
 *
 * Internal errors must be reported using an exit status that makes sense for
 * the agent standard of the affected action. This function supplies the value
 * to use for "insufficient privileges" errors.
 *
 * \param[in] op  Action that error is for
 *
 * \return Exit status appropriate to agent standard
 * \note Actions without a standard will get PCMK_OCF_UNKNOWN_ERROR.
 */
int
services__authorization_error(const svc_action_t *op)
{
    int status = PCMK_OCF_UNKNOWN_ERROR;

    if ((op != NULL) && (op->standard != NULL)) {
        status = PCMK_OCF_INSUFFICIENT_PRIV;
#if PCMK__ENABLE_LSB
        // Only LSB status actions use a different value
        if (pcmk__str_eq(op->standard, PCMK_RESOURCE_CLASS_LSB,
                         pcmk__str_casei)
            && pcmk__str_eq(op->action, PCMK_ACTION_STATUS,
                            pcmk__str_casei)) {
            status = PCMK_LSB_STATUS_INSUFFICIENT_PRIV;
        }
#endif
    }
    return status;
}
/*!
 * \internal
 * \brief Get the "not configured" exit status for an action's agent standard
 *
 * Internal errors must be reported using an exit status that makes sense for
 * the agent standard of the affected action. This function supplies the value
 * to use for "not configured" errors.
 *
 * \param[in] op        Action that error is for
 * \param[in] is_fatal  Whether problem is cluster-wide instead of only local
 *
 * \return Exit status appropriate to agent standard
 * \note Actions without a standard will get PCMK_OCF_UNKNOWN_ERROR.
 */
int
services__configuration_error(const svc_action_t *op, bool is_fatal)
{
    int status = PCMK_OCF_UNKNOWN_ERROR;

    if ((op != NULL) && (op->standard != NULL)) {
        if (is_fatal) {
            status = PCMK_OCF_NOT_CONFIGURED;
        } else {
            status = PCMK_OCF_INVALID_PARAM;
        }
#if PCMK__ENABLE_LSB
        // LSB status actions use one value regardless of is_fatal
        if (pcmk__str_eq(op->standard, PCMK_RESOURCE_CLASS_LSB,
                         pcmk__str_casei)
            && pcmk__str_eq(op->action, PCMK_ACTION_STATUS,
                            pcmk__str_casei)) {
            status = PCMK_LSB_NOT_CONFIGURED;
        }
#endif
    }
    return status;
}
/*!
 * \internal
 * \brief Set an operation's result based on errno from a failed system call
 *
 * Intended for errno values from stat(), fork() or execvp(). Errors indicating
 * a missing or invalid executable are mapped to a "not installed" result,
 * permission errors to an "insufficient privileges" result, and anything else
 * to a generic error.
 *
 * \param[in,out] op     Operation to set rc and status for
 * \param[in]     error  Value of errno after system call
 */
void
services__handle_exec_error(svc_action_t * op, int error)
{
    // Most specific available label for the action: executable, agent, or ID
    const char *label = op->opaque->exec;

    if (label == NULL) {
        label = (op->agent != NULL)? op->agent : op->id;
    }

    switch (error) {   /* see execve(2), stat(2) and fork(2) */
        case ENOENT:   /* No such file or directory */
        case EISDIR:   /* Is a directory */
        case ENOTDIR:  /* Path component is not a directory */
        case EINVAL:   /* Invalid executable format */
        case ENOEXEC:  /* Invalid executable format */
            services__format_result(op, services__not_installed_error(op),
                                    PCMK_EXEC_NOT_INSTALLED, "%s: %s",
                                    label, pcmk_rc_str(error));
            return;

        case EACCES:   /* permission denied (various errors) */
        case EPERM:    /* permission denied (various errors) */
            services__format_result(op, services__authorization_error(op),
                                    PCMK_EXEC_ERROR, "%s: %s",
                                    label, pcmk_rc_str(error));
            return;

        default:
            services__set_result(op, services__generic_error(op),
                                 PCMK_EXEC_ERROR, pcmk_rc_str(error));
            return;
    }
}
/*!
 * \internal
 * \brief Terminate a child process that failed before executing the agent
 *
 * For OCF actions, the exit reason (if any) is first written to stderr with
 * the OCF exit reason prefix, so that the parent can parse it.
 *
 * \param[in] op           Action that failed
 * \param[in] exit_status  Exit status code to use
 * \param[in] exit_reason  Exit reason to output if for OCF agent
 *
 * \note This function does not return.
 */
static void
exit_child(const svc_action_t *op, int exit_status, const char *exit_reason)
{
    const bool emit_reason = (op != NULL) && (exit_reason != NULL)
                             && pcmk__str_eq(op->standard,
                                             PCMK_RESOURCE_CLASS_OCF,
                                             pcmk__str_none);

    if (emit_reason) {
        fprintf(stderr, PCMK_OCF_REASON_PREFIX "%s\n", exit_reason);
    }

    pcmk_common_cleanup();
    _exit(exit_status);
}
/*!
* \internal
* \brief Set up a newly forked child process and execute an action's agent
*
* In order: restore default SIGPIPE handling, reset scheduling policy and
* priority, put the child in its own process group, call
* pcmk__close_fds_in_child(), substitute CIB secrets (if support is enabled),
* call add_action_env_vars(), change group and user (if a user ID was
* requested and the process is running as root), and call execvp().
*
* \param[in,out] op Action to execute
*
* \note This function does not return: either execvp() succeeds, or the child
* exits via exit_child().
*/
static void
action_launch_child(svc_action_t *op)
{
int rc;
/* SIGPIPE is ignored (which is different from signal blocking) by the gnutls
* library. Depending on the libqb version in use, libqb may set SIGPIPE to be
* ignored as well. We do not want this to be inherited by the child process.
* By resetting the signal to its default behavior, we avoid some potential odd
* problems that occur in OCF scripts when SIGPIPE is ignored by the
* environment. */
signal(SIGPIPE, SIG_DFL);
// Failures to reset scheduling policy or priority are logged but not fatal
if (sched_getscheduler(0) != SCHED_OTHER) {
struct sched_param sp;
memset(&sp, 0, sizeof(sp));
sp.sched_priority = 0;
if (sched_setscheduler(0, SCHED_OTHER, &sp) == -1) {
crm_info("Could not reset scheduling policy for %s", op->id);
}
}
if (setpriority(PRIO_PROCESS, 0, 0) == -1) {
crm_info("Could not reset process priority for %s", op->id);
}
/* Per the man page, setpgrp() is equivalent to setpgid(0, 0); setpgid() is
* used here because it also compiles on BSD variants.
*
* @TODO Investigate whether it behaves the same there too.
*/
setpgid(0, 0);
pcmk__close_fds_in_child(false);
/* It would be nice if errors in this function could be reported as
* execution status (for example, PCMK_EXEC_NO_SECRETS for the secrets error
* below) instead of exit status. However, we've already forked, so
* exit status is all we have. At least for OCF actions, we can output an
* exit reason for the parent to parse.
*
* @TODO It might be better to substitute secrets in the parent before
* forking, so that if it fails, we can give a better message and result,
* and avoid the fork.
*/
#if PCMK__ENABLE_CIBSECRETS
rc = pcmk__substitute_secrets(op->rsc, op->params);
if (rc != pcmk_rc_ok) {
// A stop is still attempted without secrets; any other action exits here
if (pcmk__str_eq(op->action, PCMK_ACTION_STOP, pcmk__str_casei)) {
crm_info("Proceeding with stop operation for %s "
"despite being unable to load CIB secrets (%s)",
op->rsc, pcmk_rc_str(rc));
} else {
crm_err("Considering %s unconfigured "
"because unable to load CIB secrets: %s",
op->rsc, pcmk_rc_str(rc));
exit_child(op, services__configuration_error(op, false),
"Unable to load CIB secrets");
}
}
#endif
add_action_env_vars(op);
/* Become the desired user */
// Only attempted when a nonzero user ID was requested and we are running as root
if (op->opaque->uid && (geteuid() == 0)) {
// If requested, set effective group
if (op->opaque->gid && (setgid(op->opaque->gid) < 0)) {
crm_err("Considering %s unauthorized because could not set "
"child group to %d: %s",
op->id, op->opaque->gid, strerror(errno));
exit_child(op, services__authorization_error(op),
"Could not set group for child process");
}
// Erase supplementary group list
// (We could do initgroups() if we kept a copy of the username)
if (setgroups(0, NULL) < 0) {
crm_err("Considering %s unauthorized because could not "
"clear supplementary groups: %s", op->id, strerror(errno));
exit_child(op, services__authorization_error(op),
"Could not clear supplementary groups for child process");
}
// Set effective user
// (done after the group changes above)
if (setuid(op->opaque->uid) < 0) {
crm_err("Considering %s unauthorized because could not set user "
"to %d: %s", op->id, op->opaque->uid, strerror(errno));
exit_child(op, services__authorization_error(op),
"Could not set user for child process");
}
}
// Execute the agent (doesn't return if successful)
execvp(op->opaque->exec, op->opaque->args);
// An earlier stat() should have avoided most possible errors
// Save errno right away, before later calls can overwrite it
rc = errno;
// Map the errno to an exit status (stored in op->rc) for this child to exit with
services__handle_exec_error(op, rc);
crm_err("Unable to execute %s: %s", op->id, strerror(rc));
exit_child(op, op->rc, "Child process was unable to execute file");
}
/*!
* \internal
* \brief Wait for synchronous action to complete, and set its result
*
* Polls the child's stdout and stderr pipes and a SIGCHLD notification
* descriptor until the child has been reaped, the action's timeout has
* elapsed, or polling fails. Output is read as it becomes available. If the
* child has not been reaped by then, it is killed with SIGKILL and reaped.
*
* \param[in,out] op Action to wait for
* \param[in,out] data Child signal data
*/
static void
wait_for_sync_result(svc_action_t *op, struct sigchld_data_s *data)
{
int status = 0;
// Remaining time to wait, in milliseconds (as passed to poll())
int timeout = op->timeout;
time_t start = time(NULL);
struct pollfd fds[3];
// Positive only once waitpid() has reaped the child
int wait_rc = 0;
const char *wait_reason = NULL;
// fds[0]: child's stdout, fds[1]: child's stderr, fds[2]: SIGCHLD notification
fds[0].fd = op->opaque->stdout_fd;
fds[0].events = POLLIN;
fds[0].revents = 0;
fds[1].fd = op->opaque->stderr_fd;
fds[1].events = POLLIN;
fds[1].revents = 0;
fds[2].fd = sigchld_open(data);
fds[2].events = POLLIN;
fds[2].revents = 0;
crm_trace("Waiting for %s[%d]", op->id, op->pid);
do {
int poll_rc = poll(fds, 3, timeout);
// Forget any failure reason from a previous iteration
wait_reason = NULL;
if (poll_rc > 0) {
if (fds[0].revents & POLLIN) {
svc_read_output(op->opaque->stdout_fd, op, FALSE);
}
if (fds[1].revents & POLLIN) {
svc_read_output(op->opaque->stderr_fd, op, TRUE);
}
if ((fds[2].revents & POLLIN)
&& sigchld_received(fds[2].fd, op->pid, data)) {
wait_rc = waitpid(op->pid, &status, WNOHANG);
if ((wait_rc > 0) || ((wait_rc < 0) && (errno == ECHILD))) {
// Child process exited or doesn't exist
break;
} else if (wait_rc < 0) {
wait_reason = pcmk_rc_str(errno);
crm_info("Wait for completion of %s[%d] failed: %s "
QB_XS " source=waitpid",
op->id, op->pid, wait_reason);
wait_rc = 0; // Act as if process is still running
#ifndef HAVE_SYS_SIGNALFD_H
} else {
/* The child hasn't exited, so this SIGCHLD could be for
* another child. We have to ignore it here but will still
* need to resend it after this synchronous action has
* completed and SIGCHLD has been restored to be handled by
* the previous handler, so that it will be handled.
*/
data->ignored = true;
#endif
}
}
} else if (poll_rc == 0) {
// Poll timed out with no descriptors ready
timeout = 0;
break;
} else if ((poll_rc < 0) && (errno != EINTR)) {
// Polling failed for a reason other than being interrupted by a signal
wait_reason = pcmk_rc_str(errno);
crm_info("Wait for completion of %s[%d] failed: %s "
QB_XS " source=poll", op->id, op->pid, wait_reason);
break;
}
// Recompute the remaining time (elapsed time is measured in whole seconds)
timeout = op->timeout - (time(NULL) - start) * 1000;
// A negative op->timeout keeps the loop going regardless of elapsed time
} while ((op->timeout < 0 || timeout > 0));
crm_trace("Stopped waiting for %s[%d]", op->id, op->pid);
// Read any remaining output and close the action's descriptors
finish_op_output(op, true);
finish_op_output(op, false);
close_op_input(op);
sigchld_close(fds[2].fd);
if (wait_rc <= 0) {
// The child was not reaped above: either the wait timed out or it failed
if ((op->timeout > 0) && (timeout <= 0)) {
services__format_result(op, services__generic_error(op),
PCMK_EXEC_TIMEOUT,
"%s did not exit within specified timeout",
services__action_kind(op));
crm_info("%s[%d] timed out after %dms",
op->id, op->pid, op->timeout);
} else {
services__set_result(op, services__generic_error(op),
PCMK_EXEC_ERROR, wait_reason);
}
/* Kill the child only if it hasn't been successfully waited for yet.
This limits the risk of killing the wrong target a bit more. */
if ((wait_rc == 0) && (waitpid(op->pid, &status, WNOHANG) == 0)) {
if (kill(op->pid, SIGKILL)) {
crm_warn("Could not kill rogue child %s[%d]: %s",
op->id, op->pid, pcmk_rc_str(errno));
}
/* Safe to skip WNOHANG here as we sent non-ignorable signal. */
while ((waitpid(op->pid, &status, 0) == (pid_t) -1)
&& (errno == EINTR)) {
/* keep waiting */;
}
}
} else if (WIFEXITED(status)) {
// Child exited normally: its exit status is the result
services__set_result(op, WEXITSTATUS(status), PCMK_EXEC_DONE, NULL);
parse_exit_reason_from_stderr(op);
crm_info("%s[%d] exited with status %d", op->id, op->pid, op->rc);
} else if (WIFSIGNALED(status)) {
// Child was terminated by a signal: treat as an error
int signo = WTERMSIG(status);
services__format_result(op, services__generic_error(op),
PCMK_EXEC_ERROR, "%s interrupted by %s signal",
services__action_kind(op), strsignal(signo));
crm_info("%s[%d] terminated with signal %d (%s)",
op->id, op->pid, signo, strsignal(signo));
#ifdef WCOREDUMP
if (WCOREDUMP(status)) {
crm_warn("%s[%d] dumped core", op->id, op->pid);
}
#endif
} else {
// Shouldn't be possible to get here
services__set_result(op, services__generic_error(op), PCMK_EXEC_ERROR,
"Unable to wait for child to complete");
}
}
/*!
* \internal
* \brief Execute an action whose standard uses executable files
*
* Creates pipes for the child's stdout and stderr (and stdin, if the agent
* standard has the stdin capability), forks, and runs the agent in the child.
* A synchronous action is waited for here; an asynchronous action is
* registered with the mainloop and completes later in async_action_complete().
*
* \param[in,out] op Action to execute
*
* \return Standard Pacemaker return value
* \retval EBUSY Recurring operation could not be initiated
* \retval pcmk_rc_error Synchronous action failed
* \retval pcmk_rc_ok Synchronous action succeeded, or asynchronous action
* should not be freed (because it's pending or because
* it failed to execute and was already freed)
*
* \note If the return value for an asynchronous action is not pcmk_rc_ok, the
* caller is responsible for freeing the action.
*/
int
services__execute_file(svc_action_t *op)
{
// For each pipe, index 0 is the read end and index 1 is the write end
int stdout_fd[2];
int stderr_fd[2];
// Stays {-1, -1} unless a stdin pipe is created below
int stdin_fd[2] = {-1, -1};
int rc;
struct stat st;
struct sigchld_data_s data = { .ignored = false };
// Catch common failure conditions early
if (stat(op->opaque->exec, &st) != 0) {
rc = errno;
crm_info("Cannot execute '%s': %s " QB_XS " stat rc=%d",
op->opaque->exec, pcmk_rc_str(rc), rc);
services__handle_exec_error(op, rc);
goto done;
}
// Create the pipes; on failure, close any pipes already created
if (pipe(stdout_fd) < 0) {
rc = errno;
crm_info("Cannot execute '%s': %s " QB_XS " pipe(stdout) rc=%d",
op->opaque->exec, pcmk_rc_str(rc), rc);
services__handle_exec_error(op, rc);
goto done;
}
if (pipe(stderr_fd) < 0) {
rc = errno;
close_pipe(stdout_fd);
crm_info("Cannot execute '%s': %s " QB_XS " pipe(stderr) rc=%d",
op->opaque->exec, pcmk_rc_str(rc), rc);
services__handle_exec_error(op, rc);
goto done;
}
// A stdin pipe is needed only if the agent standard has the stdin capability
if (pcmk_is_set(pcmk_get_ra_caps(op->standard), pcmk_ra_cap_stdin)) {
if (pipe(stdin_fd) < 0) {
rc = errno;
close_pipe(stdout_fd);
close_pipe(stderr_fd);
crm_info("Cannot execute '%s': %s " QB_XS " pipe(stdin) rc=%d",
op->opaque->exec, pcmk_rc_str(rc), rc);
services__handle_exec_error(op, rc);
goto done;
}
}
// Synchronous actions need SIGCHLD handling set up before forking
if (op->synchronous && !sigchld_setup(&data)) {
close_pipe(stdin_fd);
close_pipe(stdout_fd);
close_pipe(stderr_fd);
sigchld_cleanup(&data);
services__set_result(op, services__generic_error(op), PCMK_EXEC_ERROR,
"Could not manage signals for child process");
goto done;
}
op->pid = fork();
switch (op->pid) {
case -1:
// fork() failed: undo the setup above and report the error
rc = errno;
close_pipe(stdin_fd);
close_pipe(stdout_fd);
close_pipe(stderr_fd);
crm_info("Cannot execute '%s': %s " QB_XS " fork rc=%d",
op->opaque->exec, pcmk_rc_str(rc), rc);
services__handle_exec_error(op, rc);
if (op->synchronous) {
sigchld_cleanup(&data);
}
goto done;
break;
case 0: /* Child */
// Close the parent's ends of the pipes
close(stdout_fd[0]);
close(stderr_fd[0]);
if (stdin_fd[1] >= 0) {
close(stdin_fd[1]);
}
// Make the child's pipe ends its standard output, error, and input
if (STDOUT_FILENO != stdout_fd[1]) {
if (dup2(stdout_fd[1], STDOUT_FILENO) != STDOUT_FILENO) {
crm_warn("Can't redirect output from '%s': %s "
QB_XS " errno=%d",
op->opaque->exec, pcmk_rc_str(errno), errno);
}
close(stdout_fd[1]);
}
if (STDERR_FILENO != stderr_fd[1]) {
if (dup2(stderr_fd[1], STDERR_FILENO) != STDERR_FILENO) {
crm_warn("Can't redirect error output from '%s': %s "
QB_XS " errno=%d",
op->opaque->exec, pcmk_rc_str(errno), errno);
}
close(stderr_fd[1]);
}
if ((stdin_fd[0] >= 0) &&
(STDIN_FILENO != stdin_fd[0])) {
if (dup2(stdin_fd[0], STDIN_FILENO) != STDIN_FILENO) {
crm_warn("Can't redirect input to '%s': %s "
QB_XS " errno=%d",
op->opaque->exec, pcmk_rc_str(errno), errno);
}
close(stdin_fd[0]);
}
if (op->synchronous) {
sigchld_cleanup(&data);
}
action_launch_child(op);
pcmk__assert(false); // action_launch_child() should not return
}
/* Only the parent reaches here */
// Close the child's ends of the pipes
close(stdout_fd[1]);
close(stderr_fd[1]);
if (stdin_fd[0] >= 0) {
close(stdin_fd[0]);
}
// Keep the parent's ends in the action; failure to set non-blocking is only logged
op->opaque->stdout_fd = stdout_fd[0];
rc = pcmk__set_nonblocking(op->opaque->stdout_fd);
if (rc != pcmk_rc_ok) {
crm_info("Could not set '%s' output non-blocking: %s "
QB_XS " rc=%d",
op->opaque->exec, pcmk_rc_str(rc), rc);
}
op->opaque->stderr_fd = stderr_fd[0];
rc = pcmk__set_nonblocking(op->opaque->stderr_fd);
if (rc != pcmk_rc_ok) {
crm_info("Could not set '%s' error output non-blocking: %s "
QB_XS " rc=%d",
op->opaque->exec, pcmk_rc_str(rc), rc);
}
op->opaque->stdin_fd = stdin_fd[1];
if (op->opaque->stdin_fd >= 0) {
// using buffer behind non-blocking-fd here - that could be improved
// as long as no other standard uses stdin_fd assume stonith
rc = pcmk__set_nonblocking(op->opaque->stdin_fd);
if (rc != pcmk_rc_ok) {
crm_info("Could not set '%s' input non-blocking: %s "
QB_XS " fd=%d,rc=%d", op->opaque->exec,
pcmk_rc_str(rc), op->opaque->stdin_fd, rc);
}
pipe_in_action_stdin_parameters(op);
// as long as we are handling parameters directly in here just close
close(op->opaque->stdin_fd);
// Mark as closed so that close_op_input() will not close it again
op->opaque->stdin_fd = -1;
}
// after fds are setup properly and before we plug anything into mainloop
if (op->opaque->fork_callback) {
op->opaque->fork_callback(op);
}
if (op->synchronous) {
// Block here until the child completes, then compute the return code below
wait_for_sync_result(op, &data);
sigchld_cleanup(&data);
goto done;
}
// Asynchronous: let the mainloop watch the child and its output descriptors
crm_trace("Waiting async for '%s'[%d]", op->opaque->exec, op->pid);
mainloop_child_add_with_flags(op->pid, op->timeout, op->id, op,
pcmk_is_set(op->flags, SVC_ACTION_LEAVE_GROUP)? mainloop_leave_pid_group : 0,
async_action_complete);
op->opaque->stdout_gsource = mainloop_add_fd(op->id,
G_PRIORITY_LOW,
op->opaque->stdout_fd, op,
&stdout_callbacks);
op->opaque->stderr_gsource = mainloop_add_fd(op->id,
G_PRIORITY_LOW,
op->opaque->stderr_fd, op,
&stderr_callbacks);
services_add_inflight_op(op);
return pcmk_rc_ok;
done:
// Reached when a synchronous action is finished, or when any action failed to start
if (op->synchronous) {
return (op->rc == PCMK_OCF_OK)? pcmk_rc_ok : pcmk_rc_error;
} else {
return services__finalize_async_op(op);
}
}
/*!
 * \brief List the entries of a single directory, filtered by type
 *
 * \param[in] root        Directory to scan
 * \param[in] files       If TRUE, omit subdirectories; if FALSE, omit regular
 *                        files
 * \param[in] executable  If TRUE (and \p files is TRUE), omit regular files
 *                        that have no execute permission bit set
 *
 * \return Newly allocated list of newly allocated entry names, in alphasort()
 *         order, or NULL if \p root is NULL, cannot be scanned, or has no
 *         matching entries
 *
 * \note Entries whose names begin with '.', whose full path does not fit in
 *       PATH_MAX, or that cannot be examined with stat() are omitted. Entries
 *       that are neither directories nor regular files are always included.
 * \note The caller is responsible for freeing the list and the names in it.
 */
GList *
services_os_get_single_directory_list(const char *root, gboolean files, gboolean executable)
{
    GList *list = NULL;
    struct dirent **namelist = NULL;
    int entries = 0;
    char buffer[PATH_MAX];

    if (root == NULL) {
        return NULL;
    }

    entries = scandir(root, &namelist, NULL, alphasort);
    if (entries < 0) {
        return NULL; // namelist is not valid on failure, so don't free it
    }

    for (int lpc = 0; lpc < entries; lpc++) {
        struct dirent *entry = namelist[lpc];
        struct stat sb;
        bool keep = true;

        if (entry->d_name[0] == '.') {
            keep = false; // Hidden entry (including "." and "..")

        } else {
            int len = snprintf(buffer, sizeof(buffer), "%s/%s",
                               root, entry->d_name);

            if ((len < 0) || ((size_t) len >= sizeof(buffer))) {
                keep = false; // A truncated path would examine the wrong file

            } else if (stat(buffer, &sb) != 0) {
                keep = false;

            } else if (S_ISDIR(sb.st_mode)) {
                keep = !files;

            } else if (S_ISREG(sb.st_mode)) {
                const mode_t any_exec = S_IXUSR|S_IXGRP|S_IXOTH;

                keep = files
                       && (!executable || ((sb.st_mode & any_exec) != 0));
            }
        }

        if (keep) {
            char *name = strdup(entry->d_name);

            if (name != NULL) {
                list = g_list_append(list, name);
            }
        }

        // Every entry is freed exactly once, whether or not it was kept
        free(entry);
    }

    // Also covers the case of zero entries
    free(namelist);
    return list;
}
/*!
 * \brief List the entries of each directory in a colon-separated search path
 *
 * \param[in] root        Colon-separated list of directories to scan
 * \param[in] files       Passed to services_os_get_single_directory_list()
 * \param[in] executable  Passed to services_os_get_single_directory_list()
 *
 * \return Concatenation of the lists obtained for each directory, in the
 *         order the directories appear in \p root, or NULL if \p root is NULL
 *         or empty, memory could not be allocated, or nothing was found
 */
GList *
services_os_get_directory_list(const char *root, gboolean files, gboolean executable)
{
    GList *result = NULL;
    char *dirs = NULL;
    char *saveptr = NULL;

    // Check before copying, because strdup(NULL) is undefined behavior
    if (pcmk__str_empty(root)) {
        return NULL;
    }

    // Tokenizing modifies the string, so work on a copy
    dirs = strdup(root);
    if (dirs == NULL) {
        return NULL;
    }

    // strtok_r() keeps its state in saveptr rather than in a shared static
    for (char *dir = strtok_r(dirs, ":", &saveptr); dir != NULL;
         dir = strtok_r(NULL, ":", &saveptr)) {

        GList *tmp = services_os_get_single_directory_list(dir, files,
                                                           executable);

        if (tmp != NULL) {
            result = g_list_concat(result, tmp);
        }
    }

    free(dirs);
    return result;
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Wed, Jun 25, 4:20 AM (1 d, 11 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1952125
Default Alt Text
(226 KB)
Attached To
Mode
rP Pacemaker
Attached
Detach File
Event Timeline
Log In to Comment