Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/crmd/callbacks.c b/crmd/callbacks.c
index 9e7dc5d305..f98d4a4145 100644
--- a/crmd/callbacks.c
+++ b/crmd/callbacks.c
@@ -1,651 +1,652 @@
/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <crm_internal.h>
#include <sys/param.h>
#include <crm/crm.h>
#include <string.h>
#include <crmd_fsa.h>
#include <heartbeat.h>
#include <crm/msg_xml.h>
#include <crm/common/xml.h>
#include <crm/common/msg.h>
#include <crm/common/cluster.h>
#include <crm/cib.h>
#include <crmd.h>
#include <crmd_messages.h>
#include <crmd_callbacks.h>
xmlNode *find_xml_in_hamessage(const xmlNode * msg);
void crmd_ha_connection_destroy(gpointer user_data);
void crmd_ha_msg_filter(xmlNode *msg);
/* From join_dc... */
extern gboolean check_join_state(
enum crmd_fsa_state cur_state, const char *source);
/* #define MAX_EMPTY_CALLBACKS 20 */
/* int empty_callbacks = 0; */
/*
 * Log the name of the calling function and set the given main-loop trigger.
 *
 * The expansion is wrapped in do { } while(0) so that the two statements
 * act as a single statement; callers write the trailing ';' themselves.
 * This keeps the macro safe as the body of an unbraced if/else.
 */
#define trigger_fsa(source) do {					\
	crm_debug_3("Triggering FSA: %s", __FUNCTION__);		\
	G_main_set_trigger(source);					\
    } while(0)
#if SUPPORT_HEARTBEAT
/*
 * Main-loop dispatch callback for the heartbeat connection.
 *
 * Pulls pending messages (non-blocking) from the cluster layer while the
 * IPC channel is readable.
 *
 * Returns TRUE while the channel is still connected, FALSE once it is
 * missing or no longer in the IPC_CONNECT state (after logging and
 * triggering the FSA).
 */
gboolean
crmd_ha_msg_dispatch(ll_cluster_t *cluster_conn, gpointer user_data)
{
    IPC_Channel *hb_channel = NULL;

    crm_debug_3("Invoked");

    if(cluster_conn != NULL) {
	hb_channel = cluster_conn->llc_ops->ipcchan(cluster_conn);
    }

    /* Both checks only report the problem; processing continues below */
    CRM_CHECK(cluster_conn != NULL, ;);
    CRM_CHECK(hb_channel != NULL, ;);

    if(hb_channel != NULL && IPC_ISRCONN(hb_channel)) {
	if(cluster_conn->llc_ops->msgready(cluster_conn) == 0) {
	    crm_debug_2("no message ready yet");
	}
	/* run the registered callbacks without blocking */
	cluster_conn->llc_ops->rcvmsg(cluster_conn, 0);
    }

    if(hb_channel != NULL && hb_channel->ch_status == IPC_CONNECT) {
	return TRUE;
    }

    /* The connection is gone: the severity depends on whether we asked for it */
    if(is_set(fsa_input_register, R_HA_DISCONNECTED)) {
	crm_info("Lost connection to heartbeat service.");
    } else {
	crm_crit("Lost connection to heartbeat service.");
    }

    trigger_fsa(fsa_source);
    return FALSE;
}
#endif
/*
 * Destroy notification for the heartbeat connection.
 *
 * If R_HA_DISCONNECTED is set the disconnect is only logged; otherwise
 * an I_ERROR input is registered and the FSA is triggered.
 */
void
crmd_ha_connection_destroy(gpointer user_data)
{
    crm_debug_3("Invoked");

    if(is_set(fsa_input_register, R_HA_DISCONNECTED) == FALSE) {
	/* nobody asked for this disconnect */
	crm_crit("Lost connection to heartbeat service!");
	register_fsa_input(C_HA_DISCONNECT, I_ERROR, NULL);
	trigger_fsa(fsa_source);
	return;
    }

    /* we signed out, so this is expected */
    crm_info("Heartbeat disconnection complete");
}
/*
 * Filter one inbound cluster message before it reaches the FSA.
 *
 * - A message addressed to the DC is dropped when this node is not the DC.
 * - A message sent by a DC other than ourselves while we are the DC is
 *   logged as an error and, unless an election is already running,
 *   registered as an I_ELECTION input.
 * - Everything else is handed to route_message().
 *
 * The FSA is triggered on every path except the early "not the DC" return.
 *
 * Lines prefixed with '+' or '-' are diff markers of the change under
 * review; the '+' lines add free_xml(msg) calls to this function.
 */
void
crmd_ha_msg_filter(xmlNode *msg)
{
ha_msg_input_t *new_input = NULL;
const char *from = crm_element_value(msg, F_ORIG);
const char *seq = crm_element_value(msg, F_SEQ);
const char *op = crm_element_value(msg, F_CRM_TASK);
const char *sys_to = crm_element_value(msg, F_CRM_SYS_TO);
const char *sys_from = crm_element_value(msg, F_CRM_SYS_FROM);
/* Messages for the DC are only of interest when we are the DC */
if(safe_str_eq(sys_to, CRM_SYSTEM_DC) && AM_I_DC == FALSE) {
crm_debug_2("Ignoring message for the DC [F_SEQ=%s]", seq);
+ free_xml(msg);
return;
} else if(safe_str_eq(sys_from, CRM_SYSTEM_DC)) {
/* A DC message from someone else while we are the DC: two DCs exist */
if(AM_I_DC && safe_str_neq(from, fsa_our_uname)) {
crm_err("Another DC detected: %s (op=%s)", from, op);
/* make sure the election happens NOW */
if(fsa_state != S_ELECTION) {
new_input = new_ha_msg_input(msg);
register_fsa_error_adv(C_FSA_INTERNAL, I_ELECTION, NULL,
new_input, __FUNCTION__);
}
} else {
crm_debug_2("Processing DC message from %s [F_SEQ=%s]", from, seq);
}
}
/* new_input is only set when an election was registered above */
if(new_input == NULL) {
crm_log_xml(LOG_MSG, "HA[inbound]", msg);
- new_input = new_ha_msg_input(msg);
- route_message(C_HA_MESSAGE, new_input);
+ route_message(C_HA_MESSAGE, msg);
+
+ } else {
+ free_xml(msg);
+ crm_free(new_input);
}
- crm_free(new_input);
trigger_fsa(fsa_source);
}
#if SUPPORT_HEARTBEAT
/*
 * Heartbeat message callback: convert the HA_Message to XML and decide
 * whether it may be processed.
 *
 * The message is discarded (freed at the bail label) when it has no
 * originator, when the peer cache is missing or has no active members,
 * or when the sender is not found in crm_peer_cache. Otherwise it is
 * passed to crmd_ha_msg_filter().
 *
 * Lines prefixed with '+' or '-' are diff markers of the change under
 * review; the '+ return' line makes the function skip the free_xml() at
 * bail once crmd_ha_msg_filter() has been called.
 */
void
crmd_ha_msg_callback(HA_Message *hamsg, void* private_data)
{
int level = LOG_DEBUG;
oc_node_t *from_node = NULL;
xmlNode *msg = convert_ha_message(NULL, hamsg, __FUNCTION__);
const char *from = crm_element_value(msg, F_ORIG);
const char *op = crm_element_value(msg, F_CRM_TASK);
const char *sys_from = crm_element_value(msg, F_CRM_SYS_FROM);
- CRM_CHECK(from != NULL, crm_log_xml_err(msg, "anon"); return);
+ CRM_CHECK(from != NULL, crm_log_xml_err(msg, "anon"); goto bail);
crm_debug_2("HA[inbound]: %s from %s", op, from);
if(crm_peer_cache == NULL || crm_active_members() == 0) {
crm_debug("Ignoring HA messages until we are"
" connected to the CCM (%s op from %s)", op, from);
crm_log_xml(LOG_MSG, "HA[inbound]: Ignore (No CCM)", msg);
goto bail;
}
from_node = g_hash_table_lookup(crm_peer_cache, from);
if(from_node == NULL) {
/* Unknown sender: votes, join announcements seen by the DC and DC
 * messages are logged as warnings, everything else at debug level */
if(safe_str_eq(op, CRM_OP_VOTE)) {
level = LOG_WARNING;
} else if(AM_I_DC && safe_str_eq(op, CRM_OP_JOIN_ANNOUNCE)) {
level = LOG_WARNING;
} else if(safe_str_eq(sys_from, CRM_SYSTEM_DC)) {
level = LOG_WARNING;
}
do_crm_log(level,
"Ignoring HA message (op=%s) from %s: not in our"
" membership list (size=%d)", op, from,
crm_active_members());
crm_log_xml(LOG_MSG, "HA[inbound]: CCM Discard", msg);
} else {
crmd_ha_msg_filter(msg);
+ return;
}
bail:
free_xml(msg);
return;
}
#endif
/*
* Apparently returning TRUE means "stay connected, keep doing stuff".
* Returning FALSE means "we're all done, close the connection"
*/
/*
 * Main-loop callback for a local IPC client.
 *
 * Reads every pending message from the client channel, authorizes it via
 * crmd_authorize_message() and routes the accepted ones as C_IPC_MESSAGE.
 * When the channel is no longer connected the client is disconnected and
 * FALSE is returned; the FSA is triggered before returning in all cases.
 *
 * Lines prefixed with '+' or '-' are diff markers of the change under
 * review: the message is now passed on as a plain xmlNode and is freed
 * here only when authorization fails.
 */
gboolean
crmd_ipc_msg_callback(IPC_Channel *client, gpointer user_data)
{
int lpc = 0;
xmlNode *msg = NULL;
- ha_msg_input_t *new_input = NULL;
crmd_client_t *curr_client = (crmd_client_t*)user_data;
gboolean stay_connected = TRUE;
crm_debug_2("Invoked: %s",
curr_client->table_key);
/* Drain the channel: stop when nothing is pending or a read fails */
while(IPC_ISRCONN(client)) {
if(client->ops->is_message_pending(client) == 0) {
break;
}
msg = xmlfromIPC(client, 0);
if (msg == NULL) {
break;
}
/* lpc counts the messages read in this invocation */
lpc++;
- new_input = new_ha_msg_input(msg);
-
crm_debug_2("Processing msg from %s", curr_client->table_key);
- crm_log_xml(LOG_DEBUG_2, "CRMd[inbound]", new_input->msg);
- if(crmd_authorize_message(new_input, curr_client)) {
- route_message(C_IPC_MESSAGE, new_input);
+ crm_log_xml(LOG_DEBUG_2, "CRMd[inbound]", msg);
+
+ if(crmd_authorize_message(msg, curr_client)) {
+ route_message(C_IPC_MESSAGE, msg);
+ } else {
+ free_xml(msg);
}
- crm_free(new_input);
- free_xml(msg);
- new_input = NULL;
msg = NULL;
if(client->ch_status != IPC_CONNECT) {
break;
}
}
crm_debug_2("Processed %d messages", lpc);
if (client->ch_status != IPC_CONNECT) {
stay_connected = FALSE;
process_client_disconnect(curr_client);
}
trigger_fsa(fsa_source);
return stay_connected;
}
extern GCHSource *lrm_source;
/*
 * Main-loop dispatch callback for the LRM connection.
 *
 * user_data is the ll_lrm_t handle; the first argument is not used.
 * Returns TRUE while the LRM channel is connected. Otherwise lrm_source
 * is cleared and FALSE is returned; if R_LRM_CONNECTED was still set the
 * loss is treated as a failure and an I_ERROR input is registered.
 */
gboolean
lrm_dispatch(IPC_Channel *src_not_used, gpointer user_data)
{
    /* ?? src == lrm_channel ?? */
    ll_lrm_t *lrm_conn = (ll_lrm_t*)user_data;
    IPC_Channel *chan = lrm_conn->lrm_ops->ipcchan(lrm_conn);

    crm_debug_3("Invoked");
    lrm_conn->lrm_ops->rcvmsg(lrm_conn, FALSE);

    if(chan->ch_status == IPC_CONNECT) {
	return TRUE;
    }

    if(is_set(fsa_input_register, R_LRM_CONNECTED) == FALSE) {
	crm_info("LRM Connection disconnected");

    } else {
	crm_crit("LRM Connection failed");
	register_fsa_input(C_FSA_INTERNAL, I_ERROR, NULL);
	clear_bit_inplace(fsa_input_register, R_LRM_CONNECTED);
    }

    lrm_source = NULL;
    return FALSE;
}
extern gboolean process_lrm_event(lrm_op_t *op);
/*
 * LRM operation callback: forwards the operation to process_lrm_event().
 * A NULL op fails the CRM_CHECK and is not forwarded.
 */
void
lrm_op_callback(lrm_op_t* op)
{
CRM_CHECK(op != NULL, return);
process_lrm_event(op);
}
void
crmd_ha_status_callback(const char *node, const char *status, void *private)
{
xmlNode *update = NULL;
crm_node_t *member = NULL;
crm_notice("Status update: Node %s now has status [%s] (DC=%s)",
node, status, AM_I_DC?"true":"false");
member = g_hash_table_lookup(crm_peer_cache, node);
if(member == NULL) {
/* Make sure it is created so crm_update_peer_proc() succeeds */
const char *uuid = get_uuid(node);
member = crm_update_peer(0, 0, -1, 0, uuid, node, NULL, NULL);
}
if(safe_str_eq(status, PINGSTATUS)) {
return;
}
if(safe_str_eq(status, DEADSTATUS)) {
/* this node is toast */
crm_update_peer_proc(node, crm_proc_ais, OFFLINESTATUS);
if(AM_I_DC) {
update = create_node_state(
node, DEADSTATUS, XML_BOOLEAN_NO, OFFLINESTATUS,
CRMD_JOINSTATE_DOWN, NULL, TRUE, __FUNCTION__);
}
} else {
crm_update_peer_proc(node, crm_proc_ais, ONLINESTATUS);
if(AM_I_DC) {
update = create_node_state(
node, ACTIVESTATUS, NULL, NULL, NULL, NULL,
FALSE, __FUNCTION__);
}
}
trigger_fsa(fsa_source);
if(update != NULL) {
fsa_cib_anon_update(
XML_CIB_TAG_STATUS, update, cib_scope_local|cib_quorum_override);
free_xml(update);
}
}
/*
 * Client status callback.
 *
 * Only updates for the crmd client are handled. Join/leave statuses are
 * mapped to online/offline, the peer cache is updated, and then:
 *  - if our DC went offline, an election is requested;
 *  - if we are the DC, the node's state is written to the CIB and, for
 *    an offline node, it is removed from the join bookkeeping.
 *
 * Nothing beyond the peer-cache update happens while the CIB is not
 * connected or the FSA is in S_STOPPING.
 */
void
crmd_client_status_callback(const char * node, const char * client,
			    const char * status, void * private)
{
    const char *join_state = NULL;
    xmlNode *node_state = NULL;
    gboolean reset_shutdown = FALSE;

    crm_debug_3("Invoked");

    if(safe_str_neq(client, CRM_SYSTEM_CRMD)) {
	return;
    }

    /* Translate membership events into the status values used below */
    if(safe_str_eq(status, JOINSTATUS)) {
	status = ONLINESTATUS;
	reset_shutdown = TRUE;

    } else if(safe_str_eq(status, LEAVESTATUS)) {
	status = OFFLINESTATUS;
	join_state = CRMD_JOINSTATE_DOWN;
    }

    set_bit_inplace(fsa_input_register, R_PEER_DATA);
    crm_notice("Status update: Client %s/%s now has status [%s] (DC=%s)",
	       node, client, status, AM_I_DC?"true":"false");

    if(safe_str_eq(status, ONLINESTATUS)) {
	/* remove the cached value in case it changed */
	crm_debug_2("Uncaching UUID for %s", node);
	unget_uuid(node);
    }

    if(g_hash_table_lookup(crm_peer_cache, node) == NULL) {
	/* Make sure it is created so crm_update_peer_proc() succeeds */
	const char *uuid = get_uuid(node);
	crm_update_peer(0, 0, -1, 0, uuid, node, NULL, NULL);
    }

    crm_update_peer_proc(node, crm_proc_crmd, status);

    if(is_set(fsa_input_register, R_CIB_CONNECTED) == FALSE
       || fsa_state == S_STOPPING) {
	return;
    }

    if(safe_str_eq(node, fsa_our_dc) && safe_str_eq(status, OFFLINESTATUS)) {
	/* did our DC leave us */
	crm_info("Got client status callback - our DC is dead");
	register_fsa_input(C_CRMD_STATUS_CALLBACK, I_ELECTION, NULL);

    } else if(AM_I_DC == FALSE) {
	crm_info("Not the DC");

    } else {
	crm_debug_3("Got client status callback");
	node_state = create_node_state(node, NULL, NULL, status, join_state,
				       NULL, reset_shutdown, __FUNCTION__);

	if(safe_str_eq(status, ONLINESTATUS)) {
	    crm_xml_add(node_state, XML_CIB_ATTR_REPLACE, XML_CIB_TAG_LRM","XML_TAG_TRANSIENT_NODEATTRS",");
	}

	fsa_cib_anon_update(
	    XML_CIB_TAG_STATUS, node_state, cib_scope_local|cib_quorum_override);
	free_xml(node_state);

	if(safe_str_eq(status, OFFLINESTATUS)) {
	    erase_node_from_join(node);
	    check_join_state(fsa_state, __FUNCTION__);
	}
    }

    trigger_fsa(fsa_source);
}
/*
 * Release everything held for one IPC client.
 *
 * user_data is the crmd_client_t for the connection (may be NULL).
 *
 * Calling this on an _active_ connection re-enters this function:
 *   crmd_ipc_connection_destroy
 *     -> G_main_del_IPC_Channel
 *       -> g_source_unref
 *         -> G_CH_destroy_int
 *           -> crmd_ipc_connection_destroy
 * A better alternative is to call G_main_del_IPC_Channel() directly.
 * The client's source pointer is cleared before the channel is removed
 * from the mainloop.
 */
void
crmd_ipc_connection_destroy(gpointer user_data)
{
    crmd_client_t *doomed = user_data;
    GCHSource *mainloop_src = NULL;

    if(doomed == NULL) {
	crm_debug_4("No client to delete");
	return;
    }

    crm_debug_2("Disconnecting client %s (%p)", doomed->table_key, doomed);

    /* detach the source from the client before deleting it */
    mainloop_src = doomed->client_source;
    doomed->client_source = NULL;

    if(mainloop_src != NULL) {
	crm_debug_3("Deleting %s (%p) from mainloop",
		    doomed->table_key, mainloop_src);
	G_main_del_IPC_Channel(mainloop_src);
    }

    crm_free(doomed->table_key);
    crm_free(doomed->sub_sys);
    crm_free(doomed->uuid);
    crm_free(doomed);
}
/*
 * Accept a new IPC client connection.
 *
 * For a usable channel a blank crmd_client_t is allocated, the channel's
 * queue lengths are set and the channel is added to the mainloop with
 * crmd_ipc_msg_callback / crmd_ipc_connection_destroy as its handlers.
 * A NULL or disconnected channel is only logged. Always returns TRUE.
 */
gboolean
crmd_client_connect(IPC_Channel *client_channel, gpointer user_data)
{
    crmd_client_t *new_client = NULL;

    crm_debug_3("Invoked");

    if (client_channel == NULL) {
	crm_err("Channel was NULL");
	return TRUE;
    }

    if (client_channel->ch_status == IPC_DISCONNECT) {
	crm_err("Channel was disconnected");
	return TRUE;
    }

    crm_debug_3("Channel connected");
    crm_malloc0(new_client, sizeof(crmd_client_t));
    CRM_ASSERT(new_client != NULL);

    crm_debug_2("Created client: %p", new_client);

    client_channel->ops->set_recv_qlen(client_channel, 1024);
    client_channel->ops->set_send_qlen(client_channel, 1024);

    new_client->client_channel = client_channel;
    new_client->sub_sys   = NULL;
    new_client->uuid      = NULL;
    new_client->table_key = NULL;

    new_client->client_source = G_main_add_IPC_Channel(
	G_PRIORITY_LOW, client_channel, FALSE,
	crmd_ipc_msg_callback, new_client, crmd_ipc_connection_destroy);

    return TRUE;
}
#if SUPPORT_HEARTBEAT
static gboolean fsa_have_quorum = FALSE;
/*
 * Main-loop dispatch callback for the CCM connection.
 *
 * user_data is the oc_ev_t token. Returns TRUE when the event was
 * handled (rc == 0) and FALSE otherwise. A failure while
 * R_CCM_DISCONNECTED is not set is reported and raised as I_ERROR.
 * The FSA is triggered in every case.
 */
gboolean ccm_dispatch(int fd, gpointer user_data)
{
    oc_ev_t *ccm_token = (oc_ev_t*)user_data;
    int rc = 0;

    crm_debug_3("Invoked");
    rc = oc_ev_handle_event(ccm_token);

    if(rc != 0 && is_set(fsa_input_register, R_CCM_DISCONNECTED) == FALSE) {
	/* we did not sign out, so this failure is unexpected */
	register_fsa_input(C_CCM_CALLBACK, I_ERROR, NULL);
	crm_err("CCM connection appears to have failed: rc=%d.",
		rc);
    }

    trigger_fsa(fsa_source);
    return (rc == 0);
}
/*
 * CCM membership event callback.
 *
 * event  - the membership event type (see the table below)
 * cookie - passed back to oc_ev_callback_done() when processing is done
 * data   - the membership data; must not be NULL (asserted)
 *
 * Depending on the event, the quorum state and/or the membership cache
 * are updated. An event whose membership instance ID is lower than
 * crm_peer_seq is reported as an error and not processed.
 */
void
crmd_ccm_msg_callback(
oc_ed_t event, void *cookie, size_t size, const void *data)
{
gboolean update_cache = FALSE;
const oc_ev_membership_t *membership = data;
gboolean update_quorum = FALSE;
/* NOTE(review): trigger_transition is assigned below but never read in
 * this function - confirm whether a transition should be started */
gboolean trigger_transition = FALSE;
crm_debug_3("Invoked");
CRM_ASSERT(data != NULL);
crm_info("Quorum %s after event=%s (id=%d)",
ccm_have_quorum(event)?"(re)attained":"lost",
ccm_event_name(event), membership->m_instance);
if(crm_peer_seq > membership->m_instance) {
crm_err("Membership instance ID went backwards! %llu->%d",
crm_peer_seq, membership->m_instance);
CRM_ASSERT(crm_peer_seq <= membership->m_instance);
/* NOTE(review): this early return skips oc_ev_callback_done(cookie) -
 * confirm that is intended */
return;
}
/*
* OC_EV_MS_NEW_MEMBERSHIP: membership with quorum
* OC_EV_MS_MS_INVALID: membership without quorum
* OC_EV_MS_NOT_PRIMARY: previous membership no longer valid
* OC_EV_MS_PRIMARY_RESTORED: previous membership restored
* OC_EV_MS_EVICTED: the client is evicted from ccm.
*/
switch(event) {
case OC_EV_MS_NEW_MEMBERSHIP:
case OC_EV_MS_INVALID:
update_cache = TRUE;
update_quorum = TRUE;
break;
case OC_EV_MS_NOT_PRIMARY:
break;
case OC_EV_MS_PRIMARY_RESTORED:
update_cache = TRUE;
crm_peer_seq = membership->m_instance;
if(AM_I_DC && need_transition(fsa_state)) {
trigger_transition = TRUE;
}
break;
case OC_EV_MS_EVICTED:
update_quorum = TRUE;
register_fsa_input(C_FSA_INTERNAL, I_STOP, NULL);
crm_err("Shutting down after CCM event: %s",
ccm_event_name(event));
break;
default:
crm_err("Unknown CCM event: %d", event);
}
if(update_quorum) {
crm_have_quorum = ccm_have_quorum(event);
crm_update_quorum(crm_have_quorum);
if(crm_have_quorum == FALSE) {
/* did we just lose quorum? */
/* NOTE(review): no assignment to fsa_have_quorum is visible in this
 * part of the file - confirm this branch can ever be taken */
if(fsa_have_quorum && need_transition(fsa_state)) {
crm_info("Quorum lost: triggering transition (%s)",
ccm_event_name(event));
trigger_transition = TRUE;
}
}
}
if(update_cache) {
crm_debug_2("Updating cache after event %s", ccm_event_name(event));
do_ccm_update_cache(C_CCM_CALLBACK, fsa_state, event, data, NULL);
} else if(event != OC_EV_MS_NOT_PRIMARY) {
/* No cache update: just record the new instance ID and queue A_TE_CANCEL */
crm_peer_seq = membership->m_instance;
register_fsa_action(A_TE_CANCEL);
}
oc_ev_callback_done(cookie);
return;
}
#endif
/*
 * Destroy notification for the CIB connection.
 *
 * The FSA is always triggered. If R_CIB_CONNECTED is still set, the
 * loss is reported as an error, an I_ERROR input is registered and the
 * flag is cleared; otherwise the termination is only logged.
 */
void
crmd_cib_connection_destroy(gpointer user_data)
{
    crm_debug_3("Invoked");
    trigger_fsa(fsa_source);

    if(is_set(fsa_input_register, R_CIB_CONNECTED)) {
	/* eventually this will trigger a reconnect, not a shutdown */
	crm_err("Connection to the CIB terminated...");
	register_fsa_input(C_FSA_INTERNAL, I_ERROR, NULL);
	clear_bit_inplace(fsa_input_register, R_CIB_CONNECTED);

    } else {
	crm_info("Connection to the CIB terminated...");
    }
}
longclock_t fsa_start = 0;
longclock_t fsa_stop = 0;
longclock_t fsa_diff = 0;
/*
 * Trigger callback that runs the FSA once.
 *
 * When fsa_diff_max_ms is positive the run is timed, and a warning or
 * an error is logged if it took longer than fsa_diff_warn_ms or
 * fsa_diff_max_ms respectively. Always returns TRUE.
 */
gboolean
crm_fsa_trigger(gpointer user_data)
{
    unsigned int elapsed_ms = 0;

    if(fsa_diff_max_ms > 0) {
	fsa_start = time_longclock();
    }

    crm_debug_2("Invoked (queue len: %d)", g_list_length(fsa_message_queue));
    s_crmd_fsa(C_FSA_INTERNAL);
    crm_debug_2("Exited (queue len: %d)", g_list_length(fsa_message_queue));

    /* the limit is re-read here, after the FSA has run */
    if(!(fsa_diff_max_ms > 0)) {
	return TRUE;
    }

    fsa_stop = time_longclock();
    fsa_diff = sub_longclock(fsa_stop, fsa_start);
    elapsed_ms = longclockto_ms(fsa_diff);

    if(elapsed_ms > fsa_diff_max_ms) {
	crm_err("FSA took %dms to complete", elapsed_ms);

    } else if(elapsed_ms > fsa_diff_warn_ms) {
	crm_warn("FSA took %dms to complete", elapsed_ms);
    }

    return TRUE;
}
diff --git a/crmd/crmd_messages.h b/crmd/crmd_messages.h
index 2a512a71e7..8a93809120 100644
--- a/crmd/crmd_messages.h
+++ b/crmd/crmd_messages.h
@@ -1,114 +1,113 @@
/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef XML_CRM_MESSAGES__H
#define XML_CRM_MESSAGES__H
#include <crm/crm.h>
#include <crm/common/ipc.h>
#include <crm/common/xml.h>
#include <crmd_fsa.h>
extern void *fsa_typed_data_adv(
fsa_data_t *fsa_data, enum fsa_data_type a_type, const char *caller);
#define fsa_typed_data(x) fsa_typed_data_adv(msg_data, x, __FUNCTION__)
extern void register_fsa_error_adv(
enum crmd_fsa_cause cause, enum crmd_fsa_input input,
fsa_data_t *cur_data, void *new_data, const char *raised_from);
#define register_fsa_error(cause, input, new_data) register_fsa_error_adv(cause, input, msg_data, new_data, __FUNCTION__)
extern int register_fsa_input_adv(
enum crmd_fsa_cause cause, enum crmd_fsa_input input,
void *data, long long with_actions,
gboolean prepend, const char *raised_from);
extern void fsa_dump_queue(int log_level);
-extern void route_message(enum crmd_fsa_cause cause, ha_msg_input_t *input);
+extern void route_message(enum crmd_fsa_cause cause, xmlNode *input);
/*
 * Re-queue the current FSA input (or a blank C_FSA_INTERNAL input when
 * cur_input is NULL) as I_WAIT_FOR_EVENT, prepended to the queue.
 *
 * The expansion relies on a variable named 'action' being in scope at
 * the point of use.
 *
 * The last line of the macro must not end in a line-continuation
 * backslash, otherwise whatever follows it is spliced into the macro.
 */
#define crmd_fsa_stall(cur_input) if((cur_input) != NULL) {		\
	register_fsa_input_adv(						\
	    ((fsa_data_t*)(cur_input))->fsa_cause, I_WAIT_FOR_EVENT,	\
	    ((fsa_data_t*)(cur_input))->data, action, TRUE, __FUNCTION__); \
    } else {								\
	register_fsa_input_adv(						\
	    C_FSA_INTERNAL, I_WAIT_FOR_EVENT,				\
	    NULL, action, TRUE, __FUNCTION__);				\
    }
#define register_fsa_input(cause, input, data) register_fsa_input_adv(cause, input, data, A_NOTHING, FALSE, __FUNCTION__)
/*
 * Add 'action' to the pending fsa_actions bitmask, set the FSA trigger
 * when fsa_source is available, and log which function requested it.
 * Expands to a braced block.
 */
#define register_fsa_action(action) { \
fsa_actions |= action; \
if(fsa_source) { \
G_main_set_trigger(fsa_source); \
} \
crm_debug("%s added action %s to the FSA", \
__FUNCTION__, fsa_action2string(action)); \
}
#define register_fsa_input_before(cause, input, data) register_fsa_input_adv(cause, input, data, A_NOTHING, TRUE, __FUNCTION__)
#define register_fsa_input_later(cause, input, data) register_fsa_input_adv(cause, input, data, A_NOTHING, FALSE, __FUNCTION__)
void delete_fsa_input(fsa_data_t *fsa_data);
GListPtr put_message(fsa_data_t *new_message);
fsa_data_t *get_message(void);
gboolean is_message(void);
gboolean have_wait_message(void);
extern gboolean relay_message(
xmlNode *relay_message, gboolean originated_locally);
extern gboolean crmd_ipc_msg_callback(IPC_Channel *client, gpointer user_data);
extern void process_message(
xmlNode *msg, gboolean originated_locally,const char *src_node_name);
extern gboolean crm_dc_process_message(xmlNode *whole_message,
xmlNode *action,
const char *host_from,
const char *sys_from,
const char *sys_to,
const char *op,
gboolean dc_mode);
extern gboolean send_msg_via_ha(xmlNode *msg);
extern gboolean send_msg_via_ipc(xmlNode *msg, const char *sys);
extern gboolean add_pending_outgoing_reply(const char *originating_node_name,
const char *crm_msg_reference,
const char *sys_to,
const char *sys_from);
-extern gboolean crmd_authorize_message(
- ha_msg_input_t *client_msg, crmd_client_t *curr_client);
+extern gboolean crmd_authorize_message(xmlNode *client_msg, crmd_client_t *curr_client);
extern gboolean send_request(xmlNode *msg, char **msg_reference);
-extern enum crmd_fsa_input handle_message(ha_msg_input_t *stored_msg);
+extern enum crmd_fsa_input handle_message(xmlNode *stored_msg);
extern void lrm_op_callback(lrm_op_t* op);
extern void msg_queue_helper(void);
#endif
diff --git a/crmd/election.c b/crmd/election.c
index 6ff0249b19..c3c72da65c 100644
--- a/crmd/election.c
+++ b/crmd/election.c
@@ -1,453 +1,455 @@
/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <crm_internal.h>
#include <heartbeat.h>
#include <crm/cib.h>
#include <crm/msg_xml.h>
#include <crm/common/xml.h>
#include <crm/common/cluster.h>
#include <crm/crm.h>
#include <crmd_fsa.h>
#include <crmd_messages.h>
#include <crmd_callbacks.h>
#include <clplumbing/Gmain_timeout.h>
#include <clplumbing/cl_uuid.h>
#include <ha_version.h>
GHashTable *voted = NULL;
uint highest_born_on = -1;
static int current_election_id = 1;
const char *get_hg_version(void);
/* Return the HA_HG_VERSION build identifier as a string. */
const char *get_hg_version(void)
{
/* limit this #define's use to a single file to avoid rebuilding more than necessary */
return HA_HG_VERSION;
}
/* A_ELECTION_VOTE */
/*
 * A_ELECTION_VOTE: broadcast our vote for a new election.
 *
 * No vote is sent in S_RECOVERY, S_STOPPING or S_TERMINATE, or while
 * R_STARTING is set; instead I_RELEASE_DC (if we are the DC) or
 * I_PENDING is registered.
 *
 * Otherwise a CRM_OP_VOTE request carrying our uuid and a fresh election
 * id is sent, the record of previous voters is discarded and, in
 * S_ELECTION or S_RELEASE_DC, the election timeout is started.
 */
void
do_election_vote(long long action,
		 enum crmd_fsa_cause cause,
		 enum crmd_fsa_state cur_state,
		 enum crmd_fsa_input current_input,
		 fsa_data_t *msg_data)
{
    gboolean not_voting = FALSE;
    xmlNode *vote = NULL;

    /* don't vote if we're in one of these states or wanting to shut down */
    switch(cur_state) {
	case S_RECOVERY:
	case S_STOPPING:
	case S_TERMINATE:
	    crm_warn("Not voting in election, we're in state %s",
		     fsa_state2string(cur_state));
	    not_voting = TRUE;
	    break;
	default:
	    break;
    }

    if(not_voting == FALSE) {
	if(is_set(fsa_input_register, R_STARTING)) {
	    not_voting = TRUE;
	}
    }

    if(not_voting) {
	if(AM_I_DC) {
	    register_fsa_input(C_FSA_INTERNAL, I_RELEASE_DC, NULL);
	} else {
	    register_fsa_input(C_FSA_INTERNAL, I_PENDING, NULL);
	}
	return;
    }

    vote = create_request(
	CRM_OP_VOTE, NULL, NULL,
	CRM_SYSTEM_CRMD, CRM_SYSTEM_CRMD, NULL);

    current_election_id++;
    crm_xml_add(vote, F_CRM_ELECTION_OWNER, fsa_our_uuid);
    crm_xml_add_int(vote, F_CRM_ELECTION_ID, current_election_id);

    send_request(vote, NULL);

    /* 'voted' is only created once a vote has been counted, so it can
     * still be NULL here; GLib rejects a NULL hash table with a
     * critical warning, hence the guard (as in do_dc_takeover()).
     */
    if(voted != NULL) {
	crm_debug("Destroying voted hash");
	g_hash_table_destroy(voted);
	voted = NULL;
    }

    if(cur_state == S_ELECTION || cur_state == S_RELEASE_DC) {
	crm_timer_start(election_timeout);

    } else if(cur_state != S_INTEGRATION) {
	crm_err("Broken? Voting in state %s",
		fsa_state2string(cur_state));
    }

    return;
}
char *dc_hb_msg = NULL;
int beat_num = 0;
/*
 * DC heartbeat timer callback.
 *
 * The whole implementation is compiled out with #if 0, so the function
 * currently does nothing and always returns TRUE.
 */
gboolean
do_dc_heartbeat(gpointer data)
{
#if 0
fsa_timer_t *timer = (fsa_timer_t *)data;
crm_debug_3("Sending DC Heartbeat %d", beat_num);
xmlNode *msg = ha_msg_new(5);
crm_xml_add(msg, F_TYPE, T_CRM);
crm_xml_add(msg, F_SUBTYPE, XML_ATTR_REQUEST);
crm_xml_add(msg, F_CRM_SYS_TO, CRM_SYSTEM_CRMD);
crm_xml_add(msg, F_CRM_SYS_FROM, CRM_SYSTEM_DC);
crm_xml_add(msg, F_CRM_TASK, CRM_OP_HBEAT);
crm_xml_add_int(msg, "dc_beat_seq", beat_num);
beat_num++;
if(send_msg_via_ha(msg) == FALSE) {
/* this is bad */
crm_timer_stop(timer); /* make it not go off again */
register_fsa_input(C_HEARTBEAT_FAILED, I_SHUTDOWN, NULL);
return FALSE;
}
#endif
return TRUE;
}
/*
 * Holder for a node name and a "born on" value.
 * NOTE(review): presumably describes the current election winner, but no
 * use of this struct is visible in this part of the file - confirm it
 * is still needed.
 */
struct election_data_s
{
const char *winning_uname;
unsigned int winning_bornon;
};
/*
 * GHashTable iterator: log the key (a node name) at error level,
 * prefixed by the label passed as user_data, but only for entries whose
 * value is an active member.
 */
static void
log_member_uname(gpointer key, gpointer value, gpointer user_data)
{
    char *label = (char*)user_data;
    char *uname = (char*)key;

    if(!crm_is_member_active(value)) {
	return;
    }
    crm_err("%s: %s", label, uname);
}
/*
 * GHashTable iterator: log every key (a node name) at error level,
 * prefixed by the label passed as user_data. The value is ignored.
 */
static void
log_node(gpointer key, gpointer value, gpointer user_data)
{
    char *label = (char*)user_data;
    char *uname = (char*)key;

    crm_err("%s: %s", label, uname);
}
/*
 * Check whether every active member has voted in the current election.
 *
 * Outside S_ELECTION the check is ignored. Once the number of recorded
 * voters reaches the number of active members, the election timer is
 * stopped, I_ELECTION_DC is registered and the voter record is dropped.
 * If there are more voters than members, both lists are logged to help
 * diagnose the mismatch.
 */
void
do_election_check(long long action,
		  enum crmd_fsa_cause cause,
		  enum crmd_fsa_state cur_state,
		  enum crmd_fsa_input current_input,
		  fsa_data_t *msg_data)
{
    int voted_size = 0;
    int num_members = crm_active_members();

    /* 'voted' is only created once a vote has been counted, so it can
     * be NULL here; treat that as "nobody has voted" instead of handing
     * NULL to GLib, which answers with a critical warning.
     */
    if(voted != NULL) {
	voted_size = g_hash_table_size(voted);
    }

    /* in the case of #voted > #members, it is better to
     *   wait for the timeout and give the cluster time to
     *   stabilize
     */
    if(fsa_state != S_ELECTION) {
	crm_debug("Ignore election check: we not in an election");

    } else if(voted_size >= num_members) {
	/* we won and everyone has voted */
	crm_timer_stop(election_timeout);
	register_fsa_input(C_FSA_INTERNAL, I_ELECTION_DC, NULL);

	if(voted_size > num_members) {
	    char *data = NULL;

	    data = crm_strdup("member");
	    g_hash_table_foreach(crm_peer_cache, log_member_uname, data);
	    crm_free(data);

	    if(voted != NULL) {
		data = crm_strdup("voted");
		g_hash_table_foreach(voted, log_node, data);
		crm_free(data);
	    }
	}

	crm_debug("Destroying voted hash");
	if(voted != NULL) {
	    g_hash_table_destroy(voted);
	    voted = NULL;
	}

    } else {
	crm_info("Still waiting on %d non-votes (%d total)",
		 num_members - voted_size, num_members);
    }

    return;
}
/* A_ELECTION_COUNT */
/*
 * A_ELECTION_COUNT: process a vote (or no-vote) received from a peer.
 *
 * The message is taken from msg_data. If the election is ours and the
 * election id is current, the sender is recorded in the 'voted' hash.
 * Our own votes and no-votes are otherwise ignored. For a real vote from
 * another node the winner is decided by, in order: our membership
 * state, the feature-set version, the "born" value (heartbeat clusters
 * only) and finally the node names.
 *
 * If we lose, a CRM_OP_NOVOTE is sent back and I_RELEASE_DC or I_PENDING
 * is registered; if we win, I_ELECTION is registered, unless we lost
 * another election less than 'dampen' seconds ago.
 *
 * Lines prefixed with '+' are diff markers of the change under review.
 */
void
do_election_count_vote(long long action,
enum crmd_fsa_cause cause,
enum crmd_fsa_state cur_state,
enum crmd_fsa_input current_input,
fsa_data_t *msg_data)
{
int election_id = -1;
gboolean we_loose = FALSE;
/* time of our most recent election loss; 0 once we have won again */
static time_t last_election_loss = 0;
enum crmd_fsa_input election_result = I_NULL;
crm_node_t *our_node = NULL, *your_node = NULL;
/* NOTE(review): vote->msg is dereferenced in the initialisers below
 * before any check on 'vote' - confirm fsa_typed_data() cannot return
 * NULL here */
ha_msg_input_t *vote = fsa_typed_data(fsa_dt_ha_msg);
const char *op = crm_element_value(vote->msg, F_CRM_TASK);
const char *vote_from = crm_element_value(vote->msg, F_CRM_HOST_FROM);
const char *your_version = crm_element_value(vote->msg, F_CRM_VERSION);
const char *election_owner= crm_element_value(vote->msg, F_CRM_ELECTION_OWNER);
/* if the membership copy is NULL we REALLY shouldnt be voting
* the question is how we managed to get here.
*/
+
+ CRM_CHECK(vote->msg != NULL, crm_err("Bogus data from %s", msg_data->origin); return);
CRM_CHECK(crm_peer_cache != NULL, return);
/* A vote without a sender is treated as if it came from ourselves */
CRM_CHECK(vote_from != NULL, vote_from = fsa_our_uname);
our_node = g_hash_table_lookup(crm_peer_cache, fsa_our_uname);
your_node = g_hash_table_lookup(crm_peer_cache, vote_from);
if(your_node == NULL) {
crm_debug("Election ignore: The other side doesn't exist in CCM: %s", vote_from);
return;
}
/* The voter record is created on demand */
if(voted == NULL) {
crm_debug("Created voted hash");
voted = g_hash_table_new_full(
g_str_hash, g_str_equal,
g_hash_destroy_str, g_hash_destroy_str);
}
crm_element_value_int(vote->msg, F_CRM_ELECTION_ID, &election_id);
crm_debug("Election %d, owner: %s", election_id, election_owner);
/* update the list of nodes that have voted */
if(crm_str_eq(fsa_our_uuid, election_owner, TRUE)
|| crm_str_eq(fsa_our_uname, election_owner, TRUE)) {
if(election_id == current_election_id) {
/* the hash table takes ownership of both copies (see the destroy
 * functions passed to g_hash_table_new_full() above) */
char *uname_copy = NULL;
char *op_copy = crm_strdup(op);
uname_copy = crm_strdup(your_node->uname);
g_hash_table_replace(voted, uname_copy, op_copy);
crm_info("Updated voted hash for %s to %s",
your_node->uname, op);
} else {
crm_debug("Ignore old '%s' from %s: %d vs. %d",
op, your_node->uname,
election_id, current_election_id);
return;
}
} else {
/* a no-vote for an election that is not ours is not expected */
CRM_CHECK(safe_str_neq(op, CRM_OP_NOVOTE), return);
}
if(vote_from == NULL || crm_str_eq(vote_from, fsa_our_uname, TRUE)) {
/* don't count our own vote */
crm_info("Election ignore: our %s (%s)", op,crm_str(vote_from));
return;
} else if(crm_str_eq(op, CRM_OP_NOVOTE, TRUE)) {
crm_info("Election ignore: no-vote from %s", vote_from);
return;
}
crm_info("Election check: %s from %s", op, vote_from);
/* Decide the winner; the first criterion that differs wins */
if(our_node == NULL || safe_str_neq(our_node->state, CRM_NODE_MEMBER)) {
crm_info("Election fail: we don't exist in CCM");
we_loose = TRUE;
} else if(compare_version(your_version, CRM_FEATURE_SET) < 0) {
crm_info("Election fail: version");
we_loose = TRUE;
} else if(compare_version(your_version, CRM_FEATURE_SET) > 0) {
crm_info("Election pass: version");
} else if(is_heartbeat_cluster() && your_node->born < our_node->born) {
crm_debug("Election fail: born_on");
we_loose = TRUE;
} else if(is_heartbeat_cluster() && your_node->born > our_node->born) {
crm_debug("Election pass: born_on");
} else if(fsa_our_uname == NULL
|| strcasecmp(fsa_our_uname, vote_from) > 0) {
crm_debug("Election fail: uname");
we_loose = TRUE;
} else {
CRM_CHECK(strcasecmp(fsa_our_uname, vote_from) != 0, ;);
crm_debug("Them: %s (born=%llu) Us: %s (born=%llu)",
vote_from, (unsigned long long)your_node->born,
fsa_our_uname, (unsigned long long)our_node->born);
/* cant happen...
* } else if(strcasecmp(fsa_our_uname, vote_from) == 0) {
*
* default...
* } else { // strcasecmp(fsa_our_uname, vote_from) < 0
* we win
*/
}
if(we_loose) {
/* Concede: tell the winner, drop our DC reference and stop the timer */
gboolean vote_sent = FALSE;
xmlNode *novote = create_request(
CRM_OP_NOVOTE, NULL, vote_from,
CRM_SYSTEM_CRMD, CRM_SYSTEM_CRMD, NULL);
update_dc(NULL, FALSE);
crm_timer_stop(election_timeout);
crm_debug("Election lost to %s (%d)", vote_from, election_id);
if(fsa_input_register & R_THE_DC) {
crm_debug_3("Give up the DC to %s", vote_from);
election_result = I_RELEASE_DC;
} else {
crm_debug_3("We werent the DC anyway");
election_result = I_PENDING;
}
crm_xml_add(novote, F_CRM_ELECTION_OWNER, election_owner);
crm_xml_add_int(novote, F_CRM_ELECTION_ID, election_id);
vote_sent = send_request(novote, NULL);
CRM_DEV_ASSERT(vote_sent);
fsa_cib_conn->cmds->set_slave(fsa_cib_conn, cib_scope_local);
last_election_loss = time(NULL);
} else {
/* Ignore a win that follows a loss by less than 'dampen' seconds */
int dampen = 2;
time_t tm_now = time(NULL);
if(tm_now - last_election_loss < (time_t)dampen) {
crm_debug("Election ignore: We already lost an election less than %ds ago", dampen);
return;
}
last_election_loss = 0;
election_result = I_ELECTION;
crm_info("Election won over %s", vote_from);
g_hash_table_destroy(voted);
voted = NULL;
}
register_fsa_input(C_FSA_INTERNAL, election_result, NULL);
}
/* A_ELECT_TIMER_START, A_ELECTION_TIMEOUT */
/* we won */
/* FSA action handler with an empty body: it currently does nothing. */
void
do_election_timer_ctrl(long long action,
enum crmd_fsa_cause cause,
enum crmd_fsa_state cur_state,
enum crmd_fsa_input current_input,
fsa_data_t *msg_data)
{
}
/*
 * CIB operation callback (registered by do_dc_takeover): any result
 * other than cib_ok is raised as an I_ERROR for the FSA.
 */
static void
feature_update_callback(xmlNode *msg, int call_id, int rc,
			xmlNode *output, void *user_data)
{
    /* register_fsa_error() expands to code that refers to 'msg_data' */
    fsa_data_t *msg_data = NULL;

    if(rc == cib_ok) {
	return;
    }
    register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL);
}
/* A_DC_TAKEOVER */
/*
 * A_DC_TAKEOVER: become the DC for this partition.
 *
 * Sets R_THE_DC, R_JOIN_OK and R_INVOKE_PE, discards any voter record,
 * switches the CIB connection to master, writes the CRM and CIB feature
 * sets into the CIB and updates the "dc-version" attribute.
 */
void
do_dc_takeover(long long action,
enum crmd_fsa_cause cause,
enum crmd_fsa_state cur_state,
enum crmd_fsa_input current_input,
fsa_data_t *msg_data)
{
int rc = cib_ok;
xmlNode *cib = NULL;
crm_info("Taking over DC status for this partition");
set_bit_inplace(fsa_input_register, R_THE_DC);
/* the election is over, the voter record is no longer needed */
if(voted != NULL) {
crm_debug_2("Destroying voted hash");
g_hash_table_destroy(voted);
voted = NULL;
}
set_bit_inplace(fsa_input_register, R_JOIN_OK);
set_bit_inplace(fsa_input_register, R_INVOKE_PE);
fsa_cib_conn->cmds->set_slave_all(fsa_cib_conn, cib_none);
fsa_cib_conn->cmds->set_master(fsa_cib_conn, cib_none);
/* Publish the feature sets via an otherwise empty CIB update */
cib = createEmptyCib();
crm_xml_add(cib, XML_ATTR_CRM_VERSION, CRM_FEATURE_SET);
crm_xml_add(cib, XML_ATTR_CIB_REVISION, CIB_FEATURE_SET);
fsa_cib_update(XML_TAG_CIB, cib, cib_quorum_override, rc);
/* rc is passed on as the id of the operation to watch; a failure is
 * handled by feature_update_callback() */
add_cib_op_callback(rc, FALSE, NULL, feature_update_callback);
update_attr(fsa_cib_conn, cib_none, XML_CIB_TAG_CRMCONFIG,
NULL, NULL, NULL, "dc-version", VERSION"-"HA_HG_VERSION, FALSE);
free_xml(cib);
}
/* A_DC_RELEASE */
/*
 * A_DC_RELEASE / A_DC_RELEASED: give up the DC role.
 *
 * A_DC_RELEASE clears R_THE_DC; A_DC_RELEASED reports completion by
 * registering I_RELEASE_SUCCESS. Any other action is logged as an error.
 */
void
do_dc_release(long long action,
	      enum crmd_fsa_cause cause,
	      enum crmd_fsa_state cur_state,
	      enum crmd_fsa_input current_input,
	      fsa_data_t *msg_data)
{
    if(action & A_DC_RELEASE) {
	crm_debug("Releasing the role of DC");
	clear_bit_inplace(fsa_input_register, R_THE_DC);

    } else if(action & A_DC_RELEASED) {
	crm_info("DC role released");
	/* TODO: if there are errors we cant stay up if not healthy;
	 * register I_SHUTDOWN (or perhaps I_ERROR and go to S_RECOVER?)
	 * instead of reporting success
	 */
	register_fsa_input(C_FSA_INTERNAL, I_RELEASE_SUCCESS, NULL);

    } else {
	crm_err("Unknown action %s", fsa_action2string(action));
    }

    crm_debug_2("Am I still the DC? %s", AM_I_DC?XML_BOOLEAN_YES:XML_BOOLEAN_NO);
}
diff --git a/crmd/messages.c b/crmd/messages.c
index 258f8b335f..76e1005909 100644
--- a/crmd/messages.c
+++ b/crmd/messages.c
@@ -1,1203 +1,1216 @@
/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <crm_internal.h>
#include <sys/param.h>
#include <crm/crm.h>
#include <string.h>
#include <time.h>
#include <crmd_fsa.h>
#include <lrm/lrm_api.h>
#include <crm/msg_xml.h>
#include <crm/common/xml.h>
#include <crm/common/msg.h>
#include <crm/common/cluster.h>
#include <crm/cib.h>
#include <crmd.h>
#include <crmd_messages.h>
#include <crmd_lrm.h>
GListPtr fsa_message_queue = NULL;
extern void crm_shutdown(int nsig);
-enum crmd_fsa_input handle_request(ha_msg_input_t *stored_msg);
-enum crmd_fsa_input handle_response(ha_msg_input_t *stored_msg);
+enum crmd_fsa_input handle_request(xmlNode *stored_msg);
+enum crmd_fsa_input handle_response(xmlNode *stored_msg);
enum crmd_fsa_input handle_shutdown_request(xmlNode *stored_msg);
ha_msg_input_t *copy_ha_msg_input(ha_msg_input_t *orig);
gboolean ipc_queue_helper(gpointer key, gpointer value, gpointer user_data);
#ifdef MSG_LOG
# define ROUTER_RESULT(x) crm_debug_3("Router result: %s", x); \
crm_log_xml(LOG_MSG, "router.log", relay_message);
#else
# define ROUTER_RESULT(x) crm_debug_3("Router result: %s", x)
#endif
/* debug only, can wrap all it likes */
int last_data_id = 0;
/*
 * Raise an error input at the front of the FSA queue.
 *
 * Any actions currently pending in fsa_actions are first parked on a
 * prepended I_NULL entry (carrying the cause and data of cur_data when it
 * is given), fsa_actions is cleared, and then the error input itself is
 * prepended - so the error ends up ahead of the saved actions.
 */
void
register_fsa_error_adv(
	enum crmd_fsa_cause cause, enum crmd_fsa_input input,
	fsa_data_t *cur_data, void *new_data, const char *raised_from)
{
	if(fsa_actions != A_NOTHING) {
		enum crmd_fsa_cause saved_cause = C_FSA_INTERNAL;
		void *saved_data = NULL;

		if(cur_data != NULL) {
			saved_cause = cur_data->fsa_cause;
			saved_data = cur_data->data;
		}

		/* keep the pending actions so they are not lost */
		register_fsa_input_adv(saved_cause, I_NULL, saved_data,
				       fsa_actions, TRUE, __FUNCTION__);
	}

	/* start from a clean action list */
	fsa_actions = A_NOTHING;

	/* now queue the error itself */
	register_fsa_input_adv(cause, input, new_data,
			       A_NOTHING, TRUE, raised_from);
}
static gboolean last_was_vote = FALSE;
/*
 * Queue a new input for the FSA.
 *
 * cause, input - stored on the queued entry
 * data         - optional payload; it is copied according to 'cause' (see
 *                the switch below), the pointer passed in is not stored
 * with_actions - action bits stored on the entry
 * prepend      - TRUE puts the entry at the front of the queue, FALSE at
 *                the end
 * raised_from  - caller name, used in log messages and stored as the
 *                entry's origin ("<unknown>" is substituted for NULL)
 *
 * Returns the id given to the new entry, or 0 when nothing was queued
 * (a data-less I_WAIT_FOR_EVENT, or I_NULL without any actions).
 * If a trigger source exists (fsa_source), it is set before returning.
 */
int
register_fsa_input_adv(
enum crmd_fsa_cause cause, enum crmd_fsa_input input,
void *data, long long with_actions,
gboolean prepend, const char *raised_from)
{
unsigned old_len = g_list_length(fsa_message_queue);
fsa_data_t *fsa_data = NULL;
/* the id is consumed even if the input ends up not being queued */
last_data_id++;
CRM_CHECK(raised_from != NULL, raised_from = "<unknown>");
crm_debug("%s %s FSA input %d (%s) (cause=%s) %s data",
raised_from, prepend?"prepended":"appended",last_data_id, fsa_input2string(input),
fsa_cause2string(cause), data?"with":"without");
if(input == I_WAIT_FOR_EVENT) {
/* stall request: flag it and, when there is no payload, just merge the
 * actions into the global list instead of queueing an entry */
do_fsa_stall = TRUE;
crm_debug("Stalling the FSA pending further input: cause=%s",
fsa_cause2string(cause));
if(old_len > 0) {
crm_warn("%s stalled the FSA with pending inputs",
raised_from);
fsa_dump_queue(LOG_DEBUG);
}
if(data == NULL) {
set_bit_inplace(fsa_actions, with_actions);
with_actions = A_NOTHING;
return 0;
}
crm_err("%s stalled the FSA with data - this may be broken",
raised_from);
}
if(old_len == 0) {
last_was_vote = FALSE;
}
if(input == I_NULL && with_actions == A_NOTHING /* && data == NULL */){
/* no point doing anything */
crm_err("Cannot add entry to queue: no input and no action");
return 0;
} else if(data == NULL) {
last_was_vote = FALSE;
#if 0
} else if(last_was_vote && cause == C_HA_MESSAGE && input == I_ROUTER) {
const char *op = crm_element_value(
((ha_msg_input_t*)data)->msg, F_CRM_TASK);
if(safe_str_eq(op, CRM_OP_VOTE)) {
/* It is always safe to treat N successive votes as
* a single one
*
* If all the discarded votes are more "losing" than
* the first then the result is accurate
* (win or lose).
*
* If any of the discarded votes are less "losing"
* than the first then we will cast our vote and the
* eventual winner will vote us down again (which
* even in the case that N=2, is no worse than if we
* had not discarded the vote).
*/
crm_debug_2("Vote compression: %d", old_len);
return 0;
}
#endif
} else if (cause == C_HA_MESSAGE && input == I_ROUTER) {
/* remember whether the most recently queued routed message was a vote;
 * only the disabled vote-compression code above reads this flag */
const char *op = crm_element_value(
((ha_msg_input_t*)data)->msg, F_CRM_TASK);
if(safe_str_eq(op, CRM_OP_VOTE)) {
last_was_vote = TRUE;
crm_debug_3("Added vote: %d", old_len);
}
} else {
last_was_vote = FALSE;
}
crm_malloc0(fsa_data, sizeof(fsa_data_t));
fsa_data->id = last_data_id;
fsa_data->fsa_input = input;
fsa_data->fsa_cause = cause;
fsa_data->origin = raised_from;
fsa_data->data = NULL;
fsa_data->data_type = fsa_dt_none;
fsa_data->actions = with_actions;
if(with_actions != A_NOTHING) {
crm_debug_3("Adding actions %.16llx to input", with_actions);
}
if(data != NULL) {
/* the payload type is implied by the cause */
switch(cause) {
case C_FSA_INTERNAL:
case C_CRMD_STATUS_CALLBACK:
case C_IPC_MESSAGE:
case C_HA_MESSAGE:
crm_debug_3("Copying %s data from %s as a HA msg",
fsa_cause2string(cause),
raised_from);
+ CRM_CHECK(((ha_msg_input_t*)data)->msg != NULL,
+ crm_err("Bogus data from %s", raised_from));
fsa_data->data = copy_ha_msg_input(data);
fsa_data->data_type = fsa_dt_ha_msg;
break;
case C_LRM_OP_CALLBACK:
crm_debug_3("Copying %s data from %s as lrm_op_t",
fsa_cause2string(cause),
raised_from);
fsa_data->data = copy_lrm_op((lrm_op_t*)data);
fsa_data->data_type = fsa_dt_lrm;
break;
case C_CCM_CALLBACK:
case C_SUBSYSTEM_CONNECT:
case C_LRM_MONITOR_CALLBACK:
case C_TIMER_POPPED:
case C_SHUTDOWN:
case C_HEARTBEAT_FAILED:
case C_HA_DISCONNECT:
case C_ILLEGAL:
case C_UNKNOWN:
case C_STARTUP:
/* payloads are not supported for these causes: terminate the process */
crm_err("Copying %s data (from %s)"
" not yet implemented",
fsa_cause2string(cause), raised_from);
exit(1);
break;
}
crm_debug_4("%s data copied",
fsa_cause2string(fsa_data->fsa_cause));
}
/* make sure to free it properly later */
if(prepend) {
crm_debug_2("Prepending input");
fsa_message_queue = g_list_prepend(fsa_message_queue, fsa_data);
} else {
fsa_message_queue = g_list_append(fsa_message_queue, fsa_data);
}
crm_debug_2("Queue len: %d", g_list_length(fsa_message_queue));
fsa_dump_queue(LOG_DEBUG_2);
/* sanity check: the queue must have grown */
if(old_len == g_list_length(fsa_message_queue)){
crm_err("Couldnt add message to the queue");
}
if(fsa_source) {
crm_debug_3("Triggering FSA: %s", __FUNCTION__);
G_main_set_trigger(fsa_source);
}
return last_data_id;
}
void
fsa_dump_queue(int log_level)
{
if(log_level < (int)crm_log_level) {
return;
}
slist_iter(
data, fsa_data_t, fsa_message_queue, lpc,
do_crm_log(log_level,
"queue[%d(%d)]: input %s raised by %s()\t(cause=%s)",
lpc, data->id, fsa_input2string(data->fsa_input),
data->origin, fsa_cause2string(data->fsa_cause));
);
}
/*
 * Build a new ha_msg_input_t around a copy of orig's XML message.
 *
 * orig may be NULL, in which case the wrapper is created from a NULL
 * message.  When orig carries a message, the result is sanity-checked to
 * make sure the copy actually produced one.
 * The result comes from new_ha_msg_input().
 */
ha_msg_input_t *
copy_ha_msg_input(ha_msg_input_t *orig)
{
+ ha_msg_input_t *copy = NULL;
xmlNodePtr data = NULL;
if(orig != NULL) {
crm_debug_4("Copy msg");
data = copy_xml(orig->msg);
} else {
crm_debug_3("No message to copy");
}
- return new_ha_msg_input(data);
+ copy = new_ha_msg_input(data);
+ /* orig == NULL is a supported input (see above), so it must not be
+ * dereferenced unconditionally here; a NULL copy also counts as failure */
+ if(orig != NULL && orig->msg != NULL) {
+ CRM_CHECK(copy != NULL && copy->msg != NULL, crm_err("copy failed"));
+ }
+ return copy;
}
/*
 * Free a queued FSA input together with its payload.
 *
 * The payload is released according to fsa_data->data_type; a NULL
 * fsa_data is ignored.  A non-NULL payload whose type is fsa_dt_none
 * cannot be freed safely, so the process exits in that case.
 */
void
delete_fsa_input(fsa_data_t *fsa_data)
{
lrm_op_t *op = NULL;
xmlNode *foo = NULL;
if(fsa_data == NULL) {
return;
}
crm_debug_4("About to free %s data",
fsa_cause2string(fsa_data->fsa_cause));
if(fsa_data->data != NULL) {
switch(fsa_data->data_type) {
case fsa_dt_ha_msg:
delete_ha_msg_input(fsa_data->data);
break;
case fsa_dt_xml:
foo = fsa_data->data;
free_xml(foo);
break;
case fsa_dt_lrm:
op = (lrm_op_t*)fsa_data->data;
-
free_lrm_op(op);
-
break;
case fsa_dt_none:
/* data is known to be non-NULL here, so this check always fires */
if(fsa_data->data != NULL) {
crm_err("Dont know how to free %s data from %s",
fsa_cause2string(fsa_data->fsa_cause),
fsa_data->origin);
exit(1);
}
break;
}
crm_debug_4("%s data freed",
fsa_cause2string(fsa_data->fsa_cause));
}
crm_free(fsa_data);
}
/* returns the next message */
fsa_data_t *
get_message(void)
{
fsa_data_t* message = g_list_nth_data(fsa_message_queue, 0);
fsa_message_queue = g_list_remove(fsa_message_queue, message);
crm_debug_2("Processing input %d", message->id);
return message;
}
/* returns the current head of the FIFO queue */
gboolean
is_message(void)
{
return (g_list_length(fsa_message_queue) > 0);
}
/*
 * Return the payload of an FSA input after checking its type.
 *
 * fsa_data - the input to inspect (may be NULL)
 * a_type   - the payload type the caller expects
 * caller   - name used as prefix in the log messages
 *
 * Returns the payload pointer, or NULL (after logging) when there is no
 * input or no payload.  A payload of a different type is logged at
 * LOG_CRIT and then fails a CRM_ASSERT.
 */
void *
fsa_typed_data_adv(
	fsa_data_t *fsa_data, enum fsa_data_type a_type, const char *caller)
{
	if(fsa_data == NULL) {
		do_crm_log(LOG_ERR, "%s: No FSA data available", caller);
		return NULL;
	}

	if(fsa_data->data == NULL) {
		do_crm_log(LOG_ERR, "%s: No message data available", caller);
		return NULL;
	}

	if(fsa_data->data_type != a_type) {
		do_crm_log(LOG_CRIT,
			   "%s: Message data was the wrong type! %d vs. requested=%d."
			   " Origin: %s", caller,
			   fsa_data->data_type, a_type, fsa_data->origin);
		/* known to be false at this point: deliberately trips the assert */
		CRM_ASSERT(fsa_data->data_type == a_type);
		return NULL;
	}

	return fsa_data->data;
}
/* A_MSG_ROUTE */
/*
 * FSA action handler: hand the XML message carried by the current input
 * to route_message(), using the input's own cause.
 *
 * NOTE(review): route_message() frees the XML it is given while the
 * wrapper in msg_data still points at it - confirm the wrapper's cleanup
 * does not free the same node again.
 */
void
do_msg_route(long long action,
enum crmd_fsa_cause cause,
enum crmd_fsa_state cur_state,
enum crmd_fsa_input current_input,
fsa_data_t *msg_data)
{
ha_msg_input_t *input = fsa_typed_data(fsa_dt_ha_msg);
- route_message(msg_data->fsa_cause, input);
+ /* fsa_typed_data() presumably wraps fsa_typed_data_adv(), which returns
+ * NULL for a missing payload; do not dereference that */
+ if(input == NULL) {
+ return;
+ }
+ route_message(msg_data->fsa_cause, input->msg);
}
/*
 * Route a message, or process it locally when it is meant for us.
 *
 * Only C_IPC_MESSAGE and C_HA_MESSAGE causes are accepted.  The message is
 * first offered to relay_message(); if that reports it as dealt with,
 * nothing else happens here.  Otherwise handle_message() decides on a
 * follow-up FSA input, which is registered together with the message.
 * In the '+' version of this function the XML is freed before returning
 * on the local-processing path.
 */
void
-route_message(enum crmd_fsa_cause cause, ha_msg_input_t *input)
+route_message(enum crmd_fsa_cause cause, xmlNode *input)
{
+ ha_msg_input_t fsa_input;
enum crmd_fsa_input result = I_NULL;
/* stack wrapper so the message can be passed to register_fsa_input*();
 * only its msg member is initialised */
+ fsa_input.msg = input;
CRM_CHECK(cause == C_IPC_MESSAGE || cause == C_HA_MESSAGE, return);
/* try passing the buck first */
crm_debug_4("Attempting to route message");
- if(relay_message(input->msg, cause==C_IPC_MESSAGE)) {
+ if(relay_message(input, cause==C_IPC_MESSAGE)) {
crm_debug_4("Message routed...");
- input->msg = NULL;
return;
}
crm_debug_4("Message wasn't routed... try handling locally");
/* calculate defer */
result = handle_message(input);
switch(result) {
case I_NULL:
crm_debug_4("Message processed");
break;
/* these results are left in 'result' and registered right after the switch */
case I_CIB_OP:
break;
case I_ROUTER:
break;
case I_NODE_JOIN:
case I_JOIN_REQUEST:
case I_JOIN_RESULT:
break;
default:
/* everything else is registered via register_fsa_input_later() */
crm_debug_4("Defering local processing of message");
- register_fsa_input_later(cause, result, input);
+ register_fsa_input_later(cause, result, &fsa_input);
result = I_NULL;
break;
}
if(result != I_NULL) {
/* add to the front of the queue */
- register_fsa_input(cause, result, input);
+ register_fsa_input(cause, result, &fsa_input);
}
+
+ free_xml(input);
}
/*
 * Send a request, either by relaying it or by queueing it for the FSA.
 *
 * msg           - the request; it is always released by the time this
 *                 returns (by relay_message() when relayed, otherwise here
 *                 after it has been registered as an I_ROUTER input)
 * msg_reference - when not NULL, receives a newly allocated copy of the
 *                 message's reference attribute
 *
 * Returns TRUE when relay_message() dealt with the message, FALSE when it
 * was queued for local processing instead.
 */
gboolean
send_request(xmlNode *msg, char **msg_reference)
{
	ha_msg_input_t *wrapper = NULL;

/*	crm_log_xml_debug_3(request, "Final request..."); */

	if(msg_reference != NULL) {
		*msg_reference = crm_strdup(
			crm_element_value(msg, XML_ATTR_REFERENCE));
	}

	if(relay_message(msg, TRUE)) {
		return TRUE;
	}

	/* not relayed: register it as an I_ROUTER input, then drop ours */
	wrapper = new_ha_msg_input(msg);
	register_fsa_input(C_IPC_MESSAGE, I_ROUTER, wrapper);
	crm_free(wrapper);
	free_xml(msg);

	return FALSE;
}
/*
 * Work out where a message has to go and deliver it when possible.
 *
 * Returns TRUE when the message was dealt with here (delivered, ignored,
 * rejected or discarded); it has then been released, either directly or
 * by the send_msg_via_*() helper it was handed to.
 * Returns FALSE when the caller has to process the message itself; the
 * message is then left alone.
 *
 * originated_locally influences two decisions: a CRMd-addressed message
 * without a target host is not treated as local, and only locally
 * originated messages that do not come from the PE or TE are forwarded to
 * a remote DC.
 */
gboolean
relay_message(xmlNode *relay_message, gboolean originated_locally)
{
	int is_for_dc = 0;
	int is_for_dcib = 0;
	int is_for_te = 0;
	int is_for_crm = 0;
	int is_for_cib = 0;
	int is_local = 0;
	gboolean handled = FALSE;
	const char *msg_error = NULL;

	const char *host_to = crm_element_value(relay_message, F_CRM_HOST_TO);
	const char *sys_to = crm_element_value(relay_message, F_CRM_SYS_TO);
	const char *sys_from= crm_element_value(relay_message, F_CRM_SYS_FROM);
	const char *type = crm_element_value(relay_message, F_TYPE);

	crm_debug_3("Routing message %s",
		    crm_element_value(relay_message, XML_ATTR_REFERENCE));

	/* basic validation */
	if(relay_message == NULL) {
		msg_error = "Cannot route empty message";

	} else if(safe_str_eq(CRM_OP_HELLO,
			      crm_element_value(relay_message, F_CRM_TASK))){
		/* quietly ignore */
		handled = TRUE;

	} else if(safe_str_neq(type, T_CRM)) {
		msg_error = "Bad message type";

	} else if(sys_to == NULL) {
		msg_error = "Bad message destination: no subsystem";
	}

	if(msg_error != NULL) {
		handled = TRUE;
		crm_err("%s", msg_error);
		crm_log_xml(LOG_WARNING, "bad msg", relay_message);
	}

	if(handled) {
		free_xml(relay_message);
		return TRUE;
	}

	/* from here on: "handled" unless a branch below says otherwise */
	handled = TRUE;

	is_for_dc = (strcasecmp(CRM_SYSTEM_DC, sys_to) == 0);
	is_for_dcib = (strcasecmp(CRM_SYSTEM_DCIB, sys_to) == 0);
	is_for_te = (strcasecmp(CRM_SYSTEM_TENGINE, sys_to) == 0);
	is_for_cib = (strcasecmp(CRM_SYSTEM_CIB, sys_to) == 0);
	is_for_crm = (strcasecmp(CRM_SYSTEM_CRMD, sys_to) == 0);

	/* without a target host, DC/TE-bound messages and locally originated
	 * CRMd-bound messages are not local; everything else is */
	if(host_to == NULL || strlen(host_to) == 0) {
		is_local = !(is_for_dc || is_for_te
			     || (is_for_crm && originated_locally));

	} else if(safe_str_eq(fsa_our_uname, host_to)) {
		is_local = 1;
	}

	if(is_for_dc || is_for_dcib || is_for_te) {
		if(AM_I_DC && is_for_te) {
			ROUTER_RESULT("Message result: Local relay");
			send_msg_via_ipc(relay_message, sys_to);

		} else if(AM_I_DC) {
			ROUTER_RESULT("Message result: DC/CRMd process");
			handled = FALSE; /* more to be done by caller */

		} else if(originated_locally
			  && safe_str_neq(sys_from, CRM_SYSTEM_PENGINE)
			  && safe_str_neq(sys_from, CRM_SYSTEM_TENGINE)) {
			/* Neither the TE or PE should be sending messages
			 * to DC's on other nodes
			 *
			 * By definition, if we are no longer the DC, then
			 * the PE or TE's data should be discarded
			 */
			ROUTER_RESULT("Message result: External relay to DC");
			send_msg_via_ha(relay_message);

		} else {
			/* discard */
			ROUTER_RESULT("Message result: Discard, not DC");
			free_xml(relay_message);
		}

	} else if(is_local && (is_for_crm || is_for_cib)) {
		ROUTER_RESULT("Message result: CRMd process");
		handled = FALSE; /* more to be done by caller */

	} else if(is_local) {
		ROUTER_RESULT("Message result: Local relay");
		send_msg_via_ipc(relay_message, sys_to);

	} else {
		ROUTER_RESULT("Message result: External relay");
		send_msg_via_ha(relay_message);
	}

	return handled;
}
/*
 * Decide whether a message from an IPC client may be processed.
 *
 * For anything other than a CRM_OP_HELLO message the sender's subsystem
 * name must already be present in ipc_clients; the result of that lookup
 * is returned.
 *
 * A CRM_OP_HELLO message is a registration attempt: the client details are
 * extracted and validated, the client is recorded in ipc_clients on
 * success (and a hello is sent back), or its channel is marked
 * IPC_DISC_PENDING on failure.  FALSE is always returned for hello
 * messages because they need no further processing.
 */
gboolean
-crmd_authorize_message(ha_msg_input_t *client_msg, crmd_client_t *curr_client)
+crmd_authorize_message(xmlNode *client_msg, crmd_client_t *curr_client)
{
/* check the best case first */
- const char *sys_from = crm_element_value(client_msg->msg, F_CRM_SYS_FROM);
+ const char *sys_from = crm_element_value(client_msg, F_CRM_SYS_FROM);
char *uuid = NULL;
char *client_name = NULL;
char *major_version = NULL;
char *minor_version = NULL;
const char *filtered_from;
gpointer table_key = NULL;
gboolean auth_result = FALSE;
struct crm_subsystem_s *the_subsystem = NULL;
gboolean can_reply = FALSE; /* no-one has registered with this id */
- const char *op = crm_element_value(client_msg->msg, F_CRM_TASK);
+ xmlNode *xml = NULL;
+ const char *op = crm_element_value(client_msg, F_CRM_TASK);
/* ordinary message: only accepted from an already registered sender */
if (safe_str_neq(CRM_OP_HELLO, op)) {
if(sys_from == NULL) {
crm_warn("Message [%s] was had no value for %s... discarding",
- crm_element_value(client_msg->msg, XML_ATTR_REFERENCE),
+ crm_element_value(client_msg, XML_ATTR_REFERENCE),
F_CRM_SYS_FROM);
return FALSE;
}
filtered_from = sys_from;
/* The CIB can have two names on the DC */
if(strcasecmp(sys_from, CRM_SYSTEM_DCIB) == 0)
filtered_from = CRM_SYSTEM_CIB;
if (g_hash_table_lookup (ipc_clients, filtered_from) != NULL) {
can_reply = TRUE; /* reply can be routed */
}
crm_debug_2("Message reply can%s be routed from %s.",
can_reply?"":" not", sys_from);
if(can_reply == FALSE) {
crm_warn("Message [%s] not authorized",
- crm_element_value(client_msg->msg, XML_ATTR_REFERENCE));
+ crm_element_value(client_msg, XML_ATTR_REFERENCE));
}
return can_reply;
}
/* hello message: a client is trying to register */
crm_debug_3("received client join msg");
- crm_log_xml(LOG_MSG, "join", client_msg->msg);
+ crm_log_xml(LOG_MSG, "join", client_msg);
+ xml = get_message_xml(client_msg, F_CRM_DATA);
auth_result = process_hello_message(
- client_msg->xml, &uuid, &client_name,
+ xml, &uuid, &client_name,
&major_version, &minor_version);
if (auth_result == TRUE) {
if(client_name == NULL || uuid == NULL) {
crm_err("Bad client details (client_name=%s, uuid=%s)",
crm_str(client_name), crm_str(uuid));
auth_result = FALSE;
}
}
if (auth_result == TRUE) {
/* check version */
int mav = atoi(major_version);
int miv = atoi(minor_version);
crm_debug_3("Checking client version number");
if (mav < 0 || miv < 0) {
crm_err("Client version (%d:%d) is not acceptable",
mav, miv);
auth_result = FALSE;
}
/* NOTE(review): both strings are passed to crm_free() again at the end
 * of this function; that is only safe if crm_free() resets the
 * pointer - confirm */
crm_free(major_version);
crm_free(minor_version);
}
-
/* the PE and TE are tracked as subsystems; other clients are not */
if (safe_str_eq(CRM_SYSTEM_PENGINE, client_name)) {
the_subsystem = pe_subsystem;
} else if (safe_str_eq(CRM_SYSTEM_TENGINE, client_name)) {
the_subsystem = te_subsystem;
}
if (auth_result == TRUE && the_subsystem != NULL) {
/* if we already have one of those clients
* only applies to te, pe etc. not admin clients
*/
crm_debug_3("Checking if %s is required/already connected",
client_name);
table_key = (gpointer)crm_strdup(client_name);
if(is_set(fsa_input_register, the_subsystem->flag_connected)) {
auth_result = FALSE;
crm_free(table_key);
table_key = NULL;
crm_warn("Bit\t%.16llx set in %.16llx",
the_subsystem->flag_connected,
fsa_input_register);
crm_err("Client %s is already connected",
client_name);
} else if(FALSE == is_set(fsa_input_register,
the_subsystem->flag_required)) {
/* NOTE(review): the test is on flag_required but the message below
 * prints flag_connected - confirm which one was meant */
crm_warn("Bit\t%.16llx not set in %.16llx",
the_subsystem->flag_connected,
fsa_input_register);
crm_warn("Client %s joined but we dont need it",
client_name);
stop_subsystem(the_subsystem, TRUE);
} else {
the_subsystem->ipc = curr_client->client_channel;
set_bit_inplace(fsa_input_register,
the_subsystem->flag_connected);
}
} else {
/* also reached when authorization has already failed */
table_key = (gpointer)generate_hash_key(client_name, uuid);
}
if (auth_result == TRUE) {
crm_debug_2("Accepted client %s", crm_str(table_key));
curr_client->table_key = table_key;
curr_client->sub_sys = crm_strdup(client_name);
curr_client->uuid = crm_strdup(uuid);
g_hash_table_insert (ipc_clients,
table_key, curr_client->client_channel);
send_hello_message(curr_client->client_channel,
"n/a", CRM_SYSTEM_CRMD,
"0", "1");
crm_debug_3("Updated client list with %s", crm_str(table_key));
crm_debug_3("Triggering FSA: %s", __FUNCTION__);
G_main_set_trigger(fsa_source);
if(the_subsystem != NULL) {
/* a previous client still attached to the subsystem is disconnected */
CRM_CHECK(the_subsystem->client == NULL,
process_client_disconnect(the_subsystem->client));
the_subsystem->client = curr_client;
}
} else {
crm_free(table_key);
crm_warn("Rejected client logon request");
curr_client->client_channel->ch_status = IPC_DISC_PENDING;
}
if(uuid != NULL) crm_free(uuid);
if(minor_version != NULL) crm_free(minor_version);
if(major_version != NULL) crm_free(major_version);
if(client_name != NULL) crm_free(client_name);
/* hello messages should never be processed further */
return FALSE;
}
/*
 * Dispatch a message to handle_request() or handle_response() according
 * to its F_CRM_MSG_TYPE field.
 *
 * Returns the FSA input chosen by the handler, or I_NULL when there is no
 * message or its type is neither a request nor a response (both cases are
 * logged as errors).
 */
enum crmd_fsa_input
-handle_message(ha_msg_input_t *stored_msg)
+handle_message(xmlNode *stored_msg)
{
enum crmd_fsa_input next_input = I_NULL;
const char *type = NULL;
- if(stored_msg == NULL || stored_msg->msg == NULL) {
+ if(stored_msg == NULL) {
crm_err("No message to handle");
return I_NULL;
}
- type = crm_element_value(stored_msg->msg, F_CRM_MSG_TYPE);
+ type = crm_element_value(stored_msg, F_CRM_MSG_TYPE);
if(safe_str_eq(type, XML_ATTR_REQUEST)) {
next_input = handle_request(stored_msg);
} else if(safe_str_eq(type, XML_ATTR_RESPONSE)) {
next_input = handle_response(stored_msg);
} else {
crm_err("Unknown message type: %s", type);
}
/* crm_debug_2("%s: Next input is %s", __FUNCTION__, */
/* fsa_input2string(next_input)); */
return next_input;
}
/*
 * Request a new PE calculation: sets the enclosing function's local
 * 'next_input' to I_PE_CALC and drops any outstanding fsa_pe_ref, so a
 * reply carrying the old reference no longer matches.
 * Must be expanded in a scope that has a 'next_input' variable.
 */
#define schedule_pe() do { \
next_input = I_PE_CALC; \
if(fsa_pe_ref) { \
crm_debug("Cancelling %s...", fsa_pe_ref); \
crm_free(fsa_pe_ref); \
fsa_pe_ref = NULL; \
} \
} while(0)
/*
 * Process a request message addressed to the local CRMd.
 *
 * The operation (F_CRM_TASK) is matched in three stages: operations any
 * node handles, operations handled only when we are not the DC, and
 * operations handled only by the DC.  Some operations are acted on
 * immediately; for the others the FSA input to raise is returned.
 *
 * Returns the next FSA input, or I_NULL when nothing further is needed.
 */
enum crmd_fsa_input
-handle_request(ha_msg_input_t *stored_msg)
+handle_request(xmlNode *stored_msg)
{
xmlNode *msg = NULL;
enum crmd_fsa_input next_input = I_NULL;
- const char *op = crm_element_value(stored_msg->msg, F_CRM_TASK);
- const char *sys_to = crm_element_value(stored_msg->msg, F_CRM_SYS_TO);
- const char *host_from = crm_element_value(stored_msg->msg, F_CRM_HOST_FROM);
+ const char *op = crm_element_value(stored_msg, F_CRM_TASK);
+ const char *sys_to = crm_element_value(stored_msg, F_CRM_SYS_TO);
+ const char *host_from = crm_element_value(stored_msg, F_CRM_HOST_FROM);
crm_debug_2("Received %s "XML_ATTR_REQUEST" from %s in state %s",
op, host_from, fsa_state2string(fsa_state));
if(op == NULL) {
- crm_log_xml(LOG_ERR, "Bad message", stored_msg->msg);
+ crm_log_xml(LOG_ERR, "Bad message", stored_msg);
/*========== common actions ==========*/
} else if(strcasecmp(op, CRM_OP_NOOP) == 0) {
crm_debug_2("no-op from %s", crm_str(host_from));
} else if(strcasecmp(op, CRM_OP_NOVOTE) == 0) {
/* queue the message itself together with the election actions */
- register_fsa_input_adv(C_HA_MESSAGE, I_NULL, stored_msg,
+ ha_msg_input_t fsa_input;
+ fsa_input.msg = stored_msg;
+ register_fsa_input_adv(C_HA_MESSAGE, I_NULL, &fsa_input,
A_ELECTION_COUNT|A_ELECTION_CHECK, FALSE, __FUNCTION__);
} else if(strcasecmp(op, CRM_OP_VOTE) == 0) {
/* count the vote and decide what to do after that */
- register_fsa_input_adv(C_HA_MESSAGE, I_NULL, stored_msg,
- A_ELECTION_COUNT|A_ELECTION_CHECK, FALSE, __FUNCTION__);
+ ha_msg_input_t fsa_input;
+ fsa_input.msg = stored_msg;
+ register_fsa_input_adv(C_HA_MESSAGE, I_NULL, &fsa_input,
+ A_ELECTION_COUNT|A_ELECTION_CHECK, FALSE, __FUNCTION__);
/* Sometimes we _must_ go into S_ELECTION */
if(fsa_state == S_HALT) {
crm_debug("Forcing an election from S_HALT");
next_input = I_ELECTION;
#if 0
} else if(AM_I_DC) {
/* This is the old way of doing things but what is gained? */
next_input = I_ELECTION;
#endif
}
} else if(strcasecmp(op, CRM_OP_LOCAL_SHUTDOWN) == 0) {
crm_shutdown(SIGTERM);
/*next_input = I_SHUTDOWN; */
next_input = I_NULL;
} else if(strcasecmp(op, CRM_OP_PING) == 0) {
/* eventually do some stuff to figure out
* if we /are/ ok
*/
xmlNode *ping = createPingAnswerFragment(sys_to, "ok");
crm_xml_add(ping, "crmd_state", fsa_state2string(fsa_state));
crm_info("Current ping state: %s", fsa_state2string(fsa_state));
- msg = create_reply(stored_msg->msg, ping);
+ msg = create_reply(stored_msg, ping);
free_xml(ping);
/* relay_message() releases the reply unless it returns FALSE */
if(relay_message(msg, TRUE) == FALSE) {
free_xml(msg);
}
/* probably better to do this via signals on the
* local node
*/
} else if(strcasecmp(op, CRM_OP_DEBUG_UP) == 0) {
alter_debug(DEBUG_INC);
crm_info("Debug set to %d", get_crm_log_level());
} else if(strcasecmp(op, CRM_OP_DEBUG_DOWN) == 0) {
alter_debug(DEBUG_DEC);
crm_info("Debug set to %d", get_crm_log_level());
} else if(strcasecmp(op, CRM_OP_JOIN_OFFER) == 0) {
next_input = I_JOIN_OFFER;
crm_debug("Raising I_JOIN_OFFER: join-%s",
- crm_element_value(stored_msg->msg, F_CRM_JOIN_ID));
+ crm_element_value(stored_msg, F_CRM_JOIN_ID));
} else if(strcasecmp(op, CRM_OP_JOIN_ACKNAK) == 0) {
next_input = I_JOIN_RESULT;
crm_debug("Raising I_JOIN_RESULT: join-%s",
- crm_element_value(stored_msg->msg, F_CRM_JOIN_ID));
+ crm_element_value(stored_msg, F_CRM_JOIN_ID));
} else if(strcasecmp(op, CRM_OP_LRM_DELETE) == 0
|| strcasecmp(op, CRM_OP_LRM_FAIL) == 0
|| strcasecmp(op, CRM_OP_LRM_REFRESH) == 0
|| strcasecmp(op, CRM_OP_REPROBE) == 0) {
/* re-address the message to the LRM and have it routed again */
- crm_xml_add(stored_msg->msg, F_CRM_SYS_TO, CRM_SYSTEM_LRMD);
+ crm_xml_add(stored_msg, F_CRM_SYS_TO, CRM_SYSTEM_LRMD);
next_input = I_ROUTER;
/* this functionality should only be enabled
* if this is a development build
*/
} else if(CRM_DEV_BUILD && strcasecmp(op, CRM_OP_DIE) == 0/*constant condition*/) {
crm_warn("Test-only code: Killing the CRM without mercy");
crm_warn("Inhibiting respawns");
exit(100);
/*========== (NOT_DC)-Only Actions ==========*/
} else if(AM_I_DC == FALSE){
/* only requests from our DC are honoured, or any sender while we
 * have no DC recorded */
gboolean dc_match = safe_str_eq(host_from, fsa_our_dc);
if(dc_match || fsa_our_dc == NULL) {
if(strcasecmp(op, CRM_OP_HBEAT) == 0) {
crm_debug_3("Received DC heartbeat from %s",
host_from);
next_input = I_DC_HEARTBEAT;
} else if(fsa_our_dc == NULL) {
crm_warn("CRMd discarding request: %s"
" (DC: %s, from: %s)",
op, crm_str(fsa_our_dc), host_from);
- crm_log_xml(LOG_WARNING, "Ignored Request", stored_msg->msg);
+ crm_log_xml(LOG_WARNING, "Ignored Request", stored_msg);
} else if(strcasecmp(op, CRM_OP_SHUTDOWN) == 0) {
next_input = I_STOP;
} else {
crm_err("CRMd didnt expect request: %s", op);
- crm_log_xml(LOG_ERR, "bad request", stored_msg->msg);
+ crm_log_xml(LOG_ERR, "bad request", stored_msg);
}
} else {
crm_warn("Discarding %s op from %s", op, host_from);
}
/*========== DC-Only Actions ==========*/
} else if(AM_I_DC) {
const char *message = crm_element_value(
- stored_msg->msg, "message");
+ stored_msg, "message");
/* setting "fsa_pe_ref = NULL" makes sure we ignore any
* PE reply that might be pending or in the queue while
* we ask the CIB for a more up-to-date copy
*/
if(safe_str_eq(op, CRM_OP_TEABORT)) {
crm_debug("Transition cancelled: %s/%s", op, message);
clear_bit_inplace(fsa_input_register, R_IN_TRANSITION);
if(need_transition(fsa_state)) {
schedule_pe();
} else {
crm_debug("Filtering %s op in state %s",
op, fsa_state2string(fsa_state));
}
} else if(strcasecmp(op, CRM_OP_TECOMPLETE) == 0) {
crm_debug("Transition complete: %s/%s", op, message);
clear_bit_inplace(fsa_input_register, R_IN_TRANSITION);
if(fsa_state == S_TRANSITION_ENGINE) {
next_input = I_TE_SUCCESS;
} else {
crm_debug("Filtering %s op in state %s",
op, fsa_state2string(fsa_state));
}
} else if(strcasecmp(op, CRM_OP_JOIN_ANNOUNCE) == 0) {
next_input = I_NODE_JOIN;
} else if(strcasecmp(op, CRM_OP_JOIN_REQUEST) == 0) {
next_input = I_JOIN_REQUEST;
} else if(strcasecmp(op, CRM_OP_JOIN_CONFIRM) == 0) {
next_input = I_JOIN_RESULT;
} else if(strcasecmp(op, CRM_OP_SHUTDOWN) == 0) {
/* a shutdown instruction reached the DC itself */
gboolean dc_match = safe_str_eq(host_from, fsa_our_dc);
if(dc_match) {
crm_err("We didnt ask to be shut down yet our"
" TE is telling us too."
" Better get out now!");
next_input = I_TERMINATE;
} else if(is_set(fsa_input_register, R_SHUTDOWN)) {
crm_info("Shutting ourselves down (DC)");
next_input = I_STOP;
} else if(fsa_state != S_STOPPING) {
crm_err("Another node is asking us to shutdown"
" but we think we're ok.");
next_input = I_ELECTION;
}
} else if(strcasecmp(op, CRM_OP_SHUTDOWN_REQ) == 0) {
/* a slave wants to shut down */
/* create cib fragment and add to message */
- next_input = handle_shutdown_request(stored_msg->msg);
+ next_input = handle_shutdown_request(stored_msg);
} else {
crm_err("Unexpected request (%s) sent to the DC", op);
- crm_log_xml(LOG_ERR, "Unexpected", stored_msg->msg);
- }
+ crm_log_xml(LOG_ERR, "Unexpected", stored_msg);
+ }
}
return next_input;
}
/*
 * Process a response message addressed to the local CRMd.
 *
 * The only response that produces an FSA input is a CRM_OP_PECALC reply
 * received while we are the DC and whose reference matches fsa_pe_ref:
 * it yields I_PE_SUCCESS and clears fsa_pe_ref.  Vote, heartbeat and
 * shutdown responses are ignored; anything else is logged as an error.
 *
 * Returns I_PE_SUCCESS in the case above, otherwise I_NULL.
 */
enum crmd_fsa_input
-handle_response(ha_msg_input_t *stored_msg)
+handle_response(xmlNode *stored_msg)
{
enum crmd_fsa_input next_input = I_NULL;
- const char *op = crm_element_value(stored_msg->msg, F_CRM_TASK);
- const char *sys_from = crm_element_value(stored_msg->msg, F_CRM_SYS_FROM);
- const char *host_from = crm_element_value(stored_msg->msg, F_CRM_HOST_FROM);
- const char *msg_ref = crm_element_value(stored_msg->msg, XML_ATTR_REFERENCE);
+ const char *op = crm_element_value(stored_msg, F_CRM_TASK);
+ const char *sys_from = crm_element_value(stored_msg, F_CRM_SYS_FROM);
+ const char *host_from = crm_element_value(stored_msg, F_CRM_HOST_FROM);
+ const char *msg_ref = crm_element_value(stored_msg, XML_ATTR_REFERENCE);
crm_debug_2("Received %s "XML_ATTR_RESPONSE" from %s in state %s",
op, host_from, fsa_state2string(fsa_state));
if(op == NULL) {
- crm_log_xml(LOG_ERR, "Bad message", stored_msg->msg);
+ crm_log_xml(LOG_ERR, "Bad message", stored_msg);
} else if(AM_I_DC && strcasecmp(op, CRM_OP_PECALC) == 0) {
crm_debug_2("Processing %s reply %s (fsa=%s)",
sys_from, msg_ref, crm_str(fsa_pe_ref));
/* only the reply to the most recent PE request counts */
if(msg_ref != NULL && safe_str_eq(msg_ref, fsa_pe_ref)) {
next_input = I_PE_SUCCESS;
crm_debug_2("Completed: %s...", fsa_pe_ref);
crm_free(fsa_pe_ref);
fsa_pe_ref = NULL;
} else {
crm_debug_2("Skipping superceeded reply from %s",
sys_from);
}
} else if(strcasecmp(op, CRM_OP_VOTE) == 0
|| strcasecmp(op, CRM_OP_HBEAT) == 0
|| strcasecmp(op, CRM_OP_SHUTDOWN_REQ) == 0
|| strcasecmp(op, CRM_OP_SHUTDOWN) == 0) {
crm_debug_2("Ignoring %s from %s in %s",
op, host_from, fsa_state2string(fsa_state));
next_input = I_NULL;
} else {
crm_err("Unexpected response (op=%s) sent to the %s",
op, AM_I_DC?"DC":"CRMd");
next_input = I_NULL;
}
return next_input;
}
/*
 * Record a node's wish to shut down in the CIB status section.
 *
 * handle here to avoid potential version issues
 *   where the shutdown message/procedure may have
 *   been changed in later versions.
 *
 * This way the DC is always in control of the shutdown
 *
 * The node is taken from the message's F_CRM_HOST_FROM field; when that is
 * missing the local node name is used.  Always returns I_NULL.
 */
enum crmd_fsa_input
handle_shutdown_request(xmlNode *stored_msg)
{
	time_t now = time(NULL);
	xmlNode *node_state = NULL;
	const char *host_from = crm_element_value(stored_msg, F_CRM_HOST_FROM);

	if(host_from == NULL) {
		/* we're shutting down and the DC */
		host_from = fsa_our_uname;
	}

	crm_info("Creating shutdown request for %s",host_from);
	crm_log_xml(LOG_MSG, "message", stored_msg);

	/* node_state entry stamped with the time of the request */
	node_state = create_node_state(host_from, NULL, NULL, NULL, NULL,
				       CRMD_STATE_INACTIVE, FALSE,
				       __FUNCTION__);
	crm_xml_add_int(node_state, XML_CIB_ATTR_SHUTDOWN, (int)now);

	fsa_cib_anon_update(XML_CIB_TAG_STATUS,node_state, cib_quorum_override);
	crm_log_xml_debug_2(node_state, "Shutdown update");
	free_xml(node_state);

	/* will be picked up by the TE as long as its running */
	if(need_transition(fsa_state)
	   && is_set(fsa_input_register, R_TE_CONNECTED) == FALSE) {
		register_fsa_action(A_TE_CANCEL);
	}

	return I_NULL;
}
/*
 * Send a message to other nodes through the cluster layer.
 *
 * The message is broadcast when it has no target host or is addressed to
 * the DC subsystem, otherwise it is sent to its target host only.
 * The outcome is logged (heartbeats only when sending failed).
 *
 * frees msg upon completion
 *
 * Returns TRUE when the message was handed to the cluster layer
 * successfully, FALSE when it was NULL, had no destination subsystem or
 * sending failed.
 */
gboolean
send_msg_via_ha(xmlNode *msg)
{
	int log_level = LOG_DEBUG_3;
	gboolean broadcast = FALSE;
	gboolean sent_ok = TRUE;

	const char *op = crm_element_value(msg, F_CRM_TASK);
	const char *sys_to = crm_element_value(msg, F_CRM_SYS_TO);
	const char *host_to = crm_element_value(msg, F_CRM_HOST_TO);

	enum crm_ais_msg_types dest = 0;

	if(is_openais_cluster()) {
		dest = 1;
#if SUPPORT_AIS
		dest = text2msg_type(sys_to);
#endif
	}

	/* validation: a message and a destination subsystem are required */
	if (msg == NULL) {
		crm_err("Attempt to send NULL Message via HA failed.");
		sent_ok = FALSE;

	} else {
		crm_debug_4("Relaying message to (%s) via HA", host_to);

		if (sys_to == NULL || sys_to[0] == '\0') {
			crm_err("You did not specify a destination sub-system"
				" for this message.");
			sent_ok = FALSE;
		}
	}

	/* There are a number of messages may not need to be ordered.
	 * At a later point perhaps we should detect them and send them
	 * as unordered messages.
	 */
	if (sent_ok) {
		const char *target = host_to;

		if (host_to == NULL
		    || host_to[0] == '\0'
		    || safe_str_eq(sys_to, CRM_SYSTEM_DC)) {
			broadcast = TRUE;
			target = NULL;
		}
		sent_ok = send_cluster_message(target, dest, msg, FALSE);
	}

	if(sent_ok == FALSE) {
		log_level = LOG_WARNING;
	}

	/* successful heartbeats are too frequent to be worth logging */
	if(log_level == LOG_WARNING
	   || (safe_str_neq(op, CRM_OP_HBEAT))) {
		do_crm_log(log_level,
			   "Sending %sHA message (ref=%s) to %s@%s %s.",
			   broadcast?"broadcast ":"directed ",
			   crm_element_value(msg, XML_ATTR_REFERENCE),
			   crm_str(sys_to), host_to==NULL?"<all>":host_to,
			   sent_ok?"succeeded":"failed");
	}

	free_xml(msg);
	return sent_ok;
}
/*
 * Deliver a message to a local subsystem.
 *
 * msg - the message; F_CRM_HOST_FROM is filled in with the local node
 *       name when missing.  msg is deleted by the time this returns.
 * sys - name of the destination subsystem, used to look up its IPC
 *       channel in ipc_clients
 *
 * Without a registered channel, messages for the LRM are passed straight
 * to do_lrm_invoke(); messages for the CIB and for unknown subsystems are
 * rejected with an error.
 *
 * Returns FALSE when the message was rejected or sending over the
 * channel failed, TRUE otherwise.
 */
gboolean
send_msg_via_ipc(xmlNode *msg, const char *sys)
{
	gboolean send_ok = TRUE;
	IPC_Channel *channel;

	crm_debug_4("relaying msg to sub_sys=%s via IPC", sys);
	channel = (IPC_Channel*)g_hash_table_lookup(ipc_clients, sys);

	if(crm_element_value(msg, F_CRM_HOST_FROM) == NULL) {
		crm_xml_add(msg, F_CRM_HOST_FROM, fsa_our_uname);
	}

	if (channel != NULL) {
		crm_debug_3("Sending message via channel %s.", sys);
		send_ok = send_ipc_message(channel, msg);

	} else if(sys != NULL && strcasecmp(sys, CRM_SYSTEM_CIB) == 0) {
		crm_err("Sub-system (%s) has been incorporated into the CRMd.",
			sys);
		crm_err("Change the way we handle this CIB message");
		crm_log_xml(LOG_ERR, "cib op", msg);
		send_ok = FALSE;

	} else if(sys != NULL && strcasecmp(sys, CRM_SYSTEM_LRMD) == 0) {
		/* build a temporary FSA input around msg and call the LRM
		 * action handler directly */
		fsa_data_t *lrm_data = NULL;
		ha_msg_input_t *lrm_input = new_ha_msg_input(msg);

		crm_malloc0(lrm_data, sizeof(fsa_data_t));
		lrm_data->fsa_input = I_MESSAGE;
		lrm_data->fsa_cause = C_IPC_MESSAGE;
		lrm_data->data = lrm_input;
		lrm_data->origin = __FUNCTION__;
		lrm_data->data_type = fsa_dt_ha_msg;

#ifdef FSA_TRACE
		crm_debug_2("Invoking action %s (%.16llx)",
			    fsa_action2string(A_LRM_INVOKE),
			    A_LRM_INVOKE);
#endif
		do_lrm_invoke(A_LRM_INVOKE, C_IPC_MESSAGE, fsa_state,
			      I_MESSAGE, lrm_data);

		/* only the temporary wrappers go here; msg is freed below */
		crm_free(lrm_input);
		crm_free(lrm_data);

	} else {
		crm_err("Unknown Sub-system (%s)... discarding message.",
			crm_str(sys));
		send_ok = FALSE;
	}

	free_xml(msg);
	return send_ok;
}
/*
 * Resume I/O on the cluster connection's IPC channel, if there is one.
 * Compiles to an empty function without SUPPORT_HEARTBEAT.
 */
void
msg_queue_helper(void)
{
#if SUPPORT_HEARTBEAT
	IPC_Channel *cluster_ipc = NULL;

	if(fsa_cluster_conn != NULL) {
		cluster_ipc = fsa_cluster_conn->llc_ops->ipcchan(
			fsa_cluster_conn);
	}

	if(cluster_ipc == NULL) {
		return;
	}
	cluster_ipc->ops->resume_io(cluster_ipc);

/*  	g_hash_table_foreach_remove(ipc_clients, ipc_queue_helper, NULL); */
#endif
}
/*
 * Hash-table iteration helper: calls is_message_pending() on the client's
 * channel when it has one.  key and user_data are unused.
 *
 * Always returns FALSE.
 */
gboolean
ipc_queue_helper(gpointer key, gpointer value, gpointer user_data)
{
	crmd_client_t *client = value;

	if(client->client_channel == NULL) {
		return FALSE;
	}

	client->client_channel->ops->is_message_pending(client->client_channel);
	return FALSE;
}

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 4:49 PM (14 h, 2 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1018983
Default Alt Text
(70 KB)

Event Timeline