Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F4639556
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
34 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/crm/crmd/messages.c b/crm/crmd/messages.c
index 7344705fcc..8a78c68433 100644
--- a/crm/crmd/messages.c
+++ b/crm/crmd/messages.c
@@ -1,1237 +1,1234 @@
/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <portability.h>
#include <sys/param.h>
#include <crm/crm.h>
#include <string.h>
#include <time.h>
#include <crmd_fsa.h>
#include <hb_api.h>
#include <lrm/lrm_api.h>
#include <crm/msg_xml.h>
#include <crm/common/xml.h>
#include <crm/common/msg.h>
#include <crm/cib.h>
#include <crmd.h>
#include <crmd_messages.h>
#include <crmd_lrm.h>
#include <crm/dmalloc_wrapper.h>
GListPtr fsa_message_queue = NULL;
extern void crm_shutdown(int nsig);
enum crmd_fsa_input handle_request(ha_msg_input_t *stored_msg);
enum crmd_fsa_input handle_response(ha_msg_input_t *stored_msg);
enum crmd_fsa_input handle_shutdown_request(HA_Message *stored_msg);
ha_msg_input_t *copy_ha_msg_input(ha_msg_input_t *orig);
gboolean ipc_queue_helper(gpointer key, gpointer value, gpointer user_data);
#ifdef MSG_LOG
# define ROUTER_RESULT(x) crm_debug_3("Router result: %s", x); \
crm_log_message_adv(LOG_MSG, "router.log", relay_message);
#else
# define ROUTER_RESULT(x) crm_debug_3("Router result: %s", x)
#endif
/* debug only, can wrap all it likes */
int last_data_id = 0;
void
register_fsa_error_adv(
enum crmd_fsa_cause cause, enum crmd_fsa_input input,
fsa_data_t *cur_data, void *new_data, const char *raised_from)
{
/* save the current actions if any */
if(fsa_actions != A_NOTHING) {
register_fsa_input_adv(
cur_data?cur_data->fsa_cause:C_FSA_INTERNAL,
I_NULL, cur_data?cur_data->data:NULL,
fsa_actions, TRUE, __FUNCTION__);
}
/* reset the action list */
fsa_actions = A_NOTHING;
/* register the error */
register_fsa_input_adv(
cause, input, new_data, A_NOTHING, TRUE, raised_from);
}
static gboolean last_was_vote = FALSE;
int
register_fsa_input_adv(
enum crmd_fsa_cause cause, enum crmd_fsa_input input,
void *data, long long with_actions,
gboolean prepend, const char *raised_from)
{
unsigned old_len = g_list_length(fsa_message_queue);
fsa_data_t *fsa_data = NULL;
last_data_id++;
crm_debug_2("%s raised FSA input %d (%s) (cause=%s) %s data",
raised_from, last_data_id, fsa_input2string(input),
fsa_cause2string(cause), data?"with":"without");
if(input == I_WAIT_FOR_EVENT) {
do_fsa_stall = TRUE;
crm_debug("Stalling the FSA pending further input: cause=%s",
fsa_cause2string(cause));
if(old_len > 0) {
crm_warn("%s stalled the FSA with pending inputs",
raised_from);
fsa_dump_queue(LOG_DEBUG);
}
if(data == NULL) {
set_bit_inplace(fsa_actions, with_actions);
with_actions = A_NOTHING;
return 0;
}
crm_err("%s stalled the FSA with data - this may be broken",
raised_from);
}
if(old_len == 0) {
last_was_vote = FALSE;
}
if(input == I_NULL && with_actions == A_NOTHING /* && data == NULL */){
/* no point doing anything */
crm_err("Cannot add entry to queue: no input and no action");
return 0;
} else if(data == NULL) {
last_was_vote = FALSE;
#if 0
} else if(last_was_vote && cause == C_HA_MESSAGE && input == I_ROUTER) {
const char *op = cl_get_string(
((ha_msg_input_t*)data)->msg, F_CRM_TASK);
if(safe_str_eq(op, CRM_OP_VOTE)) {
/* It is always safe to treat N successive votes as
* a single one
*
* If all the discarded votes are more "loosing" than
* the first then the result is accurate
* (win or loose).
*
* If any of the discarded votes are less "loosing"
* than the first then we will cast our vote and the
* eventual winner will vote us down again (which
* even in the case that N=2, is no worse than if we
* had not disarded the vote).
*/
crm_debug_2("Vote compression: %d", old_len);
return 0;
}
#endif
} else if (cause == C_HA_MESSAGE && input == I_ROUTER) {
const char *op = cl_get_string(
((ha_msg_input_t*)data)->msg, F_CRM_TASK);
if(safe_str_eq(op, CRM_OP_VOTE)) {
last_was_vote = TRUE;
crm_debug_3("Added vote: %d", old_len);
}
} else {
last_was_vote = FALSE;
}
crm_malloc0(fsa_data, sizeof(fsa_data_t));
fsa_data->id = last_data_id;
fsa_data->fsa_input = input;
fsa_data->fsa_cause = cause;
fsa_data->origin = raised_from;
fsa_data->data = NULL;
fsa_data->data_type = fsa_dt_none;
fsa_data->actions = with_actions;
if(with_actions != A_NOTHING) {
crm_debug_3("Adding actions %.16llx to input", with_actions);
}
if(data != NULL) {
switch(cause) {
case C_FSA_INTERNAL:
case C_CRMD_STATUS_CALLBACK:
case C_IPC_MESSAGE:
case C_HA_MESSAGE:
crm_debug_3("Copying %s data from %s as a HA msg",
fsa_cause2string(cause),
raised_from);
fsa_data->data = copy_ha_msg_input(data);
fsa_data->data_type = fsa_dt_ha_msg;
break;
case C_LRM_OP_CALLBACK:
crm_debug_3("Copying %s data from %s as lrm_op_t",
fsa_cause2string(cause),
raised_from);
fsa_data->data = copy_lrm_op((lrm_op_t*)data);
fsa_data->data_type = fsa_dt_lrm;
break;
case C_CCM_CALLBACK:
crm_debug_3("Copying %s data from %s as CCM data",
fsa_cause2string(cause),
raised_from);
fsa_data->data = copy_ccm_data(data);
fsa_data->data_type = fsa_dt_ccm;
break;
case C_SUBSYSTEM_CONNECT:
case C_LRM_MONITOR_CALLBACK:
case C_TIMER_POPPED:
case C_SHUTDOWN:
case C_HEARTBEAT_FAILED:
case C_HA_DISCONNECT:
case C_ILLEGAL:
case C_UNKNOWN:
case C_STARTUP:
crm_err("Copying %s data (from %s)"
" not yet implemented",
fsa_cause2string(cause), raised_from);
exit(1);
break;
}
crm_debug_4("%s data copied",
fsa_cause2string(fsa_data->fsa_cause));
}
/* make sure to free it properly later */
if(prepend) {
crm_debug_2("Prepending input");
fsa_message_queue = g_list_prepend(fsa_message_queue, fsa_data);
} else {
fsa_message_queue = g_list_append(fsa_message_queue, fsa_data);
}
crm_debug_2("Queue len: %d", g_list_length(fsa_message_queue));
fsa_dump_queue(LOG_DEBUG_2);
if(old_len == g_list_length(fsa_message_queue)){
crm_err("Couldnt add message to the queue");
}
if(fsa_source) {
crm_debug_3("Triggering FSA: %s", __FUNCTION__);
G_main_set_trigger(fsa_source);
}
return last_data_id;
}
void
fsa_dump_queue(int log_level)
{
if(log_level < (int)crm_log_level) {
return;
}
slist_iter(
data, fsa_data_t, fsa_message_queue, lpc,
do_crm_log(log_level, __FILE__, __FUNCTION__,
"queue[%d(%d)]: input %s raised by %s()\t(cause=%s)",
lpc, data->id, fsa_input2string(data->fsa_input),
data->origin, fsa_cause2string(data->fsa_cause));
);
}
ha_msg_input_t *
copy_ha_msg_input(ha_msg_input_t *orig)
{
ha_msg_input_t *input_copy = NULL;
crm_malloc0(input_copy, sizeof(ha_msg_input_t));
if(orig != NULL) {
crm_debug_4("Copy msg");
input_copy->msg = ha_msg_copy(orig->msg);
if(orig->xml != NULL) {
crm_debug_4("Copy xml");
input_copy->xml = copy_xml(orig->xml);
}
} else {
crm_debug_3("No message to copy");
}
return input_copy;
}
void
delete_fsa_input(fsa_data_t *fsa_data)
{
lrm_op_t *op = NULL;
crm_data_t *foo = NULL;
struct crmd_ccm_data_s *ccm_input = NULL;
if(fsa_data == NULL) {
return;
}
crm_debug_4("About to free %s data",
fsa_cause2string(fsa_data->fsa_cause));
if(fsa_data->data != NULL) {
switch(fsa_data->data_type) {
case fsa_dt_ha_msg:
delete_ha_msg_input(fsa_data->data);
break;
case fsa_dt_xml:
foo = fsa_data->data;
free_xml(foo);
break;
case fsa_dt_lrm:
op = (lrm_op_t*)fsa_data->data;
free_lrm_op(op);
break;
case fsa_dt_ccm:
ccm_input = (struct crmd_ccm_data_s *)
fsa_data->data;
crm_free(ccm_input->oc);
crm_free(ccm_input);
break;
case fsa_dt_none:
if(fsa_data->data != NULL) {
crm_err("Dont know how to free %s data from %s",
fsa_cause2string(fsa_data->fsa_cause),
fsa_data->origin);
exit(1);
}
break;
}
crm_debug_4("%s data freed",
fsa_cause2string(fsa_data->fsa_cause));
}
crm_free(fsa_data);
}
/* returns the next message */
fsa_data_t *
get_message(void)
{
fsa_data_t* message = g_list_nth_data(fsa_message_queue, 0);
fsa_message_queue = g_list_remove(fsa_message_queue, message);
crm_debug_2("Processing input %d", message->id);
return message;
}
/* returns the current head of the FIFO queue */
gboolean
is_message(void)
{
return (g_list_length(fsa_message_queue) > 0);
}
void *
fsa_typed_data_adv(
fsa_data_t *fsa_data, enum fsa_data_type a_type, const char *caller)
{
void *ret_val = NULL;
if(fsa_data == NULL) {
do_crm_log(LOG_ERR, caller, NULL, "No FSA data available");
} else if(fsa_data->data == NULL) {
do_crm_log(LOG_ERR, caller, NULL, "No message data available");
} else if(fsa_data->data_type != a_type) {
do_crm_log(LOG_CRIT, caller, NULL,
"Message data was the wrong type! %d vs. requested=%d."
" Origin: %s",
fsa_data->data_type, a_type, fsa_data->origin);
CRM_ASSERT(fsa_data->data_type == a_type);
} else {
ret_val = fsa_data->data;
}
return ret_val;
}
/* A_MSG_ROUTE */
enum crmd_fsa_input
do_msg_route(long long action,
enum crmd_fsa_cause cause,
enum crmd_fsa_state cur_state,
enum crmd_fsa_input current_input,
fsa_data_t *msg_data)
{
enum crmd_fsa_input result = I_NULL;
ha_msg_input_t *input = fsa_typed_data(fsa_dt_ha_msg);
gboolean routed = FALSE;
if(msg_data->fsa_cause != C_IPC_MESSAGE
&& msg_data->fsa_cause != C_HA_MESSAGE) {
/* dont try and route these */
crm_warn("Can only process HA and IPC messages");
return I_NULL;
}
/* try passing the buck first */
crm_debug_4("Attempting to route message");
routed = relay_message(input->msg, cause==C_IPC_MESSAGE);
if(routed == FALSE) {
crm_debug_4("Message wasn't routed... try handling locally");
/* calculate defer */
result = handle_message(input);
switch(result) {
case I_NULL:
break;
case I_DC_HEARTBEAT:
break;
case I_CIB_OP:
break;
/* case I_JOIN_OFFER: */
/* break; */
/* case I_JOIN_RESULT: */
/* break; */
/* what else should go here? */
default:
crm_debug_4("Defering local processing of message");
register_fsa_input_later(
cause, result, msg_data->data);
result = I_NULL;
break;
}
if(result == I_NULL) {
crm_debug_4("Message processed");
} else {
register_fsa_input(cause, result, msg_data->data);
}
} else {
crm_debug_4("Message routed...");
input->msg = NULL;
}
return I_NULL;
}
/*
* This method frees msg
*/
gboolean
send_request(HA_Message *msg, char **msg_reference)
{
gboolean was_sent = FALSE;
/* crm_log_xml_debug_3(request, "Final request..."); */
if(msg_reference != NULL) {
*msg_reference = crm_strdup(
cl_get_string(msg, XML_ATTR_REFERENCE));
}
was_sent = relay_message(msg, TRUE);
if(was_sent == FALSE) {
ha_msg_input_t *fsa_input = new_ha_msg_input(msg);
register_fsa_input(C_IPC_MESSAGE, I_ROUTER, fsa_input);
delete_ha_msg_input(fsa_input);
crm_msg_del(msg);
}
return was_sent;
}
/* unless more processing is required, relay_message is freed */
gboolean
relay_message(HA_Message *relay_message, gboolean originated_locally)
{
int is_for_dc = 0;
int is_for_dcib = 0;
int is_for_te = 0;
int is_for_crm = 0;
int is_for_cib = 0;
int is_local = 0;
gboolean processing_complete = FALSE;
const char *host_to = cl_get_string(relay_message, F_CRM_HOST_TO);
const char *sys_to = cl_get_string(relay_message, F_CRM_SYS_TO);
const char *sys_from= cl_get_string(relay_message, F_CRM_SYS_FROM);
const char *type = cl_get_string(relay_message, F_TYPE);
const char *msg_error = NULL;
crm_debug_3("Routing message %s",
cl_get_string(relay_message, XML_ATTR_REFERENCE));
if(relay_message == NULL) {
msg_error = "Cannot route empty message";
} else if(safe_str_eq(CRM_OP_HELLO,
cl_get_string(relay_message, F_CRM_TASK))){
/* quietly ignore */
processing_complete = TRUE;
} else if(safe_str_neq(type, T_CRM)) {
msg_error = "Bad message type";
} else if(sys_to == NULL) {
msg_error = "Bad message destination: no subsystem";
}
if(msg_error != NULL) {
processing_complete = TRUE;
crm_err("%s", msg_error);
crm_log_message(LOG_WARNING, relay_message);
}
if(processing_complete) {
crm_msg_del(relay_message);
return TRUE;
}
processing_complete = TRUE;
is_for_dc = (strcmp(CRM_SYSTEM_DC, sys_to) == 0);
is_for_dcib = (strcmp(CRM_SYSTEM_DCIB, sys_to) == 0);
is_for_te = (strcmp(CRM_SYSTEM_TENGINE, sys_to) == 0);
is_for_cib = (strcmp(CRM_SYSTEM_CIB, sys_to) == 0);
is_for_crm = (strcmp(CRM_SYSTEM_CRMD, sys_to) == 0);
is_local = 0;
if(host_to == NULL || strlen(host_to) == 0) {
if(is_for_dc || is_for_te) {
is_local = 0;
} else if(is_for_crm && originated_locally) {
is_local = 0;
} else {
is_local = 1;
}
} else if(strcmp(fsa_our_uname, host_to) == 0) {
is_local=1;
}
if(is_for_dc || is_for_dcib || is_for_te) {
if(AM_I_DC && is_for_te) {
ROUTER_RESULT("Message result: Local relay");
send_msg_via_ipc(relay_message, sys_to);
} else if(AM_I_DC) {
ROUTER_RESULT("Message result: DC/CRMd process");
processing_complete = FALSE; /* more to be done by caller */
} else if(originated_locally
&& safe_str_neq(sys_from, CRM_SYSTEM_PENGINE)
&& safe_str_neq(sys_from, CRM_SYSTEM_TENGINE)) {
/* Neither the TE or PE should be sending messages
* to DC's on other nodes
*
* By definition, if we are no longer the DC, then
* the PE or TE's data should be discarded
*/
ROUTER_RESULT("Message result: External relay to DC");
send_msg_via_ha(fsa_cluster_conn, relay_message);
} else {
/* discard */
ROUTER_RESULT("Message result: Discard, not DC");
crm_msg_del(relay_message);
}
} else if(is_local && (is_for_crm || is_for_cib)) {
ROUTER_RESULT("Message result: CRMd process");
processing_complete = FALSE; /* more to be done by caller */
} else if(is_local) {
ROUTER_RESULT("Message result: Local relay");
send_msg_via_ipc(relay_message, sys_to);
} else {
ROUTER_RESULT("Message result: External relay");
send_msg_via_ha(fsa_cluster_conn, relay_message);
}
return processing_complete;
}
gboolean
crmd_authorize_message(ha_msg_input_t *client_msg, crmd_client_t *curr_client)
{
/* check the best case first */
const char *sys_from = cl_get_string(client_msg->msg, F_CRM_SYS_FROM);
char *uuid = NULL;
char *client_name = NULL;
char *major_version = NULL;
char *minor_version = NULL;
const char *filtered_from;
gpointer table_key = NULL;
gboolean auth_result = FALSE;
struct crm_subsystem_s *the_subsystem = NULL;
gboolean can_reply = FALSE; /* no-one has registered with this id */
const char *op = cl_get_string(client_msg->msg, F_CRM_TASK);
if (safe_str_neq(CRM_OP_HELLO, op)) {
if(sys_from == NULL) {
crm_warn("Message [%s] was had no value for %s... discarding",
cl_get_string(client_msg->msg, XML_ATTR_REFERENCE),
F_CRM_SYS_FROM);
return FALSE;
}
filtered_from = sys_from;
/* The CIB can have two names on the DC */
if(strcmp(sys_from, CRM_SYSTEM_DCIB) == 0)
filtered_from = CRM_SYSTEM_CIB;
if (g_hash_table_lookup (ipc_clients, filtered_from) != NULL) {
can_reply = TRUE; /* reply can be routed */
}
crm_debug_2("Message reply can%s be routed from %s.",
can_reply?"":" not", sys_from);
if(can_reply == FALSE) {
crm_warn("Message [%s] not authorized",
cl_get_string(client_msg->msg, XML_ATTR_REFERENCE));
}
return can_reply;
}
crm_debug_3("received client join msg");
crm_log_message(LOG_MSG, client_msg->msg);
auth_result = process_hello_message(
client_msg->xml, &uuid, &client_name,
&major_version, &minor_version);
if (auth_result == TRUE) {
if(client_name == NULL || uuid == NULL) {
crm_err("Bad client details (client_name=%s, uuid=%s)",
crm_str(client_name), crm_str(uuid));
auth_result = FALSE;
}
}
if (auth_result == TRUE) {
/* check version */
int mav = atoi(major_version);
int miv = atoi(minor_version);
crm_debug_3("Checking client version number");
if (mav < 0 || miv < 0) {
crm_err("Client version (%d:%d) is not acceptable",
mav, miv);
auth_result = FALSE;
}
crm_free(major_version);
crm_free(minor_version);
}
if (auth_result == TRUE) {
/* if we already have one of those clients
* only applies to te, pe etc. not admin clients
*/
if (strcmp(CRM_SYSTEM_PENGINE, client_name) == 0) {
the_subsystem = pe_subsystem;
} else if (strcmp(CRM_SYSTEM_TENGINE, client_name) == 0) {
the_subsystem = te_subsystem;
}
if (the_subsystem != NULL) {
/* do we already have one? */
crm_debug_3("Checking if %s is required/already connected",
client_name);
if(is_set(fsa_input_register,
the_subsystem->flag_connected)) {
auth_result = FALSE;
crm_warn("Bit\t%.16llx set in %.16llx",
the_subsystem->flag_connected,
fsa_input_register);
crm_err("Client %s is already connected",
client_name);
} else if(FALSE == is_set(fsa_input_register,
the_subsystem->flag_required)) {
auth_result = TRUE;
crm_warn("Bit\t%.16llx not set in %.16llx",
the_subsystem->flag_connected,
fsa_input_register);
crm_warn("Client %s joined but we dont need it",
client_name);
stop_subsystem(the_subsystem, TRUE);
} else {
the_subsystem->ipc =
curr_client->client_channel;
set_bit_inplace(fsa_input_register,
the_subsystem->flag_connected);
}
} else {
table_key = (gpointer)
generate_hash_key(client_name, uuid);
}
}
if (auth_result == TRUE) {
if(table_key == NULL) {
table_key = (gpointer)crm_strdup(client_name);
}
crm_debug_2("Accepted client %s", crm_str(table_key));
curr_client->table_key = table_key;
curr_client->sub_sys = crm_strdup(client_name);
curr_client->uuid = crm_strdup(uuid);
g_hash_table_insert (ipc_clients,
table_key, curr_client->client_channel);
send_hello_message(curr_client->client_channel,
"n/a", CRM_SYSTEM_CRMD,
"0", "1");
crm_debug_3("Updated client list with %s", crm_str(table_key));
crm_debug_3("Triggering FSA: %s", __FUNCTION__);
G_main_set_trigger(fsa_source);
} else {
crm_warn("Rejected client logon request");
curr_client->client_channel->ch_status = IPC_DISC_PENDING;
}
if(uuid != NULL) crm_free(uuid);
if(minor_version != NULL) crm_free(minor_version);
if(major_version != NULL) crm_free(major_version);
if(client_name != NULL) crm_free(client_name);
/* hello messages should never be processed further */
return FALSE;
}
enum crmd_fsa_input
handle_message(ha_msg_input_t *stored_msg)
{
enum crmd_fsa_input next_input = I_NULL;
const char *type = NULL;
if(stored_msg == NULL || stored_msg->msg == NULL) {
crm_err("No message to handle");
return I_NULL;
}
type = cl_get_string(stored_msg->msg, F_CRM_MSG_TYPE);
if(safe_str_eq(type, XML_ATTR_REQUEST)) {
next_input = handle_request(stored_msg);
} else if(safe_str_eq(type, XML_ATTR_RESPONSE)) {
next_input = handle_response(stored_msg);
} else {
crm_err("Unknown message type: %s", type);
}
/* crm_debug_2("%s: Next input is %s", __FUNCTION__, */
/* fsa_input2string(next_input)); */
return next_input;
}
#define schedule_pe() \
{ \
next_input = I_PE_CALC; \
if(fsa_pe_ref) { \
crm_debug("Cancelling %s...", fsa_pe_ref); \
crm_free(fsa_pe_ref); \
fsa_pe_ref = NULL; \
} \
}
enum crmd_fsa_input
handle_request(ha_msg_input_t *stored_msg)
{
HA_Message *msg = NULL;
enum crmd_fsa_input next_input = I_NULL;
const char *op = cl_get_string(stored_msg->msg, F_CRM_TASK);
const char *sys_to = cl_get_string(stored_msg->msg, F_CRM_SYS_TO);
const char *host_from = cl_get_string(stored_msg->msg, F_CRM_HOST_FROM);
crm_debug_2("Received %s "XML_ATTR_REQUEST" from %s in state %s",
op, host_from, fsa_state2string(fsa_state));
if(op == NULL) {
crm_err("Bad message");
crm_log_message(LOG_ERR, stored_msg->msg);
/*========== common actions ==========*/
} else if(strcmp(op, CRM_OP_NOOP) == 0) {
crm_debug_2("no-op from %s", crm_str(host_from));
} else if(strcmp(op, CRM_OP_NOVOTE) == 0) {
register_fsa_input_adv(C_HA_MESSAGE, I_NULL, stored_msg,
A_ELECTION_COUNT|A_ELECTION_CHECK, FALSE, __FUNCTION__);
} else if(strcmp(op, CRM_OP_VOTE) == 0) {
/* count the vote and decide what to do after that */
register_fsa_input_adv(C_HA_MESSAGE, I_NULL, stored_msg,
A_ELECTION_COUNT|A_ELECTION_CHECK, FALSE, __FUNCTION__);
/* Sometimes we _must_ go into S_ELECTION */
if(fsa_state == S_HALT) {
crm_debug("Forcing an election from S_HALT");
next_input = I_ELECTION;
#if 0
} else if(AM_I_DC) {
/* This is the old way of doing things but what is gained? */
next_input = I_ELECTION;
#endif
}
} else if(strcmp(op, CRM_OP_LOCAL_SHUTDOWN) == 0) {
crm_shutdown(SIGTERM);
/*next_input = I_SHUTDOWN; */
next_input = I_NULL;
} else if(strcmp(op, CRM_OP_PING) == 0) {
/* eventually do some stuff to figure out
* if we /are/ ok
*/
crm_data_t *ping = createPingAnswerFragment(sys_to, "ok");
crm_xml_add(ping, "crmd_state", fsa_state2string(fsa_state));
crm_info("Current ping state: %s", fsa_state2string(fsa_state));
msg = create_reply(stored_msg->msg, ping);
if(relay_message(msg, TRUE) == FALSE) {
crm_msg_del(msg);
}
/* probably better to do this via signals on the
* local node
*/
} else if(strcmp(op, CRM_OP_DEBUG_UP) == 0) {
int level = get_crm_log_level();
set_crm_log_level(level+1);
crm_info("Debug set to %d (was %d)",
get_crm_log_level(), level);
} else if(strcmp(op, CRM_OP_DEBUG_DOWN) == 0) {
int level = get_crm_log_level();
set_crm_log_level(level-1);
crm_info("Debug set to %d (was %d)",
get_crm_log_level(), level);
} else if(strcmp(op, CRM_OP_JOIN_OFFER) == 0) {
next_input = I_JOIN_OFFER;
crm_debug("Raising I_JOIN_OFFER: join-%s",
cl_get_string(stored_msg->msg, F_CRM_JOIN_ID));
} else if(strcmp(op, CRM_OP_JOIN_ACKNAK) == 0) {
next_input = I_JOIN_RESULT;
crm_debug("Raising I_JOIN_RESULT: join-%s",
cl_get_string(stored_msg->msg, F_CRM_JOIN_ID));
} else if(strcmp(op, CRM_OP_LRM_DELETE) == 0
|| strcmp(op, CRM_OP_LRM_REFRESH) == 0
|| strcmp(op, CRM_OP_REPROBE) == 0) {
cl_msg_modstring(stored_msg->msg, F_CRM_SYS_TO, CRM_SYSTEM_LRMD);
next_input = I_ROUTER;
/* this functionality should only be enabled
* if this is a development build
*/
} else if(CRM_DEV_BUILD && strcmp(op, CRM_OP_DIE) == 0/*constant condition*/) {
crm_warn("Test-only code: Killing the CRM without mercy");
crm_warn("Inhibiting respawns");
exit(100);
/*========== (NOT_DC)-Only Actions ==========*/
} else if(AM_I_DC == FALSE){
gboolean dc_match = safe_str_eq(host_from, fsa_our_dc);
if(dc_match || fsa_our_dc == NULL) {
if(strcmp(op, CRM_OP_HBEAT) == 0) {
crm_debug_3("Received DC heartbeat from %s",
host_from);
next_input = I_DC_HEARTBEAT;
} else if(fsa_our_dc == NULL) {
crm_warn("CRMd discarding request: %s"
" (DC: %s, from: %s)",
op, crm_str(fsa_our_dc), host_from);
crm_warn("Ignored Request");
crm_log_message(LOG_WARNING, stored_msg->msg);
} else if(strcmp(op, CRM_OP_SHUTDOWN) == 0) {
next_input = I_STOP;
} else {
crm_err("CRMd didnt expect request: %s", op);
crm_log_message(LOG_ERR, stored_msg->msg);
}
} else {
crm_warn("Discarding %s op from %s", op, host_from);
}
/*========== DC-Only Actions ==========*/
} else if(AM_I_DC) {
const char *message = ha_msg_value(stored_msg->msg, "message");
/* setting "fsa_pe_ref = NULL" makes sure we ignore any
* PE reply that might be pending or in the queue while
* we ask the CIB for a more up-to-date copy
*/
if(safe_str_eq(op, CRM_OP_TEABORT)) {
crm_debug("Transition cancelled: %s/%s", op, message);
clear_bit_inplace(fsa_input_register, R_IN_TRANSITION);
if(need_transition(fsa_state)) {
schedule_pe();
} else {
crm_debug("Filtering %s op in state %s",
op, fsa_state2string(fsa_state));
}
} else if(strcmp(op, CRM_OP_TECOMPLETE) == 0) {
crm_debug("Transition complete: %s/%s", op, message);
clear_bit_inplace(fsa_input_register, R_IN_TRANSITION);
if(fsa_state == S_TRANSITION_ENGINE) {
next_input = I_TE_SUCCESS;
} else {
crm_debug("Filtering %s op in state %s",
op, fsa_state2string(fsa_state));
}
} else if(strcmp(op, CRM_OP_JOIN_ANNOUNCE) == 0) {
next_input = I_NODE_JOIN;
} else if(strcmp(op, CRM_OP_JOIN_REQUEST) == 0) {
next_input = I_JOIN_REQUEST;
} else if(strcmp(op, CRM_OP_JOIN_CONFIRM) == 0) {
next_input = I_JOIN_RESULT;
} else if(strcmp(op, CRM_OP_SHUTDOWN) == 0) {
gboolean dc_match = safe_str_eq(host_from, fsa_our_dc);
if(dc_match) {
crm_err("We didnt ask to be shut down yet our"
" TE is telling us too."
" Better get out now!");
next_input = I_TERMINATE;
} else if(is_set(fsa_input_register, R_SHUTDOWN)) {
crm_info("Shutting ourselves down (DC)");
next_input = I_STOP;
} else if(fsa_state != S_STOPPING) {
crm_err("Another node is asking us to shutdown"
" but we think we're ok.");
next_input = I_ELECTION;
}
} else if(strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0) {
/* a slave wants to shut down */
/* create cib fragment and add to message */
next_input = handle_shutdown_request(stored_msg->msg);
} else {
crm_err("Unexpected request (%s) sent to the DC", op);
crm_log_message(LOG_ERR, stored_msg->msg);
}
}
return next_input;
}
enum crmd_fsa_input
handle_response(ha_msg_input_t *stored_msg)
{
enum crmd_fsa_input next_input = I_NULL;
const char *op = cl_get_string(stored_msg->msg, F_CRM_TASK);
const char *sys_from = cl_get_string(stored_msg->msg, F_CRM_SYS_FROM);
const char *host_from = cl_get_string(stored_msg->msg, F_CRM_HOST_FROM);
const char *msg_ref = cl_get_string(stored_msg->msg, XML_ATTR_REFERENCE);
crm_debug_2("Received %s "XML_ATTR_RESPONSE" from %s in state %s",
op, host_from, fsa_state2string(fsa_state));
if(op == NULL) {
crm_err("Bad message");
crm_log_message(LOG_ERR, stored_msg->msg);
} else if(AM_I_DC && strcmp(op, CRM_OP_PECALC) == 0) {
crm_debug_2("Processing %s reply %s (fsa=%s)",
sys_from, msg_ref, crm_str(fsa_pe_ref));
if(msg_ref != NULL && safe_str_eq(msg_ref, fsa_pe_ref)) {
next_input = I_PE_SUCCESS;
crm_debug_2("Completed: %s...", fsa_pe_ref);
crm_free(fsa_pe_ref);
fsa_pe_ref = NULL;
} else {
crm_debug_2("Skipping superceeded reply from %s",
sys_from);
}
} else if(strcmp(op, CRM_OP_VOTE) == 0
|| strcmp(op, CRM_OP_HBEAT) == 0
|| strcmp(op, CRM_OP_SHUTDOWN_REQ) == 0
|| strcmp(op, CRM_OP_SHUTDOWN) == 0) {
crm_debug_2("Ignoring %s from %s in %s",
op, host_from, fsa_state2string(fsa_state));
next_input = I_NULL;
} else {
crm_err("Unexpected response (op=%s) sent to the %s",
op, AM_I_DC?"DC":"CRMd");
next_input = I_NULL;
}
return next_input;
}
enum crmd_fsa_input
handle_shutdown_request(HA_Message *stored_msg)
{
/* handle here to avoid potential version issues
* where the shutdown message/proceedure may have
* been changed in later versions.
*
* This way the DC is always in control of the shutdown
*/
time_t now = time(NULL);
crm_data_t *node_state = NULL;
const char *host_from = cl_get_string(stored_msg, F_CRM_HOST_FROM);
if(host_from == NULL) {
/* we're shutting down and the DC */
host_from = fsa_our_uname;
}
crm_info("Creating shutdown request for %s",host_from);
crm_log_message(LOG_MSG, stored_msg);
node_state = create_node_state(
host_from, NULL, NULL, NULL, NULL,
CRMD_STATE_INACTIVE, FALSE, __FUNCTION__);
crm_xml_add_int(node_state, XML_CIB_ATTR_SHUTDOWN, (int)now);
- fsa_cib_conn->cmds->update(
- fsa_cib_conn, XML_CIB_TAG_STATUS, node_state, NULL,
- cib_quorum_override);
-
- crm_log_xml_debug(node_state, "Shutdown update");
+ fsa_cib_anon_update(XML_CIB_TAG_STATUS,node_state, cib_quorum_override);
+ crm_log_xml_debug_2(node_state, "Shutdown update");
free_xml(node_state);
/* will be picked up by the TE as long as its running */
if(need_transition(fsa_state)
&& is_set(fsa_input_register, R_TE_CONNECTED) == FALSE) {
register_fsa_action(A_TE_CANCEL);
}
return I_NULL;
}
/* frees msg upon completion */
gboolean
send_msg_via_ha(ll_cluster_t *hb_fd, HA_Message *msg)
{
int log_level = LOG_DEBUG_3;
gboolean broadcast = FALSE;
gboolean all_is_good = TRUE;
const char *op = cl_get_string(msg, F_CRM_TASK);
const char *sys_to = cl_get_string(msg, F_CRM_SYS_TO);
const char *host_to = cl_get_string(msg, F_CRM_HOST_TO);
if (msg == NULL) {
crm_err("Attempt to send NULL Message via HA failed.");
all_is_good = FALSE;
} else {
crm_debug_4("Relaying message to (%s) via HA", host_to);
}
if (all_is_good) {
if (sys_to == NULL || strlen(sys_to) == 0) {
crm_err("You did not specify a destination sub-system"
" for this message.");
all_is_good = FALSE;
}
}
/* There are a number of messages may not need to be ordered.
* At a later point perhaps we should detect them and send them
* as unordered messages.
*/
if (all_is_good) {
if (host_to == NULL
|| strlen(host_to) == 0
|| safe_str_eq(sys_to, CRM_SYSTEM_DC)) {
broadcast = TRUE;
all_is_good = send_ha_message(hb_fd, msg, NULL, FALSE);
} else {
all_is_good = send_ha_message(hb_fd, msg, host_to, FALSE);
}
}
if(all_is_good == FALSE) {
log_level = LOG_WARNING;
}
if(log_level == LOG_WARNING
|| (safe_str_neq(op, CRM_OP_HBEAT))) {
do_crm_log(log_level, __FILE__, __FUNCTION__,
"Sending %sHA message (ref=%s) to %s@%s %s.",
broadcast?"broadcast ":"directed ",
cl_get_string(msg, XML_ATTR_REFERENCE),
crm_str(sys_to), host_to==NULL?"<all>":host_to,
all_is_good?"succeeded":"failed");
}
crm_msg_del(msg);
return all_is_good;
}
/* msg is deleted by the time this returns */
gboolean
send_msg_via_ipc(HA_Message *msg, const char *sys)
{
gboolean send_ok = TRUE;
IPC_Channel *client_channel;
enum crmd_fsa_input next_input;
crm_debug_4("relaying msg to sub_sys=%s via IPC", sys);
client_channel =
(IPC_Channel*)g_hash_table_lookup(ipc_clients, sys);
if(cl_get_string(msg, F_CRM_HOST_FROM) == NULL) {
ha_msg_add(msg, F_CRM_HOST_FROM, fsa_our_uname);
}
if (client_channel != NULL) {
crm_debug_3("Sending message via channel %s.", sys);
send_ok = send_ipc_message(client_channel, msg);
msg = NULL; /* so the crm_msg_del() below doesnt fail */
} else if(sys != NULL && strcmp(sys, CRM_SYSTEM_CIB) == 0) {
crm_err("Sub-system (%s) has been incorporated into the CRMd.",
sys);
crm_err("Change the way we handle this CIB message");
crm_log_message(LOG_ERR, msg);
send_ok = FALSE;
} else if(sys != NULL && strcmp(sys, CRM_SYSTEM_LRMD) == 0) {
fsa_data_t *fsa_data = NULL;
ha_msg_input_t *msg_copy = new_ha_msg_input(msg);
crm_malloc0(fsa_data, sizeof(fsa_data_t));
fsa_data->fsa_input = I_MESSAGE;
fsa_data->fsa_cause = C_IPC_MESSAGE;
fsa_data->data = msg_copy;
fsa_data->origin = __FUNCTION__;
fsa_data->data_type = fsa_dt_ha_msg;
#ifdef FSA_TRACE
crm_debug_2("Invoking action %s (%.16llx)",
fsa_action2string(A_LRM_INVOKE),
A_LRM_INVOKE);
#endif
next_input = do_lrm_invoke(A_LRM_INVOKE, C_IPC_MESSAGE,
fsa_state, I_MESSAGE, fsa_data);
delete_ha_msg_input(msg_copy);
crm_free(fsa_data);
/* todo: feed this back in for anything != I_NULL */
#ifdef FSA_TRACE
crm_debug_2("Result of action %s was %s",
fsa_action2string(A_LRM_INVOKE),
fsa_input2string(next_input));
#endif
} else {
crm_err("Unknown Sub-system (%s)... discarding message.",
crm_str(sys));
send_ok = FALSE;
}
crm_msg_del(msg);
return send_ok;
}
void
msg_queue_helper(void)
{
IPC_Channel *ipc = NULL;
if(fsa_cluster_conn != NULL) {
ipc = fsa_cluster_conn->llc_ops->ipcchan(
fsa_cluster_conn);
}
if(ipc != NULL) {
ipc->ops->resume_io(ipc);
}
/* g_hash_table_foreach_remove(ipc_clients, ipc_queue_helper, NULL); */
}
gboolean
ipc_queue_helper(gpointer key, gpointer value, gpointer user_data)
{
crmd_client_t *ipc_client = value;
if(ipc_client->client_channel != NULL) {
ipc_client->client_channel->ops->is_message_pending(ipc_client->client_channel);
}
return FALSE;
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Thu, Jul 10, 3:09 AM (14 h, 9 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2009868
Default Alt Text
(34 KB)
Attached To
Mode
rP Pacemaker
Attached
Detach File
Event Timeline
Log In to Comment