Page MenuHomeClusterLabs Projects

callbacks.c
No OneTemporary

callbacks.c

/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <sys/param.h>
#include <crm/crm.h>
#include <string.h>
#include <crmd_fsa.h>
#include <heartbeat.h>
#include <hb_api.h>
#include <crm/msg_xml.h>
#include <crm/common/xml.h>
#include <crm/common/msg.h>
#include <crm/cib.h>
#include <crmd.h>
#include <crmd_messages.h>
#include <crmd_callbacks.h>
#include <crm/dmalloc_wrapper.h>
GHashTable *crmd_peer_state = NULL;
crm_data_t *find_xml_in_hamessage(const HA_Message * msg);
void crmd_ha_connection_destroy(gpointer user_data);
/* From join_dc... */
extern gboolean check_join_state(
enum crmd_fsa_state cur_state, const char *source);
/* #define MAX_EMPTY_CALLBACKS 20 */
/* int empty_callbacks = 0; */
gboolean
crmd_ha_msg_dispatch(IPC_Channel *channel, gpointer user_data)
{
int lpc = 0;
ll_cluster_t *hb_cluster = (ll_cluster_t*)user_data;
while(lpc < 2 && hb_cluster->llc_ops->msgready(hb_cluster)) {
if(channel->ch_status != IPC_CONNECT) {
/* there really is no point continuing */
break;
}
lpc++;
/* invoke the callbacks but dont block */
hb_cluster->llc_ops->rcvmsg(hb_cluster, 0);
}
crm_devel("%d HA messages dispatched", lpc);
G_main_set_trigger(fsa_source);
if (channel && (channel->ch_status != IPC_CONNECT)) {
crm_crit("Lost connection to heartbeat service.");
return FALSE;
}
return TRUE;
}
void
crmd_ha_msg_callback(const HA_Message * msg, void* private_data)
{
ha_msg_input_t *new_input = NULL;
oc_node_t *from_node = NULL;
const char *from = ha_msg_value(msg, F_ORIG);
const char *seq = ha_msg_value(msg, F_SEQ);
const char *op = ha_msg_value(msg, F_CRM_TASK);
const char *sys_to = ha_msg_value(msg, F_CRM_SYS_TO);
const char *sys_from = ha_msg_value(msg, F_CRM_SYS_FROM);
CRM_DEV_ASSERT(from != NULL);
if(fsa_membership_copy == NULL) {
crm_debug("Ignoring HA messages until we are"
" connected to the CCM (%s op from %s)", op, from);
crm_log_message_adv(
LOG_MSG, "HA[inbound]: Ignore (No CCM)", msg);
return;
}
from_node = g_hash_table_lookup(fsa_membership_copy->members, from);
if(from_node == NULL) {
int level = LOG_DEBUG;
if(safe_str_eq(op, CRM_OP_VOTE)) {
level = LOG_WARNING;
} else if(AM_I_DC && safe_str_eq(op, CRM_OP_JOIN_ANNOUNCE)) {
level = LOG_WARNING;
} else if(safe_str_eq(sys_from, CRM_SYSTEM_DC)) {
level = LOG_WARNING;
}
do_crm_log(level, __FILE__, __FUNCTION__,
"Ignoring HA message (op=%s) from %s: not in our"
" membership list", op, from);
crm_log_message_adv(LOG_MSG, "HA[inbound]: CCM Discard", msg);
} else if(AM_I_DC
&& safe_str_eq(sys_from, CRM_SYSTEM_DC)
&& safe_str_neq(from, fsa_our_uname)) {
crm_err("Another DC detected: %s (op=%s)", from, op);
crm_log_message_adv(
LOG_WARNING, "HA[inbound]: Duplicate DC", msg);
new_input = new_ha_msg_input(msg);
/* make sure the election happens NOW */
register_fsa_error_adv(C_FSA_INTERNAL, I_ELECTION, NULL,
new_input, __FUNCTION__);
#if 0
/* still thinking about this one...
* could create a timing issue if we dont notice the
* election before a new DC is elected.
*/
} else if(fsa_our_dc != NULL
&& safe_str_eq(sys_from, CRM_SYSTEM_DC)
&& safe_str_neq(from, fsa_our_dc)) {
crm_warn("Ignoring message from wrong DC: %s vs. %s ",
from, fsa_our_dc);
crm_log_message_adv(LOG_WARNING, "HA[inbound]: wrong DC", msg);
#endif
} else if(safe_str_eq(sys_to, CRM_SYSTEM_DC) && AM_I_DC == FALSE) {
crm_verbose("Ignoring message for the DC [F_SEQ=%s]", seq);
crm_log_message_adv(LOG_TRACE, "HA[inbound]: ignore", msg);
return;
} else if(safe_str_eq(from, fsa_our_uname)
&& safe_str_eq(op, CRM_OP_VOTE)) {
crm_log_message_adv(LOG_TRACE, "HA[inbound]", msg);
crm_verbose("Ignoring our own vote [F_SEQ=%s]: own vote", seq);
return;
} else if(AM_I_DC && safe_str_eq(op, CRM_OP_HBEAT)) {
crm_verbose("Ignoring our own heartbeat [F_SEQ=%s]", seq);
crm_log_message_adv(LOG_TRACE, "HA[inbound]: own heartbeat", msg);
return;
} else {
crm_devel("Processing message");
crm_log_message_adv(LOG_MSG, "HA[inbound]", msg);
new_input = new_ha_msg_input(msg);
register_fsa_input(C_HA_MESSAGE, I_ROUTER, new_input);
}
#if 0
if(ha_msg_value(msg, XML_ATTR_REFERENCE) == NULL) {
ha_msg_add(new_input->msg, XML_ATTR_REFERENCE, seq);
}
#endif
delete_ha_msg_input(new_input);
return;
}
/*
* Apparently returning TRUE means "stay connected, keep doing stuff".
* Returning FALSE means "we're all done, close the connection"
*/
gboolean
crmd_ipc_msg_callback(IPC_Channel *client, gpointer user_data)
{
int lpc = 0;
IPC_Message *msg = NULL;
ha_msg_input_t *new_input = NULL;
crmd_client_t *curr_client = (crmd_client_t*)user_data;
gboolean stay_connected = TRUE;
crm_verbose("Processing IPC message from %s",
curr_client->table_key);
while(lpc == 0 && client->ops->is_message_pending(client)) {
if (client->ch_status != IPC_CONNECT) {
/* The message which was pending for us is that
* the IPC status is now IPC_DISCONNECT */
break;
}
if (client->ops->recv(client, &msg) != IPC_OK) {
perror("Receive failure:");
crm_err("[%s] [receive failure]", curr_client->table_key);
stay_connected = FALSE;
break;
} else if (msg == NULL) {
crm_err("No message from %s this time", curr_client->table_key);
continue;
}
lpc++;
new_input = new_ipc_msg_input(msg);
msg->msg_done(msg);
crm_verbose("Processing msg from %s", curr_client->table_key);
crm_log_message_adv(LOG_MSG, "CRMd[inbound]", new_input->msg);
crmd_authorize_message(new_input, curr_client);
delete_ha_msg_input(new_input);
msg = NULL;
new_input = NULL;
}
crm_verbose("Processed %d messages", lpc);
if (client->ch_status != IPC_CONNECT) {
stay_connected = FALSE;
crm_verbose("received HUP from %s", curr_client->table_key);
if (curr_client != NULL) {
struct crm_subsystem_s *the_subsystem = NULL;
if (curr_client->sub_sys == NULL) {
crm_debug("Client hadn't registered with us yet");
} else if (strcmp(CRM_SYSTEM_PENGINE,
curr_client->sub_sys) == 0) {
the_subsystem = pe_subsystem;
} else if (strcmp(CRM_SYSTEM_TENGINE,
curr_client->sub_sys) == 0) {
the_subsystem = te_subsystem;
} else if (strcmp(CRM_SYSTEM_CIB,
curr_client->sub_sys) == 0){
the_subsystem = cib_subsystem;
}
if(the_subsystem != NULL) {
cleanup_subsystem(the_subsystem);
} /* else that was a transient client */
if (curr_client->table_key != NULL) {
/*
* Key is destroyed below:
* curr_client->table_key
* Value is cleaned up by:
* G_main_del_IPC_Channel
*/
g_hash_table_remove(
ipc_clients, curr_client->table_key);
}
#if 0
if(curr_client->client_source != NULL) {
gboolean det = G_main_del_IPC_Channel(
curr_client->client_source);
crm_verbose("crm_client was %s detached",
det?"successfully":"not");
}
#endif
crm_free(curr_client->table_key);
crm_free(curr_client->sub_sys);
crm_free(curr_client->uuid);
crm_free(curr_client);
}
}
G_main_set_trigger(fsa_source);
return stay_connected;
}
gboolean
lrm_dispatch(IPC_Channel*src_not_used, gpointer user_data)
{
int num_msgs = 0;
ll_lrm_t *lrm = (ll_lrm_t*)user_data;
crm_devel("received callback");
num_msgs = lrm->lrm_ops->rcvmsg(lrm, FALSE);
if(num_msgs < 1) {
crm_err("lrm->lrm_ops->rcvmsg() failed, connection lost?");
clear_bit_inplace(fsa_input_register, R_LRM_CONNECTED);
register_fsa_input(C_FSA_INTERNAL, I_ERROR, NULL);
G_main_set_trigger(fsa_source);
return FALSE;
}
return TRUE;
}
void
lrm_op_callback(lrm_op_t* op)
{
CRM_DEV_ASSERT(op != NULL);
if(crm_assert_failed) {
return;
}
crm_debug("received callback: %s/%s (%s)",
op->op_type, op->rsc_id, op_status2text(op->op_status));
/* Make sure the LRM events are received in order */
register_fsa_input_later(C_LRM_OP_CALLBACK, I_LRM_EVENT, op);
G_main_set_trigger(fsa_source);
}
void
crmd_ha_status_callback(
const char *node, const char * status, void* private_data)
{
crm_data_t *update = NULL;
crm_devel("received callback");
crm_notice("Status update: Node %s now has status [%s]",node,status);
if(safe_str_neq(status, DEADSTATUS)) {
crm_devel("nstatus callback was not for a dead node");
return;
}
/* this node is taost */
update = create_node_state(
node, node, status, NULL, NULL, NULL, NULL);
set_xml_property_copy(
update, XML_CIB_ATTR_CLEAR_SHUTDOWN, XML_BOOLEAN_TRUE);
update_local_cib(create_cib_fragment(update, NULL));
G_main_set_trigger(fsa_source);
free_xml(update);
}
void
crmd_client_status_callback(const char * node, const char * client,
const char * status, void * private)
{
const char *join = NULL;
const char *extra = NULL;
crm_data_t * update = NULL;
crm_devel("received callback");
if(safe_str_neq(client, CRM_SYSTEM_CRMD)) {
return;
}
if(safe_str_eq(status, JOINSTATUS)){
status = ONLINESTATUS;
extra = XML_CIB_ATTR_CLEAR_SHUTDOWN;
} else if(safe_str_eq(status, LEAVESTATUS)){
status = OFFLINESTATUS;
join = CRMD_JOINSTATE_DOWN;
extra = XML_CIB_ATTR_CLEAR_SHUTDOWN;
}
set_bit_inplace(fsa_input_register, R_PEER_DATA);
g_hash_table_replace(
crmd_peer_state, crm_strdup(node), crm_strdup(status));
if(fsa_state == S_STARTING || fsa_state == S_STOPPING) {
return;
}
crm_notice("Status update: Client %s/%s now has status [%s]",
node, client, status);
if(safe_str_eq(node, fsa_our_dc)
&& safe_str_eq(status, OFFLINESTATUS)) {
/* did our DC leave us */
crm_info("Got client status callback - our DC is dead");
register_fsa_input(C_CRMD_STATUS_CALLBACK, I_ELECTION, NULL);
} else {
crm_devel("Got client status callback");
update = create_node_state(
node, node, NULL, NULL, status, join, NULL);
set_xml_property_copy(update, extra, XML_BOOLEAN_TRUE);
update_local_cib(create_cib_fragment(update, NULL));
free_xml(update);
if(AM_I_DC && safe_str_eq(status, OFFLINESTATUS)) {
g_hash_table_remove(confirmed_nodes, node);
g_hash_table_remove(finalized_nodes, node);
g_hash_table_remove(integrated_nodes, node);
g_hash_table_remove(welcomed_nodes, node);
check_join_state(fsa_state, __FUNCTION__);
}
}
G_main_set_trigger(fsa_source);
}
void
crmd_ha_connection_destroy(gpointer user_data)
{
crm_crit("Heartbeat has left us");
/* this is always an error */
/* feed this back into the FSA */
register_fsa_input(C_HA_DISCONNECT, I_ERROR, NULL);
G_main_set_trigger(fsa_source);
}
gboolean
crmd_client_connect(IPC_Channel *client_channel, gpointer user_data)
{
if (client_channel == NULL) {
crm_err("Channel was NULL");
} else if (client_channel->ch_status == IPC_DISCONNECT) {
crm_err("Channel was disconnected");
} else {
crmd_client_t *blank_client = NULL;
crm_devel("Channel connected");
crm_malloc0(blank_client, sizeof(crmd_client_t));
if (blank_client == NULL) {
return FALSE;
}
client_channel->ops->set_recv_qlen(client_channel, 100);
client_channel->ops->set_send_qlen(client_channel, 100);
blank_client->client_channel = client_channel;
blank_client->sub_sys = NULL;
blank_client->uuid = NULL;
blank_client->table_key = NULL;
blank_client->client_source =
G_main_add_IPC_Channel(
G_PRIORITY_LOW, client_channel,
FALSE, crmd_ipc_msg_callback,
blank_client, default_ipc_connection_destroy);
}
return TRUE;
}
gboolean ccm_dispatch(int fd, gpointer user_data)
{
int rc = 0;
oc_ev_t *ccm_token = (oc_ev_t*)user_data;
gboolean was_error = FALSE;
crm_devel("received callback");
rc = oc_ev_handle_event(ccm_token);
if(rc != 0) {
crm_err("CCM connection appears to have failed: rc=%d.", rc);
register_fsa_input(C_CCM_CALLBACK, I_ERROR, NULL);
was_error = TRUE;
}
G_main_set_trigger(fsa_source);
return !was_error;
}
void
crmd_ccm_msg_callback(
oc_ed_t event, void *cookie, size_t size, const void *data)
{
int instance = -1;
gboolean update_cache = FALSE;
struct crmd_ccm_data_s *event_data = NULL;
const oc_ev_membership_t *membership = data;
crm_devel("received callback");
if(data != NULL) {
instance = membership->m_instance;
}
crm_info("Quorum %s after event=%s (id=%d)",
ccm_have_quorum(event)?"(re)attained":"lost",
ccm_event_name(event), instance);
switch(event) {
case OC_EV_MS_NEW_MEMBERSHIP:
case OC_EV_MS_INVALID:
update_cache = TRUE;
if(AM_I_DC == FALSE) {
break;
}
register_fsa_action(A_TE_CANCEL); /*cause a transition*/
break;
case OC_EV_MS_NOT_PRIMARY:
#if UNTESTED
if(AM_I_DC == FALSE) {
break;
}
/* tell the TE to pretend it completed and stop */
/* side effect: we'll end up in S_IDLE */
register_fsa_action(A_TE_HALT);
#endif
break;
case OC_EV_MS_PRIMARY_RESTORED:
if(AM_I_DC == FALSE) {
break;
}
fsa_membership_copy->id = instance;
register_fsa_action(A_TE_CANCEL); /*cause a transition*/
break;
case OC_EV_MS_EVICTED:
register_fsa_input(C_FSA_INTERNAL, I_STOP, NULL);
break;
default:
crm_err("Unknown CCM event: %d", event);
}
if(update_cache) {
crm_malloc0(event_data, sizeof(struct crmd_ccm_data_s));
if(event_data != NULL) {
event_data->event = event;
if(data != NULL) {
event_data->oc = copy_ccm_oc_data(
(const oc_ev_membership_t *)data);
}
crm_devel("Sending callback to the FSA");
register_fsa_input(
C_CCM_CALLBACK, I_CCM_EVENT, event_data);
if (event_data->oc) {
crm_free(event_data->oc);
event_data->oc = NULL;
}
crm_free(event_data);
}
} else {
crm_debug("No futher action for transitional CCM event: %s",
ccm_event_name(event));
}
oc_ev_callback_done(cookie);
return;
}
void
crmd_cib_connection_destroy(gpointer user_data)
{
if(is_set(fsa_input_register, R_SHUTDOWN)) {
crm_info("Connection to the CIB terminated...");
return;
}
/* eventually this will trigger a reconnect, not a shutdown */
crm_err("Connection to the CIB terminated...");
register_fsa_input(C_FSA_INTERNAL, I_ERROR, NULL);
clear_bit_inplace(fsa_input_register, R_CIB_CONNECTED);
G_main_set_trigger(fsa_source);
return;
}
longclock_t fsa_start = 0;
longclock_t fsa_stop = 0;
longclock_t fsa_diff = 0;
int fsa_diff_ms = 0;
gboolean
crm_fsa_trigger(gpointer user_data)
{
if(fsa_diff_max_ms > 0) {
fsa_start = time_longclock();
}
s_crmd_fsa(C_FSA_INTERNAL);
if(fsa_diff_max_ms > 0) {
fsa_stop = time_longclock();
fsa_diff = sub_longclock(fsa_start, fsa_stop);
fsa_diff_ms = longclockto_ms(fsa_diff);
if(fsa_diff_ms > fsa_diff_max_ms) {
crm_err("FSA took %dms to complete", fsa_diff_ms);
}
}
return TRUE;
}

File Metadata

Mime Type
text/x-c
Expires
Thu, Oct 16, 12:15 AM (1 d, 12 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2530759
Default Alt Text
callbacks.c (15 KB)

Event Timeline