Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/cib/callbacks.c b/cib/callbacks.c
index bf09bddd17..40671cdc15 100644
--- a/cib/callbacks.c
+++ b/cib/callbacks.c
@@ -1,1432 +1,1430 @@
/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <crm_internal.h>
#include <sys/param.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <fcntl.h>
#include <clplumbing/uids.h>
#include <clplumbing/cl_uuid.h>
#include <clplumbing/cl_malloc.h>
#include <clplumbing/Gmain_timeout.h>
#include <crm/crm.h>
#include <crm/cib.h>
#include <crm/msg_xml.h>
#include <crm/common/ipc.h>
#include <crm/common/cluster.h>
#include <crm/common/ctrl.h>
#include <crm/common/xml.h>
#include <crm/common/msg.h>
#include <cibio.h>
#include <callbacks.h>
#include <cibmessages.h>
#include <cibprimatives.h>
#include <notify.h>
#include <heartbeat.h>
#include "common.h"
extern GMainLoop* mainloop;
extern gboolean cib_shutdown_flag;
extern gboolean stand_alone;
extern const char* cib_root;
#if SUPPORT_HEARTBEAT
extern ll_cluster_t *hb_conn;
#endif
extern void cib_ha_connection_destroy(gpointer user_data);
extern enum cib_errors cib_update_counter(
xmlNode *xml_obj, const char *field, gboolean reset);
extern void GHFunc_count_peers(
gpointer key, gpointer value, gpointer user_data);
-extern enum cib_errors revision_check(
- xmlNode *cib_update, xmlNode *cib_copy, int flags);
void initiate_exit(void);
void terminate_cib(const char *caller);
gint cib_GCompareFunc(gconstpointer a, gconstpointer b);
void cib_GHFunc(gpointer key, gpointer value, gpointer user_data);
gboolean can_write(int flags);
void send_cib_replace(const xmlNode *sync_request, const char *host);
void cib_process_request(
xmlNode *request, gboolean privileged, gboolean force_synchronous,
gboolean from_peer, cib_client_t *cib_client);
void cib_common_callback_worker(xmlNode *op_request, cib_client_t *cib_client,
gboolean force_synchronous, gboolean privileged);
extern GHashTable *client_list;
int next_client_id = 0;
extern const char *cib_our_uname;
extern unsigned long cib_num_ops, cib_num_local, cib_num_updates, cib_num_fail;
extern unsigned long cib_bad_connects, cib_num_timeouts;
extern longclock_t cib_call_time;
extern enum cib_errors cib_status;
int send_via_callback_channel(xmlNode *msg, const char *token);
enum cib_errors cib_process_command(
xmlNode *request, xmlNode **reply,
xmlNode **cib_diff, gboolean privileged);
gboolean cib_common_callback(IPC_Channel *channel, cib_client_t *cib_client,
gboolean force_synchronous, gboolean privileged);
gboolean cib_process_disconnect(IPC_Channel *channel, cib_client_t *cib_client);
int num_clients = 0;
static void
cib_ipc_connection_destroy(gpointer user_data)
{
cib_client_t *cib_client = user_data;
/* cib_process_disconnect */
if(cib_client == NULL) {
crm_debug_4("Destroying %p", user_data);
return;
}
if(cib_client->source != NULL) {
crm_debug_4("Deleting %s (%p) from mainloop",
cib_client->name, cib_client->source);
G_main_del_IPC_Channel(cib_client->source);
cib_client->source = NULL;
}
crm_debug_3("Destroying %s (%p)", cib_client->name, user_data);
num_clients--;
crm_debug_2("Num unfree'd clients: %d", num_clients);
crm_free(cib_client->name);
crm_free(cib_client->callback_id);
crm_free(cib_client->id);
crm_free(cib_client);
crm_debug_4("Freed the cib client");
return;
}
gboolean
cib_client_connect(IPC_Channel *channel, gpointer user_data)
{
cl_uuid_t client_id;
xmlNode *reg_msg = NULL;
cib_client_t *new_client = NULL;
char uuid_str[UU_UNPARSE_SIZEOF];
const char *channel_name = user_data;
gboolean (*callback)(IPC_Channel *channel, gpointer user_data);
crm_debug_3("Connecting channel");
if (channel == NULL) {
crm_err("Channel was NULL");
cib_bad_connects++;
return FALSE;
} else if (channel->ch_status != IPC_CONNECT) {
crm_err("Channel was disconnected");
cib_bad_connects++;
return FALSE;
} else if(channel_name == NULL) {
crm_err("user_data must contain channel name");
cib_bad_connects++;
return FALSE;
} else if(cib_shutdown_flag) {
crm_info("Ignoring new client [%d] during shutdown",
channel->farside_pid);
return FALSE;
}
callback = cib_ro_callback;
if(safe_str_eq(channel_name, cib_channel_rw)) {
callback = cib_rw_callback;
}
crm_malloc0(new_client, sizeof(cib_client_t));
num_clients++;
new_client->channel = channel;
new_client->channel_name = channel_name;
crm_debug_3("Created channel %p for channel %s",
new_client, new_client->channel_name);
channel->ops->set_recv_qlen(channel, 1024);
channel->ops->set_send_qlen(channel, 1024);
new_client->source = G_main_add_IPC_Channel(
G_PRIORITY_DEFAULT, channel, FALSE, callback,
new_client, cib_ipc_connection_destroy);
crm_debug_3("Channel %s connected for client %s",
new_client->channel_name, new_client->id);
cl_uuid_generate(&client_id);
cl_uuid_unparse(&client_id, uuid_str);
CRM_CHECK(new_client->id == NULL, crm_free(new_client->id));
new_client->id = crm_strdup(uuid_str);
/* make sure we can find ourselves later for sync calls
* redirected to the master instance
*/
g_hash_table_insert(client_list, new_client->id, new_client);
reg_msg = create_xml_node(NULL, "callback");
crm_xml_add(reg_msg, F_CIB_OPERATION, CRM_OP_REGISTER);
crm_xml_add(reg_msg, F_CIB_CLIENTID, new_client->id);
send_ipc_message(channel, reg_msg);
free_xml(reg_msg);
return TRUE;
}
gboolean
cib_rw_callback(IPC_Channel *channel, gpointer user_data)
{
gboolean result = FALSE;
result = cib_common_callback(channel, user_data, FALSE, TRUE);
return result;
}
gboolean
cib_ro_callback(IPC_Channel *channel, gpointer user_data)
{
gboolean result = FALSE;
result = cib_common_callback(channel, user_data, FALSE, FALSE);
return result;
}
void
cib_common_callback_worker(xmlNode *op_request, cib_client_t *cib_client,
gboolean force_synchronous, gboolean privileged)
{
int rc = cib_ok;
int call_type = 0;
const char *op = NULL;
longclock_t call_stop = 0;
longclock_t call_start = 0;
call_start = time_longclock();
cib_client->num_calls++;
op = crm_element_value(op_request, F_CIB_OPERATION);
if(safe_str_eq(op, CRM_OP_REGISTER) ) {
goto done;
} else if(safe_str_eq(op, T_CIB_NOTIFY) ) {
/* Update the notify filters for this client */
int on_off = 0;
const char *type = crm_element_value(op_request, F_CIB_NOTIFY_TYPE);;
crm_element_value_int(op_request, F_CIB_NOTIFY_ACTIVATE, &on_off);
crm_info("Setting %s callbacks for %s: %s",
type, cib_client->name, on_off?"on":"off");
if(safe_str_eq(type, T_CIB_POST_NOTIFY)) {
cib_client->post_notify = on_off;
} else if(safe_str_eq(type, T_CIB_PRE_NOTIFY)) {
cib_client->pre_notify = on_off;
} else if(safe_str_eq(type, T_CIB_UPDATE_CONFIRM)) {
cib_client->confirmations = on_off;
} else if(safe_str_eq(type, T_CIB_DIFF_NOTIFY)) {
cib_client->diffs = on_off;
} else if(safe_str_eq(type, T_CIB_REPLACE_NOTIFY)) {
cib_client->replace = on_off;
}
goto done;
}
rc = cib_get_operation_id(op, &call_type);
if(rc != cib_ok) {
crm_debug("Invalid operation %s from %s/%s",
op, cib_client->name, cib_client->channel_name);
} else {
crm_debug_2("Processing %s operation from %s/%s",
op, cib_client->name, cib_client->channel_name);
}
if(rc == cib_ok) {
cib_process_request(
op_request, force_synchronous, privileged, FALSE,
cib_client);
}
done:
call_stop = time_longclock();
cib_call_time += (call_stop - call_start);
}
gboolean
cib_common_callback(IPC_Channel *channel, cib_client_t *cib_client,
gboolean force_synchronous, gboolean privileged)
{
int lpc = 0;
const char *value = NULL;
xmlNode *op_request = NULL;
gboolean keep_channel = TRUE;
if(cib_client == NULL) {
crm_err("Receieved call from unknown source. Discarding.");
return FALSE;
}
crm_debug_2("Callback for %s on %s channel",
cib_client->id, cib_client->channel_name);
while(IPC_ISRCONN(channel)) {
if(channel->ops->is_message_pending(channel) == 0) {
break;
}
op_request = xmlfromIPC(channel, 0);
if (op_request == NULL) {
break;
}
lpc++;
crm_assert_failed = FALSE;
crm_log_xml(LOG_MSG, "Client[inbound]", op_request);
if(cib_client->name == NULL) {
value = crm_element_value(op_request, F_CIB_CLIENTNAME);
if(value == NULL) {
cib_client->name = crm_itoa(channel->farside_pid);
} else {
cib_client->name = crm_strdup(value);
}
}
CRM_CHECK(cib_client->id != NULL, crm_err("Invalid client: %p", cib_client));
crm_xml_add(op_request, F_CIB_CLIENTID, cib_client->id);
crm_xml_add(op_request, F_CIB_CLIENTNAME, cib_client->name);
if(cib_client->callback_id == NULL) {
value = crm_element_value(op_request, F_CIB_CALLBACK_TOKEN);
if(value != NULL) {
cib_client->callback_id = crm_strdup(value);
crm_debug_2("Callback channel for %s is %s",
cib_client->id, cib_client->callback_id);
} else {
cib_client->callback_id = crm_strdup(cib_client->id);
}
}
cib_common_callback_worker(
op_request, cib_client, force_synchronous, privileged);
free_xml(op_request);
if(channel->ch_status == IPC_CONNECT) {
break;
}
}
crm_debug_2("Processed %d messages", lpc);
if(channel->ch_status != IPC_CONNECT) {
crm_debug_2("Client disconnected");
keep_channel = cib_process_disconnect(channel, cib_client);
}
return keep_channel;
}
extern void cib_send_remote_msg(void *session, xmlNode *msg);
static void
do_local_notify(xmlNode *notify_src, const char *client_id,
gboolean sync_reply, gboolean from_peer)
{
/* send callback to originating child */
cib_client_t *client_obj = NULL;
xmlNode *client_reply = NULL;
enum cib_errors local_rc = cib_ok;
crm_debug_2("Performing notification");
client_reply = cib_msg_copy(notify_src, TRUE);
if(client_id != NULL) {
client_obj = g_hash_table_lookup(
client_list, client_id);
} else {
crm_debug_2("No client to sent the response to."
" F_CIB_CLIENTID not set.");
}
crm_debug_3("Sending callback to request originator");
if(client_obj == NULL) {
local_rc = cib_reply_failed;
} else {
const char *client_id = client_obj->callback_id;
crm_debug_2("Sending %ssync response to %s %s",
sync_reply?"":"an a-",
client_obj->name,
from_peer?"(originator of delegated request)":"");
if(sync_reply) {
client_id = client_obj->id;
}
local_rc = send_via_callback_channel(client_reply, client_id);
}
if(local_rc != cib_ok && client_obj != NULL) {
crm_warn("%sSync reply to %s failed: %s",
sync_reply?"":"A-",
client_obj?client_obj->name:"<unknown>", cib_error2string(local_rc));
}
free_xml(client_reply);
}
static void
parse_local_options(
cib_client_t *cib_client, int call_type, int call_options, const char *host, const char *op,
gboolean *local_notify, gboolean *needs_reply, gboolean *process, gboolean *needs_forward)
{
if(cib_op_modifies(call_type)
&& !(call_options & cib_inhibit_bcast)) {
/* we need to send an update anyway */
*needs_reply = TRUE;
} else {
*needs_reply = FALSE;
}
if(host == NULL && (call_options & cib_scope_local)) {
crm_debug_2("Processing locally scoped %s op from %s",
op, cib_client->name);
*local_notify = TRUE;
} else if(host == NULL && cib_is_master) {
crm_debug_2("Processing master %s op locally from %s",
op, cib_client->name);
*local_notify = TRUE;
} else if(safe_str_eq(host, cib_our_uname)) {
crm_debug_2("Processing locally addressed %s op from %s",
op, cib_client->name);
*local_notify = TRUE;
} else if(stand_alone) {
*needs_forward = FALSE;
*local_notify = TRUE;
*process = TRUE;
} else {
crm_debug_2("%s op from %s needs to be forwarded to %s",
op, cib_client->name,
host?host:"the master instance");
*needs_forward = TRUE;
*process = FALSE;
}
}
static gboolean
parse_peer_options(
int call_type, xmlNode *request,
gboolean *local_notify, gboolean *needs_reply, gboolean *process, gboolean *needs_forward)
{
const char *op = crm_element_value(request, F_CIB_OPERATION);
const char *originator = crm_element_value(request, F_ORIG);
const char *host = crm_element_value(request, F_CIB_HOST);
const char *reply_to = crm_element_value(request, F_CIB_ISREPLY);
const char *update = crm_element_value(request, F_CIB_GLOBAL_UPDATE);
const char *delegated = crm_element_value(request, F_CIB_DELEGATED);
if(safe_str_eq(op, "cib_shutdown_req")) {
if(reply_to != NULL) {
crm_debug("Processing %s from %s", op, host);
*needs_reply = FALSE;
} else {
crm_debug("Processing %s reply from %s", op, host);
}
return TRUE;
} else if(crm_is_true(update) && safe_str_eq(reply_to, cib_our_uname)) {
crm_debug_2("Processing global/peer update from %s"
" that originated from us", originator);
*needs_reply = FALSE;
if(crm_element_value(request, F_CIB_CLIENTID) != NULL) {
*local_notify = TRUE;
}
return TRUE;
} else if(crm_is_true(update)) {
crm_debug_2("Processing global/peer update from %s", originator);
*needs_reply = FALSE;
return TRUE;
} else if(host != NULL && safe_str_eq(host, cib_our_uname)) {
crm_debug_2("Processing request sent to us from %s", originator);
return TRUE;
} else if(delegated != NULL && cib_is_master == TRUE) {
crm_debug_2("Processing request sent to master instance from %s",
originator);
return TRUE;
} else if(reply_to != NULL && safe_str_eq(reply_to, cib_our_uname)) {
crm_debug_2("Forward reply sent from %s to local clients",
originator);
*process = FALSE;
*needs_reply = FALSE;
*local_notify = TRUE;
return TRUE;
} else if(delegated != NULL) {
crm_debug_2("Ignoring msg for master instance");
} else if(host != NULL) {
/* this is for a specific instance and we're not it */
crm_debug_2("Ignoring msg for instance on %s", crm_str(host));
} else if(reply_to == NULL && cib_is_master == FALSE) {
/* this is for the master instance and we're not it */
crm_debug_2("Ignoring reply to %s", crm_str(reply_to));
} else {
crm_err("Nothing for us to do?");
crm_log_xml(LOG_ERR, "Peer[inbound]", request);
}
return FALSE;
}
static void
forward_request(xmlNode *request, cib_client_t *cib_client, int call_options)
{
xmlNode *forward_msg = NULL;
const char *op = crm_element_value(request, F_CIB_OPERATION);
const char *host = crm_element_value(request, F_CIB_HOST);
forward_msg = cib_msg_copy(request, TRUE);
crm_xml_add(forward_msg, F_CIB_DELEGATED, cib_our_uname);
if(host != NULL) {
crm_debug_2("Forwarding %s op to %s", op, host);
send_cluster_message(host, crm_msg_cib, forward_msg, FALSE);
} else {
crm_debug_2("Forwarding %s op to master instance", op);
send_cluster_message(NULL, crm_msg_cib, forward_msg, FALSE);
}
if(call_options & cib_discard_reply) {
crm_debug_2("Client not interested in reply");
} else if(call_options & cib_sync_call) {
/* keep track of the request so we can time it
* out if required
*/
crm_debug_2("Registering delegated call from %s",
cib_client->id);
cib_client->delegated_calls = g_list_append(
cib_client->delegated_calls, forward_msg);
forward_msg = NULL;
}
free_xml(forward_msg);
}
static void
send_peer_reply(
xmlNode *msg, xmlNode *result_diff, const char *originator, gboolean broadcast)
{
xmlNode *reply_copy = NULL;
CRM_ASSERT(msg != NULL);
reply_copy = cib_msg_copy(msg, TRUE);
if(broadcast) {
/* this (successful) call modified the CIB _and_ the
* change needs to be broadcast...
* send via HA to other nodes
*/
int diff_add_updates = 0;
int diff_add_epoch = 0;
int diff_add_admin_epoch = 0;
int diff_del_updates = 0;
int diff_del_epoch = 0;
int diff_del_admin_epoch = 0;
char *digest = NULL;
cib_diff_version_details(
result_diff,
&diff_add_admin_epoch, &diff_add_epoch, &diff_add_updates,
&diff_del_admin_epoch, &diff_del_epoch, &diff_del_updates);
crm_debug("Sending update diff %d.%d.%d -> %d.%d.%d",
diff_del_admin_epoch,diff_del_epoch,diff_del_updates,
diff_add_admin_epoch,diff_add_epoch,diff_add_updates);
crm_xml_add(reply_copy, F_CIB_ISREPLY, originator);
crm_xml_add(reply_copy, F_CIB_GLOBAL_UPDATE, XML_BOOLEAN_TRUE);
crm_xml_add(reply_copy, F_CIB_OPERATION, CIB_OP_APPLY_DIFF);
digest = calculate_xml_digest(the_cib, FALSE, TRUE);
crm_xml_add(result_diff, XML_ATTR_DIGEST, digest);
/* crm_log_xml_debug(the_cib, digest); */
crm_free(digest);
add_message_xml(reply_copy, F_CIB_UPDATE_DIFF, result_diff);
crm_log_xml(LOG_DEBUG_3, "copy", reply_copy);
send_cluster_message(NULL, crm_msg_cib, reply_copy, TRUE);
} else if(originator != NULL) {
/* send reply via HA to originating node */
crm_debug_2("Sending request result to originator only");
crm_xml_add(reply_copy, F_CIB_ISREPLY, originator);
send_cluster_message(originator, crm_msg_cib, reply_copy, FALSE);
}
free_xml(reply_copy);
}
void
cib_process_request(
xmlNode *request, gboolean force_synchronous, gboolean privileged,
gboolean from_peer, cib_client_t *cib_client)
{
int call_type = 0;
int call_options = 0;
gboolean process = TRUE;
gboolean needs_reply = TRUE;
gboolean local_notify = FALSE;
gboolean needs_forward = FALSE;
xmlNode *result_diff = NULL;
enum cib_errors rc = cib_ok;
xmlNode *op_reply = NULL;
const char *op = crm_element_value(request, F_CIB_OPERATION);
const char *originator = crm_element_value(request, F_ORIG);
const char *host = crm_element_value(request, F_CIB_HOST);
const char *update = crm_element_value(request, F_CIB_GLOBAL_UPDATE);
crm_debug_4("%s Processing msg %s",
cib_our_uname, crm_element_value(request, F_SEQ));
cib_num_ops++;
if(cib_num_ops == 0) {
cib_num_fail = 0;
cib_num_local = 0;
cib_num_updates = 0;
crm_info("Stats wrapped around");
}
if(host != NULL && strlen(host) == 0) {
host = NULL;
}
crm_element_value_int(request, F_CIB_CALLOPTS, &call_options);
crm_debug_4("Retrieved call options: %d", call_options);
if(force_synchronous) {
call_options |= cib_sync_call;
}
crm_debug_2("Processing %s message (%s) for %s...",
from_peer?"peer":"local",
from_peer?originator:cib_our_uname, host?host:"master");
rc = cib_get_operation_id(op, &call_type);
if(cib_op_modifies(call_type)) {
cib_num_updates++;
}
if(rc != cib_ok) {
/* TODO: construct error reply */
crm_err("Pre-processing of command failed: %s",
cib_error2string(rc));
} else if(from_peer == FALSE) {
parse_local_options(cib_client, call_type, call_options, host, op,
&local_notify, &needs_reply, &process, &needs_forward);
} else if(parse_peer_options(call_type, request, &local_notify,
&needs_reply, &process, &needs_forward) == FALSE) {
return;
}
crm_debug_3("Finished determining processing actions");
if(call_options & cib_discard_reply) {
needs_reply = cib_op_modifies(call_type);
local_notify = FALSE;
}
if(needs_forward) {
forward_request(request, cib_client, call_options);
return;
}
if(cib_status != cib_ok) {
rc = cib_status;
crm_err("Operation ignored, cluster configuration is invalid."
" Please repair and restart: %s",
cib_error2string(cib_status));
op_reply = cib_construct_reply(request, the_cib, cib_status);
} else if(process) {
cib_num_local++;
crm_debug_2("Performing local processing:"
" op=%s origin=%s/%s,%s (update=%s)",
crm_element_value(request, F_CIB_OPERATION), originator,
crm_element_value(request, F_CIB_CLIENTID),
crm_element_value(request, F_CIB_CALLID), update);
rc = cib_process_command(
request, &op_reply, &result_diff, privileged);
crm_debug_2("Processing complete");
if(rc == cib_diff_resync || rc == cib_diff_failed
|| rc == cib_old_data) {
crm_warn("%s operation failed: %s",
crm_str(op), cib_error2string(rc));
} else if(rc != cib_ok) {
cib_num_fail++;
crm_err("%s operation failed: %s",
crm_str(op), cib_error2string(rc));
crm_log_xml(LOG_DEBUG, "CIB[output]", op_reply);
crm_log_xml(LOG_INFO, "Input message", request);
}
if(op_reply == NULL && (needs_reply || local_notify)) {
crm_err("Unexpected NULL reply to message");
crm_log_xml(LOG_ERR, "null reply", request);
needs_reply = FALSE;
local_notify = FALSE;
}
}
crm_debug_3("processing response cases");
if(local_notify) {
const char *client_id = crm_element_value(request, F_CIB_CLIENTID);
if(process == FALSE) {
do_local_notify(request, client_id, call_options & cib_sync_call, from_peer);
} else {
do_local_notify(op_reply, client_id, call_options & cib_sync_call, from_peer);
}
}
/* from now on we are the server */
if(needs_reply == FALSE || stand_alone) {
/* nothing more to do...
* this was a non-originating slave update
*/
crm_debug_2("Completed slave update");
} else if(rc == cib_ok
&& result_diff != NULL
&& !(call_options & cib_inhibit_bcast)) {
send_peer_reply(request, result_diff, originator, TRUE);
} else if(call_options & cib_discard_reply) {
crm_debug_4("Caller isn't interested in reply");
} else if (from_peer) {
crm_debug_2("Directing reply to %s", originator);
if(call_options & cib_inhibit_bcast) {
crm_debug_3("Request not broadcast: inhibited");
}
if(cib_op_modifies(call_type) == FALSE || result_diff == NULL) {
crm_debug_3("Request not broadcast: R/O call");
}
if(rc != cib_ok) {
crm_debug_3("Request not broadcast: call failed: %s",
cib_error2string(rc));
}
send_peer_reply(op_reply, result_diff, originator, FALSE);
}
free_xml(op_reply);
free_xml(result_diff);
return;
}
xmlNode *
cib_construct_reply(xmlNode *request, xmlNode *output, int rc)
{
int lpc = 0;
xmlNode *reply = NULL;
const char *name = NULL;
const char *value = NULL;
const char *names[] = {
F_CIB_OPERATION,
F_CIB_CALLID,
F_CIB_CLIENTID,
F_CIB_CALLOPTS
};
crm_debug_4("Creating a basic reply");
reply = create_xml_node(NULL, "cib-reply");
crm_xml_add(reply, F_TYPE, T_CIB);
for(lpc = 0; lpc < DIMOF(names); lpc++) {
name = names[lpc];
value = crm_element_value(request, name);
crm_xml_add(reply, name, value);
}
crm_xml_add_int(reply, F_CIB_RC, rc);
if(output != NULL) {
crm_debug_4("Attaching reply output");
add_message_xml(reply, F_CIB_CALLDATA, output);
}
return reply;
}
enum cib_errors
cib_process_command(xmlNode *request, xmlNode **reply,
xmlNode **cib_diff, gboolean privileged)
{
gboolean send_r_notify = FALSE;
xmlNode *output = NULL;
xmlNode *input = NULL;
xmlNode *current_cib = NULL;
xmlNode *result_cib = NULL;
int call_type = 0;
int call_options = 0;
enum cib_errors rc = cib_ok;
enum cib_errors rc2 = cib_ok;
int log_level = LOG_DEBUG_3;
xmlNode *filtered = NULL;
const char *op = NULL;
const char *section = NULL;
gboolean config_changed = FALSE;
gboolean global_update = crm_is_true(crm_element_value(request, F_CIB_GLOBAL_UPDATE));
CRM_ASSERT(cib_status == cib_ok);
*reply = NULL;
*cib_diff = NULL;
if(per_action_cib) {
CRM_CHECK(the_cib == NULL, free_xml(the_cib));
the_cib = readCibXmlFile(cib_root, "cib.xml", FALSE);
CRM_CHECK(the_cib != NULL, return cib_NOOBJECT);
}
current_cib = the_cib;
/* Start processing the request... */
op = crm_element_value(request, F_CIB_OPERATION);
crm_element_value_int(request, F_CIB_CALLOPTS, &call_options);
rc = cib_get_operation_id(op, &call_type);
if(rc == cib_ok) {
rc = cib_op_can_run(call_type, call_options, privileged, global_update);
}
/* prevent NUMUPDATES from being incrimented - apply the change as-is */
if(global_update) {
call_options |= cib_inhibit_bcast;
call_options |= cib_force_diff;
}
rc2 = cib_op_prepare(call_type, request, &input, &section);
if(rc == cib_ok) {
rc = rc2;
}
if(rc != cib_ok) {
crm_debug_2("Call setup failed: %s", cib_error2string(rc));
goto done;
} else if(cib_op_modifies(call_type) == FALSE) {
rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE,
section, request, input, FALSE, &config_changed,
current_cib, &result_cib, &output);
CRM_CHECK(result_cib == NULL, free_xml(result_cib));
goto done;
}
/* Handle a valid write action */
if((call_options & cib_inhibit_notify) == 0) {
cib_pre_notify(call_options, op,
get_object_root(section, current_cib), input);
}
if(rc == cib_ok) {
gboolean manage_counters = TRUE;
if(global_update) {
/* skip */
CRM_CHECK(call_type == 4 || call_type == 11,
crm_err("Call type: %d", call_type);
crm_log_xml(LOG_ERR, "bad op", request));
crm_debug_2("Skipping update: global replace");
manage_counters = FALSE;
} else if(call_options & cib_inhibit_bcast) {
/* skip */
crm_debug_2("Skipping update: inhibit broadcast");
manage_counters = FALSE;
}
rc = cib_perform_op(op, call_options, cib_op_func(call_type), FALSE,
section, request, input, manage_counters, &config_changed,
current_cib, &result_cib, &output);
*cib_diff = diff_cib_object(current_cib, result_cib, FALSE);
}
if(rc != cib_ok) {
free_xml(result_cib);
} else {
rc = activateCibXml(result_cib, config_changed);
if(rc != cib_ok) {
crm_warn("Activation failed");
}
}
if((call_options & cib_inhibit_notify) == 0) {
const char *call_id = crm_element_value(request, F_CIB_CALLID);
const char *client = crm_element_value(request, F_CIB_CLIENTNAME);
cib_post_notify(call_options, op, input, rc, the_cib);
cib_diff_notify(call_options, client, call_id, op,
input, rc, *cib_diff);
}
if(rc == cib_ok && safe_str_eq(CIB_OP_ERASE, op)) {
send_r_notify = TRUE;
} else if(rc == cib_ok && safe_str_eq(CIB_OP_REPLACE, op)) {
if(section == NULL) {
send_r_notify = TRUE;
} else if(safe_str_eq(section, XML_TAG_CIB)) {
send_r_notify = TRUE;
} else if(safe_str_eq(section, XML_CIB_TAG_NODES)) {
send_r_notify = TRUE;
} else if(safe_str_eq(section, XML_CIB_TAG_STATUS)) {
send_r_notify = TRUE;
}
}
if(send_r_notify) {
cib_replace_notify(the_cib, rc, *cib_diff);
}
if(rc == cib_dtd_validation && global_update) {
log_level = LOG_WARNING;
crm_log_xml_info(input, "cib:global_update");
} else if(rc != cib_ok) {
log_level = LOG_DEBUG_4;
} else if(cib_is_master && config_changed) {
log_level = LOG_INFO;
} else if(cib_is_master) {
log_level = LOG_DEBUG;
log_xml_diff(LOG_DEBUG_2, filtered, "cib:diff:filtered");
} else if(config_changed) {
log_level = LOG_DEBUG_2;
} else {
log_level = LOG_DEBUG_3;
}
log_xml_diff(log_level, *cib_diff, "cib:diff");
free_xml(filtered);
done:
if((call_options & cib_discard_reply) == 0) {
*reply = cib_construct_reply(request, output, rc);
/* crm_log_xml_info(*reply, "cib:reply"); */
}
if(call_type >= 0) {
cib_op_cleanup(call_type, op, &input, &output);
}
if(per_action_cib) {
uninitializeCib();
}
return rc;
}
int
send_via_callback_channel(xmlNode *msg, const char *token)
{
cib_client_t *hash_client = NULL;
GList *list_item = NULL;
enum cib_errors rc = cib_ok;
crm_debug_3("Delivering msg %p to client %s", msg, token);
if(token == NULL) {
crm_err("No client id token, cant send message");
if(rc == cib_ok) {
rc = cib_missing;
}
} else {
/* A client that left before we could reply is not really
* _our_ error. Warn instead.
*/
hash_client = g_hash_table_lookup(client_list, token);
if(hash_client == NULL) {
crm_warn("Cannot find client for token %s", token);
rc = cib_client_gone;
} else if (crm_str_eq(hash_client->channel_name, "remote", FALSE)) {
/* just hope it's alive */
} else if(hash_client->channel == NULL) {
crm_err("Cannot find channel for client %s", token);
rc = cib_client_corrupt;
} else if(hash_client->channel->ops->get_chan_status(
hash_client->channel) == IPC_DISCONNECT) {
crm_warn("Client %s has disconnected", token);
rc = cib_client_gone;
cib_num_timeouts++;
}
}
/* this is a more important error so overwriting rc is warrented */
if(msg == NULL) {
crm_err("No message to send");
rc = cib_reply_failed;
}
if(rc == cib_ok) {
list_item = g_list_find_custom(
hash_client->delegated_calls, msg, cib_GCompareFunc);
}
if(list_item != NULL) {
/* remove it - no need to time it out */
xmlNode *orig_msg = list_item->data;
crm_debug_3("Removing msg from delegated list");
hash_client->delegated_calls = g_list_remove(
hash_client->delegated_calls, orig_msg);
CRM_DEV_ASSERT(orig_msg != msg);
free_xml(orig_msg);
}
if(rc == cib_ok) {
crm_debug_3("Delivering reply to client %s (%s)",
token, hash_client->channel_name);
if (crm_str_eq(hash_client->channel_name, "remote", FALSE)) {
cib_send_remote_msg(hash_client->channel, msg);
} else if(send_ipc_message(hash_client->channel, msg) == FALSE) {
crm_warn("Delivery of reply to client %s/%s failed",
hash_client->name, token);
rc = cib_reply_failed;
}
}
return rc;
}
gint cib_GCompareFunc(gconstpointer a, gconstpointer b)
{
const xmlNode *a_msg = a;
const xmlNode *b_msg = b;
int msg_a_id = 0;
int msg_b_id = 0;
const char *value = NULL;
value = crm_element_value_const(a_msg, F_CIB_CALLID);
msg_a_id = crm_parse_int(value, NULL);
value = crm_element_value_const(b_msg, F_CIB_CALLID);
msg_b_id = crm_parse_int(value, NULL);
if(msg_a_id == msg_b_id) {
return 0;
} else if(msg_a_id < msg_b_id) {
return -1;
}
return 1;
}
void
cib_GHFunc(gpointer key, gpointer value, gpointer user_data)
{
int timeout = 0; /* 1 iteration == 10 seconds */
xmlNode *msg = NULL;
xmlNode *reply = NULL;
const char *host_to = NULL;
cib_client_t *client = value;
GListPtr list = client->delegated_calls;
while(list != NULL) {
msg = list->data;
crm_element_value_int(msg, F_CIB_TIMEOUT, &timeout);
if(timeout <= 0) {
list = list->next;
continue;
} else {
int seen = 0;
crm_element_value_int(msg, F_CIB_SEENCOUNT, &seen);
crm_debug_4("Timeout %d, seen %d", timeout, seen);
if(seen < timeout) {
crm_debug_4("Updating seen count for msg from client %s",
client->id);
seen += 10;
crm_xml_add_int(msg, F_CIB_SEENCOUNT, seen);
list = list->next;
continue;
}
}
cib_num_timeouts++;
host_to = crm_element_value(msg, F_CIB_HOST);
crm_warn("Sending operation timeout msg to client %s",
client->id);
reply = create_xml_node(NULL, "cib-reply");
crm_xml_add(reply, F_TYPE, T_CIB);
crm_xml_add(reply, F_CIB_OPERATION,
crm_element_value(msg, F_CIB_OPERATION));
crm_xml_add(reply, F_CIB_CALLID,
crm_element_value(msg, F_CIB_CALLID));
if(host_to == NULL) {
crm_xml_add_int(reply, F_CIB_RC, cib_master_timeout);
} else {
crm_xml_add_int(reply, F_CIB_RC, cib_remote_timeout);
}
send_ipc_message(client->channel, reply);
list = list->next;
client->delegated_calls = g_list_remove(
client->delegated_calls, msg);
free_xml(msg);
free_xml(reply);
}
}
gboolean
cib_process_disconnect(IPC_Channel *channel, cib_client_t *cib_client)
{
if (channel == NULL) {
CRM_DEV_ASSERT(cib_client == NULL);
} else if (cib_client == NULL) {
crm_err("No client");
} else {
CRM_DEV_ASSERT(channel->ch_status != IPC_CONNECT);
crm_debug_2("Cleaning up after client disconnect: %s/%s/%s",
crm_str(cib_client->name),
cib_client->channel_name,
cib_client->id);
if(cib_client->id != NULL) {
if(!g_hash_table_remove(client_list, cib_client->id)) {
crm_err("Client %s not found in the hashtable",
cib_client->name);
}
}
}
if(cib_shutdown_flag && g_hash_table_size(client_list) == 0) {
crm_info("All clients disconnected...");
initiate_exit();
}
return FALSE;
}
void
cib_ha_peer_callback(HA_Message * msg, void* private_data)
{
xmlNode *xml = convert_ha_message(NULL, msg, __FUNCTION__);
cib_peer_callback(xml, private_data);
}
void
cib_peer_callback(xmlNode * msg, void* private_data)
{
int call_type = 0;
int call_options = 0;
const char *originator = crm_element_value(msg, F_ORIG);
const char *seq = crm_element_value(msg, F_SEQ);
const char *op = crm_element_value(msg, F_CIB_OPERATION);
crm_node_t *node = NULL;
crm_log_xml(LOG_MSG, "Peer[inbound]", msg);
crm_debug_2("Peer %s message (%s) from %s", op, seq, originator);
if(originator == NULL || safe_str_eq(originator, cib_our_uname)) {
crm_debug_2("Discarding %s message %s from ourselves", op, seq);
return;
}
if(crm_peer_cache == NULL) {
crm_info("Discarding %s message (%s) from %s:"
" membership not established", op, seq, originator);
return;
}
node = g_hash_table_lookup(crm_peer_cache, originator);
if(node == NULL || crm_is_member_active(node) == FALSE) {
crm_warn("Discarding %s message (%s) from %s:"
" not in our membership", op, seq, originator);
return;
}
if(cib_get_operation_id(op, &call_type) != cib_ok) {
crm_debug("Discarding %s message (%s) from %s:"
" Invalid operation", op, seq, originator);
return;
}
crm_debug_2("Processing %s msg (%s) from %s",op, seq, originator);
crm_element_value_int(msg, F_CIB_CALLOPTS, &call_options);
crm_debug_4("Retrieved call options: %d", call_options);
if(crm_element_value(msg, F_CIB_CLIENTNAME) == NULL) {
crm_xml_add(msg, F_CIB_CLIENTNAME, originator);
}
cib_process_request(msg, FALSE, TRUE, TRUE, NULL);
return;
}
void
cib_client_status_callback(const char * node, const char * client,
const char * status, void * private)
{
crm_node_t *member = NULL;
if(safe_str_eq(client, CRM_SYSTEM_CIB)) {
crm_info("Status update: Client %s/%s now has status [%s]",
node, client, status);
if(safe_str_eq(status, JOINSTATUS)){
status = ONLINESTATUS;
} else if(safe_str_eq(status, LEAVESTATUS)){
status = OFFLINESTATUS;
}
member = g_hash_table_lookup(crm_peer_cache, node);
if(member == NULL) {
/* Make sure it gets created */
const char *uuid = get_uuid(node);
member = crm_update_peer(0, 0, -1, 0, uuid, node, NULL, NULL);
}
crm_update_peer_proc(node, crm_proc_cib, status);
set_connected_peers(the_cib);
}
return;
}
#if SUPPORT_HEARTBEAT
extern oc_ev_t *cib_ev_token;
gboolean cib_ccm_dispatch(int fd, gpointer user_data)
{
int rc = 0;
oc_ev_t *ccm_token = (oc_ev_t*)user_data;
crm_debug_2("received callback");
rc = oc_ev_handle_event(ccm_token);
if(0 == rc) {
return TRUE;
}
crm_err("CCM connection appears to have failed: rc=%d.", rc);
/* eventually it might be nice to recover and reconnect... but until then... */
crm_err("Exiting to recover from CCM connection failure");
exit(2);
return FALSE;
}
int current_instance = 0;
void
cib_ccm_msg_callback(
oc_ed_t event, void *cookie, size_t size, const void *data)
{
gboolean update_id = FALSE;
const oc_ev_membership_t *membership = data;
CRM_ASSERT(membership != NULL);
crm_info("Processing CCM event=%s (id=%d)",
ccm_event_name(event), membership->m_instance);
if(current_instance > membership->m_instance) {
crm_err("Membership instance ID went backwards! %d->%d",
current_instance, membership->m_instance);
CRM_ASSERT(current_instance <= membership->m_instance);
}
switch(event) {
case OC_EV_MS_NEW_MEMBERSHIP:
case OC_EV_MS_INVALID:
update_id = TRUE;
break;
case OC_EV_MS_PRIMARY_RESTORED:
update_id = TRUE;
break;
case OC_EV_MS_NOT_PRIMARY:
crm_debug_2("Ignoring transitional CCM event: %s",
ccm_event_name(event));
break;
case OC_EV_MS_EVICTED:
crm_err("Evicted from CCM: %s", ccm_event_name(event));
break;
default:
crm_err("Unknown CCM event: %d", event);
}
if(update_id) {
unsigned int lpc = 0;
CRM_CHECK(membership != NULL, return);
current_instance = membership->m_instance;
for(lpc=0; lpc < membership->m_n_out; lpc++) {
crm_update_ccm_node(
membership, lpc+membership->m_out_idx, CRM_NODE_LOST);
}
for(lpc=0; lpc < membership->m_n_member; lpc++) {
crm_update_ccm_node(
membership, lpc+membership->m_memb_idx,CRM_NODE_ACTIVE);
}
}
oc_ev_callback_done(cookie);
set_connected_peers(the_cib);
return;
}
#endif
gboolean
can_write(int flags)
{
return TRUE;
}
static gboolean
cib_force_exit(gpointer data)
{
crm_notice("Forcing exit!");
terminate_cib(__FUNCTION__);
return FALSE;
}
void
initiate_exit(void)
{
int active = 0;
xmlNode *leaving = NULL;
active = crm_active_peers(crm_proc_cib);
if(active < 2) {
terminate_cib(__FUNCTION__);
return;
}
crm_info("Sending disconnect notification to %d peers...", active);
leaving = create_xml_node(NULL, "exit-notification");
crm_xml_add(leaving, F_TYPE, "cib");
crm_xml_add(leaving, F_CIB_OPERATION, "cib_shutdown_req");
send_cluster_message(NULL, crm_msg_cib, leaving, TRUE);
free_xml(leaving);
Gmain_timeout_add(crm_get_msec("5s"), cib_force_exit, NULL);
}
void
terminate_cib(const char *caller)
{
#if SUPPORT_AIS
if(is_openais_cluster()) {
cib_ha_connection_destroy(NULL);
return;
}
#endif
#if SUPPORT_HEARTBEAT
if(hb_conn != NULL) {
crm_info("%s: Disconnecting heartbeat", caller);
hb_conn->llc_ops->signoff(hb_conn, FALSE);
} else {
crm_err("%s: No heartbeat connection", caller);
}
#endif
uninitializeCib();
crm_info("Exiting...");
if (mainloop != NULL && g_main_is_running(mainloop)) {
g_main_quit(mainloop);
} else {
exit(LSB_EXIT_OK);
}
}

File Metadata

Mime Type
text/x-diff
Expires
Sat, Jan 25, 12:16 PM (19 h, 55 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1322524
Default Alt Text
(39 KB)

Event Timeline