diff --git a/crm/cib/Makefile.am b/crm/cib/Makefile.am index dd1ef02c4e..8b11137272 100644 --- a/crm/cib/Makefile.am +++ b/crm/cib/Makefile.am @@ -1,72 +1,77 @@ # # Copyright (C) 2004 Andrew Beekhof # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # MAINTAINERCLEANFILES = Makefile.in INCLUDES = -I$(top_builddir)/include -I$(top_srcdir)/include \ -I$(top_builddir)/libltdl -I$(top_srcdir)/libltdl \ -I$(top_builddir)/linux-ha -I$(top_srcdir)/linux-ha \ -I$(top_builddir) -I$(top_srcdir) hadir = $(sysconfdir)/ha.d halibdir = $(libdir)/@HB_PKG@ commmoddir = $(halibdir)/modules/comm havarlibdir = $(localstatedir)/lib/@HB_PKG@ PIDFILE = $(localstatedir)/run/crmd.pid XML_FLAGS = `xml2-config --cflags` XML_LIBS = `xml2-config --libs` # sockets with path crmdir = $(havarlibdir)/crm apigid = @HA_APIGID@ crmuid = @HA_CCMUID@ crmreqsocket = $(havarlibdir)/api/crm.req crmressocket = $(havarlibdir)/api/crm.rsp COMMONLIBS = $(CRM_DEBUG_LIBS) \ $(top_builddir)/lib/clplumbing/libplumb.la \ $(top_builddir)/$(CRM_DIR)/common/libcrmcommon.la \ $(top_builddir)/lib/apphb/libapphb.la \ $(top_builddir)/lib/hbclient/libhbclient.la \ $(top_builddir)/lib/crm/cib/libcib.la \ $(GLIBLIB) \ $(LIBRT) LIBRT = @LIBRT@ AM_CFLAGS = @CFLAGS@ -DPIDFILE='"$(PIDFILE)"' $(CRM_DEBUG_FLAGS) ## binary progs -halib_PROGRAMS = cib +halib_PROGRAMS = cib cibmon ## SOURCES #noinst_HEADERS = config.h control.h crmd.h noinst_HEADERS = cibio.h cibmessages.h cibprimatives.h notify.h \ callbacks.h cib_SOURCES = io.c primatives.c messages.c cib.c notify.c \ callbacks.c main.c cib_CFLAGS = $(XML_FLAGS) -DHA_VARLIBDIR='"@HA_VARLIBDIR@"' cib_LDFLAGS = $(XML_LIBS) cib_LDADD = $(COMMONLIBS) +cibmon_SOURCES = cibmon.c +cibmon_CFLAGS = $(XML_FLAGS) -DHA_VARLIBDIR='"@HA_VARLIBDIR@"' +cibmon_LDFLAGS = $(XML_LIBS) +cibmon_LDADD = $(COMMONLIBS) + clean-generic: rm -f *.log *.debug *.xml *~ install-exec-local: uninstall-local: diff --git a/crm/cib/callbacks.c b/crm/cib/callbacks.c index 90d29ded57..ada113310e 100644 --- a/crm/cib/callbacks.c +++ b/crm/cib/callbacks.c @@ -1,805 +1,787 @@ -/* $Id: callbacks.c,v 1.3 2004/12/09 14:46:21 andrew Exp $ */ +/* $Id: callbacks.c,v 1.4 2004/12/10 20:07:07 andrew Exp $ */ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include int next_client_id = 0; gboolean cib_is_master = FALSE; GHashTable *client_list = NULL; extern const char *cib_our_uname; extern ll_cluster_t *hb_conn; -extern FILE *msg_cib_strm; /* technically bump does modify the cib... * but we want to split the "bump" from the "sync" */ cib_operation_t cib_server_ops[] = { {NULL, FALSE, FALSE, FALSE, FALSE, cib_process_default}, {CRM_OP_NOOP, FALSE, FALSE, FALSE, FALSE, cib_process_default}, {CRM_OP_RETRIVE_CIB, FALSE, FALSE, FALSE, FALSE, cib_process_query}, {CRM_OP_CIB_SLAVE, FALSE, TRUE, FALSE, FALSE, cib_process_readwrite}, {CRM_OP_CIB_SLAVEALL,TRUE, TRUE, FALSE, FALSE, cib_process_readwrite}, {CRM_OP_CIB_MASTER, FALSE, TRUE, FALSE, FALSE, cib_process_readwrite}, {CRM_OP_CIB_ISMASTER,FALSE, TRUE, FALSE, FALSE, cib_process_readwrite}, {CRM_OP_CIB_BUMP, FALSE, TRUE, TRUE, FALSE, cib_process_bump}, {CRM_OP_CIB_REPLACE, TRUE, TRUE, TRUE, TRUE, cib_process_replace}, {CRM_OP_CIB_CREATE, TRUE, TRUE, TRUE, TRUE, cib_process_modify}, {CRM_OP_CIB_UPDATE, TRUE, TRUE, TRUE, TRUE, cib_process_modify}, {CRM_OP_JOINACK, TRUE, TRUE, TRUE, TRUE, cib_process_modify}, {CRM_OP_SHUTDOWN_REQ,TRUE, TRUE, TRUE, TRUE, cib_process_modify}, {CRM_OP_CIB_DELETE, TRUE, TRUE, TRUE, TRUE, cib_process_modify}, {CRM_OP_CIB_QUERY, FALSE, FALSE, TRUE, FALSE, cib_process_query}, {CRM_OP_QUIT, FALSE, TRUE, FALSE, FALSE, cib_process_quit}, {CRM_OP_PING, FALSE, FALSE, FALSE, FALSE, cib_process_ping}, {CRM_OP_CIB_ERASE, TRUE, TRUE, TRUE, FALSE, cib_process_erase} }; int send_via_callback_channel(struct ha_msg *msg, const char *token); enum cib_errors cib_process_command( const struct ha_msg *request, struct ha_msg **reply, gboolean privileged); gboolean cib_common_callback( IPC_Channel *channel, gpointer user_data, gboolean privileged); enum cib_errors cib_get_operation_id(const struct ha_msg* msg, int *operation); gboolean cib_process_disconnect(IPC_Channel *channel, cib_client_t *cib_client); gboolean cib_client_connect(IPC_Channel *channel, gpointer user_data) { gboolean auth_failed = FALSE; gboolean can_connect = TRUE; gboolean (*client_callback)(IPC_Channel *channel, gpointer user_data) = NULL; cib_client_t *new_client = NULL; crm_debug("Connecting channel"); if (channel == NULL) { crm_err("Channel was NULL"); can_connect = FALSE; } else if (channel->ch_status == IPC_DISCONNECT) { crm_err("Channel was disconnected"); can_connect = FALSE; } else if(user_data == NULL) { crm_err("user_data must contain channel name"); can_connect = FALSE; } else { crm_malloc(new_client, sizeof(crmd_client_t)); new_client->id = NULL; new_client->callback_id = NULL; new_client->source = NULL; new_client->channel = channel; new_client->channel_name = user_data; client_callback = NULL; /* choose callback and do auth based on channel_name */ if(safe_str_eq(new_client->channel_name, "cib_callback")) { client_callback = cib_null_callback; } else { uuid_t client_id; uuid_generate(client_id); crm_malloc(new_client->id, sizeof(char)*36); uuid_unparse(client_id, new_client->id); new_client->id[35] = EOS; uuid_generate(client_id); crm_malloc(new_client->callback_id, sizeof(char)*30); uuid_unparse(client_id, new_client->callback_id); new_client->callback_id[35] = EOS; client_callback = cib_ro_callback; if(safe_str_eq(new_client->channel_name, "cib_rw")) { client_callback = cib_rw_callback; } } } if(auth_failed) { crm_err("Connection to %s channel failed authentication", (char *)user_data); can_connect = FALSE; } if(can_connect == FALSE) { if(new_client) { crm_free(new_client->id); crm_free(new_client->callback_id); } crm_free(new_client); return FALSE; } channel->ops->set_recv_qlen(channel, 100); channel->ops->set_send_qlen(channel, 100); if(client_callback != NULL) { new_client->source = G_main_add_IPC_Channel( G_PRIORITY_LOW, channel, FALSE, client_callback, new_client, default_ipc_connection_destroy); } if(client_callback != cib_null_callback) { /* send msg to client with uuid to use when signing up for * callback channel */ struct ha_msg *reg_msg = ha_msg_new(3); ha_msg_add(reg_msg, F_CIB_OPERATION, CRM_OP_REGISTER); ha_msg_add(reg_msg, F_CIB_CLIENTID, new_client->id); ha_msg_add( reg_msg, F_CIB_CALLBACK_TOKEN, new_client->callback_id); msg2ipcchan(reg_msg, channel); ha_msg_del(reg_msg); /* make sure we can find ourselves later for sync calls * redirected to the master instance */ g_hash_table_insert(client_list, new_client->id, new_client); } crm_info("Channel %s connected for client %s", new_client->channel_name, new_client->id); return TRUE; } gboolean cib_rw_callback(IPC_Channel *channel, gpointer user_data) { return cib_common_callback(channel, user_data, TRUE); } gboolean cib_ro_callback(IPC_Channel *channel, gpointer user_data) { return cib_common_callback(channel, user_data, FALSE); } gboolean cib_null_callback(IPC_Channel *channel, gpointer user_data) { struct ha_msg *op_request = NULL; cib_client_t *cib_client = user_data; cib_client_t *hash_client = NULL; const char *type = NULL; const char *uuid_ticket = NULL; if(cib_client == NULL) { crm_err("Discarding IPC message from unknown source" " on callback channel."); return FALSE; } while(channel->ops->is_message_pending(channel)) { if (channel->ch_status == IPC_DISCONNECT) { /* The message which was pending for us is that * the IPC status is now IPC_DISCONNECT */ break; } op_request = msgfromIPC_noauth(channel); type = cl_get_string(op_request, F_CIB_OPERATION); if(safe_str_neq(type, CRM_OP_REGISTER) ) { crm_warn("Discarding IPC message from %s on callback channel", cib_client->id); ha_msg_del(op_request); continue; } uuid_ticket = cl_get_string(op_request, F_CIB_CALLBACK_TOKEN); hash_client = g_hash_table_lookup(client_list, uuid_ticket); if(hash_client != NULL) { crm_err("Duplicate registration request... disconnecting"); ha_msg_del(op_request); return FALSE; } cib_client->id = crm_strdup(uuid_ticket); g_hash_table_insert(client_list, cib_client->id, cib_client); crm_info("Registered %s on %s channel", cib_client->id, cib_client->channel_name); ha_msg_del(op_request); op_request = ha_msg_new(2); ha_msg_add(op_request, F_CIB_OPERATION, CRM_OP_REGISTER); ha_msg_add(op_request, F_CIB_CLIENTID, cib_client->id); msg2ipcchan(op_request, channel); ha_msg_del(op_request); } return cib_process_disconnect(channel, cib_client); } gboolean cib_common_callback( IPC_Channel *channel, gpointer user_data, gboolean privileged) { int rc = cib_ok; int lpc = 0; int call_type = 0; int call_options = 0; const char *op = NULL; const char *host = NULL; struct ha_msg *op_request = NULL; struct ha_msg *op_reply = NULL; cib_client_t *cib_client = user_data; if(cib_client == NULL) { crm_err("Receieved call from unknown source. Discarding."); return FALSE; } crm_verbose("Callback for %s on %s channel", cib_client->id, cib_client->channel_name); while(channel->ops->is_message_pending(channel)) { if (channel->ch_status == IPC_DISCONNECT) { /* The message which was pending for us is that * the IPC status is now IPC_DISCONNECT */ break; } op_request = msgfromIPC(channel); if (op_request == NULL) { perror("Receive failure:"); break; } crm_verbose("Processing IPC message from %s on %s channel", cib_client->id, cib_client->channel_name); cl_log_message(op_request); lpc++; if(ha_msg_add(op_request, F_CIB_CLIENTID, cib_client->id) != HA_OK) { crm_err("Couldnt add F_CIB_CLIENTID to message"); rc = cib_msg_field_add; } if(rc == cib_ok) { ha_msg_value_int( op_request, F_CIB_CALLOPTS, &call_options); crm_trace("Call options: %.8lx", (long)call_options); host = cl_get_string(op_request, F_CIB_HOST); crm_trace("Destination host: %s", host); op = cl_get_string(op_request, F_CIB_OPERATION); crm_trace("Retrieved command: %s", op); rc = cib_get_operation_id(op_request, &call_type); crm_trace("Command offset: %d", call_type); } if(rc == cib_ok && cib_server_ops[call_type].needs_privileges && privileged == FALSE) { rc = cib_not_authorized; } if(rc != cib_ok) { /* TODO: construct error reply */ crm_err("Pre-processing of command failed: %s", cib_error2string(rc)); } else if(host == NULL && cib_is_master && !(call_options & cib_scope_local)) { crm_info("Processing master %s op locally", op); rc = cib_process_command( op_request, &op_reply, privileged); } else if((host == NULL && (call_options & cib_scope_local)) || safe_str_eq(host, cib_our_uname)) { crm_info("Processing %s op locally", op); rc = cib_process_command( op_request, &op_reply, privileged); } else if(host != NULL) { crm_info("Forwarding %s op to %s", op, host); ha_msg_add(op_request, F_CIB_DELEGATED, cib_our_uname); hb_conn->llc_ops->send_ordered_nodemsg( hb_conn, op_request, host); ha_msg_del(op_request); continue; } else { /* send via HA to other nodes */ crm_info("Forwarding %s op to master instance", op); ha_msg_add(op_request, F_CIB_DELEGATED, cib_our_uname); hb_conn->llc_ops->sendclustermsg(hb_conn, op_request); ha_msg_del(op_request); continue; } if(call_options & cib_sync_call) { crm_info("Sending sync reply to %s op", op); if(msg2ipcchan(op_reply, channel) != HA_OK) { rc = cib_reply_failed; } } else { /* send reply via client's callback channel */ crm_info("Sending async reply to %s op", op); rc = send_via_callback_channel( op_reply, cib_client->callback_id); } if(rc == cib_ok && cib_server_ops[call_type].modifies_cib && !(call_options & cib_scope_local)) { /* send via HA to other nodes */ crm_info("Forwarding %s op to all instances", op); ha_msg_add(op_request, F_CIB_GLOBAL_UPDATE, "true"); hb_conn->llc_ops->sendclustermsg(hb_conn, op_request); } else { if(call_options & cib_scope_local ) { crm_debug("Request not broadcast : local scope"); } if(cib_server_ops[call_type].modifies_cib == FALSE) { crm_debug("Request not broadcast : R/O call"); } if(rc != cib_ok) { crm_debug("Request not broadcast : call failed : %s", cib_error2string(rc)); } } ha_msg_del(op_request); ha_msg_del(op_reply); } crm_verbose("Processed %d messages", lpc); return cib_process_disconnect(channel, cib_client); } enum cib_errors cib_process_command(const struct ha_msg *request, struct ha_msg **reply, gboolean privileged) { xmlNodePtr input = NULL; const char *input_s = NULL; char *output_s = NULL; xmlNodePtr output = NULL; int call_type = 0; int call_options = 0; enum cib_errors rc = cib_ok; const char *op = NULL; const char *call_id = NULL; const char *section = NULL; /* Start processing the request... */ op = cl_get_string(request, F_CIB_OPERATION); call_id = cl_get_string(request, F_CIB_CALLID); ha_msg_value_int(request, F_CIB_CALLOPTS, &call_options); crm_trace("Processing call id: %s", call_id); rc = cib_get_operation_id(request, &call_type); if(rc == cib_ok && cib_server_ops[call_type].needs_privileges && privileged == FALSE) { /* abort */ rc = cib_not_authorized; } if(rc == cib_ok && cib_server_ops[call_type].needs_section) { crm_trace("Unpacking section"); section = cl_get_string(request, F_CIB_SECTION); } if(rc == cib_ok && cib_server_ops[call_type].needs_data) { crm_trace("Unpacking data in %s", F_CIB_CALLDATA); input_s = cl_get_string(request, F_CIB_CALLDATA); if(input_s != NULL) { crm_trace("Converting to xmlNodePtr"); input = string2xml(input_s); if(input == NULL) { crm_err("Invalid XML input"); rc = CIBRES_CORRUPT; } } } if(rc == cib_ok) { -#ifdef MSG_LOG - if(msg_cib_strm == NULL) { - msg_cib_strm = fopen(DEVEL_DIR"/cib.log", "w"); - } - fprintf(msg_cib_strm, "\n====================\n"); - fprintf(msg_cib_strm, "[Input %s]\t%s\n", op, - dump_xml_formatted(input)); - fflush(msg_cib_strm); -#endif - rc = cib_server_ops[call_type].fn( op, call_options, section, input, &output); - -#ifdef MSG_LOG - fprintf(msg_cib_strm, "[Reply (%s:%s)]\t%s\n", - op, cib_error2string(rc), - dump_xml_formatted(output)); - fflush(msg_cib_strm); -#endif } if(call_options & cib_discard_reply || reply == NULL) { if(reply) *reply = NULL; return rc; } /* make the basic reply */ *reply = ha_msg_new(8); ha_msg_add(*reply, F_TYPE, "cib"); ha_msg_add(*reply, F_CIB_OPERATION, op); ha_msg_add(*reply, F_CIB_CALLID, call_id); { char *tmp = crm_itoa(rc); ha_msg_add(*reply, F_CIB_RC, tmp); crm_free(tmp); } { const char *tmp = cl_get_string(request, F_CIB_CLIENTID); ha_msg_add(*reply, F_CIB_CLIENTID, tmp); tmp = cl_get_string(request, F_CIB_CALLOPTS); ha_msg_add(*reply, F_CIB_CALLOPTS, tmp); tmp = cl_get_string(request, F_CIB_CALLID); ha_msg_add(*reply, F_CIB_CALLID, tmp); } /* attach the output if necessary */ output_s = dump_xml_unformatted(output); if(output != NULL && output_s == NULL) { crm_err("Currupt output in reply to \"%s\" op",op); rc = cib_output_data; } else if(output_s != NULL && ha_msg_add(*reply, F_CIB_CALLDATA, output_s) != HA_OK) { rc = cib_msg_field_add; } crm_free(output_s); free_xml(output); free_xml(input); return rc; } int send_via_callback_channel(struct ha_msg *msg, const char *token) { cib_client_t *hash_client = NULL; if(msg == NULL) { crm_err("No message to send"); return cib_reply_failed; } else if(token == NULL) { crm_err("No client id token, cant send message"); return cib_missing; } hash_client = g_hash_table_lookup(client_list, token); if(hash_client == NULL) { crm_err("Cannot find client for token %s", token); return cib_client_gone; } else if(hash_client->channel == NULL) { crm_err("Cannot find channel for client %s", token); return cib_client_corrupt; } crm_debug("Delivering reply to client %s", token); if(msg2ipcchan(msg, hash_client->channel) != HA_OK) { crm_err("Delivery of reply to client %s failed", token); return cib_reply_failed; } return cib_ok; } gboolean cib_process_disconnect(IPC_Channel *channel, cib_client_t *cib_client) { if (channel->ch_status == IPC_DISCONNECT && cib_client != NULL) { crm_info("Cleaning up after %s channel disconnect from client %s", cib_client->channel_name, cib_client->id); g_hash_table_remove(client_list, cib_client->id); if(cib_client->source != NULL) { G_main_del_IPC_Channel(cib_client->source); cib_client->source = NULL; } /* crm_free(cib_client->callback_id); */ /* crm_free(cib_client->id); */ crm_free(cib_client); return FALSE; } else if (channel->ch_status == IPC_DISCONNECT) { crm_warn("Unknown client disconnected"); return FALSE; } return TRUE; } gboolean cib_ha_dispatch(IPC_Channel *channel, gpointer user_data) { int lpc = 0; ll_cluster_t *hb_cluster = (ll_cluster_t*)user_data; while(hb_cluster->llc_ops->msgready(hb_cluster)) { lpc++; /* invoke the callbacks but dont block */ hb_cluster->llc_ops->rcvmsg(hb_cluster, 0); } crm_trace("%d HA messages dispatched", lpc); if (channel && (channel->ch_status == IPC_DISCONNECT)) { crm_crit("Lost connection to heartbeat service... exiting"); exit(100); return FALSE; } return TRUE; } void cib_peer_callback(const struct ha_msg* msg, void* private_data) { int is_done = 1; int call_type = 0; int call_options = 0; gboolean process = TRUE; gboolean needs_reply = TRUE; gboolean local_notify = FALSE; enum cib_errors rc = cib_ok; struct ha_msg *op_reply = NULL; const char *originator = cl_get_string(msg, F_ORIG); const char *request_to = cl_get_string(msg, F_CIB_HOST); const char *reply_to = cl_get_string(msg, F_CIB_ISREPLY); const char *update = cl_get_string(msg, F_CIB_GLOBAL_UPDATE); const char *delegated = cl_get_string(msg, F_CIB_DELEGATED); const char *client_id = NULL; if(safe_str_eq(originator, cib_our_uname)) { crm_debug("Discarding message from ourselves"); return; } if(cib_get_operation_id(msg, &call_type) != cib_ok) { crm_err("Invalid operation... discarding msg"); return; } if(request_to != NULL && strlen(request_to) == 0) { request_to = NULL; } if(cib_server_ops[call_type].modifies_cib || (reply_to == NULL && cib_is_master) || request_to != NULL) { is_done = 0; } crm_info("Processing message from peer to %s...", request_to); cl_log_message(msg); if(safe_str_eq(update, "true") && safe_str_eq(reply_to, cib_our_uname)) { crm_debug("Processing global update that originated from us"); needs_reply = FALSE; local_notify = TRUE; } else if(safe_str_eq(update, "true")) { crm_debug("Processing global update"); needs_reply = FALSE; } else if(request_to != NULL && safe_str_eq(request_to, cib_our_uname)) { crm_debug("Processing request sent to us"); } else if(delegated != NULL && cib_is_master == TRUE) { crm_debug("Processing request sent to master instance"); } else if(reply_to != NULL && safe_str_eq(reply_to, cib_our_uname)) { crm_debug("Forward reply sent from %s to local clients", originator); process = FALSE; needs_reply = FALSE; local_notify = TRUE; } else if(delegated != NULL) { crm_debug("Ignoring msg for master instance"); return; } else if(request_to != NULL) { /* this is for a specific instance and we're not it */ crm_debug("Ignoring msg for instance on %s", request_to); return; } else if(reply_to == NULL && cib_is_master == FALSE) { /* this is for the master instance and we're not it */ crm_debug("Ignoring reply to %s", reply_to); return; } else { crm_warn("Nothing for us to do?"); return; } ha_msg_value_int(msg, F_CIB_CALLOPTS, &call_options); crm_trace("Retrieved call options: %d", call_options); if(process) { crm_debug("Performing local processing"); rc = cib_process_command(msg, &op_reply, TRUE); } if(local_notify) { /* send callback to originating child */ cib_client_t *client_obj = NULL; crm_trace("find the client"); if(process == FALSE) { op_reply = ha_msg_copy(msg); } client_id = cl_get_string(msg, F_CIB_CLIENTID); if(client_id != NULL) { client_obj = g_hash_table_lookup( client_list, client_id); } else { crm_err("No client to sent the response to." " F_CIB_CLIENTID not set."); } crm_debug("Sending callback to originator of delegated request"); if(client_obj != NULL) { if(is_done == 0) { crm_debug("Sending local modify response"); } else { crm_debug("Sending master response"); } if(call_options & cib_sync_call) { crm_debug("Sending sync response: %d", call_options); msg2ipcchan(op_reply, client_obj->channel); } else { crm_debug("Sending async response"); send_via_callback_channel( op_reply, client_obj->callback_id); } } else { crm_warn("Client %s may have left us", client_id); } if(process == FALSE) { ha_msg_del(op_reply); } } if(needs_reply == FALSE) { /* nothing more to do... * this was a non-originating slave update */ crm_debug("Completed slave update"); return; } crm_trace("add the originator to message"); ha_msg_add(op_reply, F_CIB_ISREPLY, originator); /* from now on we are the server */ if(rc == cib_ok && cib_server_ops[call_type].modifies_cib && !(call_options & cib_scope_local)) { /* this (successful) call modified the CIB _and_ the * change needs to be broadcast... * send via HA to other nodes */ crm_debug("Sending update request to everyone"); hb_conn->llc_ops->sendclustermsg(hb_conn, op_reply); } else { /* send reply via HA to originating node */ crm_debug("Sending request result to originator only"); hb_conn->llc_ops->send_ordered_nodemsg( hb_conn, op_reply, originator); } return; } enum cib_errors cib_get_operation_id(const struct ha_msg* msg, int *operation) { int lpc = 0; int max_msg_types = DIMOF(cib_server_ops); const char *op = cl_get_string(msg, F_CIB_OPERATION); for (lpc = 0; lpc < max_msg_types; lpc++) { if (safe_str_eq(op, cib_server_ops[lpc].operation)) { *operation = lpc; return cib_ok; } } crm_err("Operation %s is not valid", op); *operation = -1; return cib_operation; } diff --git a/crm/cib/cibmon.c b/crm/cib/cibmon.c new file mode 100644 index 0000000000..36b43a8202 --- /dev/null +++ b/crm/cib/cibmon.c @@ -0,0 +1,449 @@ +/* $Id: cibmon.c,v 1.1 2004/12/10 20:07:07 andrew Exp $ */ + +/* + * Copyright (C) 2004 Andrew Beekhof + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include + +#include + +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include + +#include +#include /* someone complaining about _ha_msg_mod not being found */ +#include + +FILE *msg_cib_strm = NULL; + +int exit_code = cib_ok; + +GMainLoop *mainloop = NULL; +const char *crm_system_name = "cibmon"; +void usage(const char *cmd, int exit_status); +void cib_connection_destroy(gpointer user_data); + +void cibmon_pre_notify(const char *event, struct ha_msg *msg); +void cibmon_post_notify(const char *event, struct ha_msg *msg); +void cibmon_update_confirm(const char *event, struct ha_msg *msg); + +cib_t *the_cib = NULL; + +#define OPTARGS "V?pPUam:i" + +gboolean intermediate_changes = FALSE; +gboolean pre_notify = FALSE; +gboolean post_notify = FALSE; +gboolean update_notify = FALSE; +int max_failures = 30; + +int +main(int argc, char **argv) +{ + int option_index = 0; + int argerr = 0; + int flag; + int level = 0; + int attempts = 0; + + static struct option long_options[] = { + /* Top-level Options */ + {"verbose", 0, 0, 'V'}, + {"help", 0, 0, '?'}, + {"pre", 0, 0, 'p'}, + {"post", 0, 0, 'P'}, + {"update", 0, 0, 'U'}, + {"all", 0, 0, 'a'}, + {"intermediate", 0, 0, 'i'}, + {"max-conn-fail",1, 0, 'm'}, + {0, 0, 0, 0} + }; + + if(argc < 2) { + usage(crm_system_name, LSB_EXIT_EINVAL); + } + + /* Redirect messages from glib functions to our handler */ + g_log_set_handler(NULL, + G_LOG_LEVEL_ERROR | G_LOG_LEVEL_CRITICAL + | G_LOG_LEVEL_WARNING | G_LOG_LEVEL_MESSAGE + | G_LOG_LEVEL_INFO | G_LOG_LEVEL_DEBUG + | G_LOG_FLAG_RECURSION | G_LOG_FLAG_FATAL, + cl_glib_msg_handler, NULL); + /* and for good measure... */ + g_log_set_always_fatal((GLogLevelFlags)0); + + cl_log_set_entity(crm_system_name); + cl_log_set_facility(LOG_LOCAL7); + + /* docs say only do this once, but in their code they do it every time! */ + xmlInitParser(); + msg_cib_strm = fopen(DEVEL_DIR"/cibmon.log", "w"); + + while (1) { + flag = getopt_long(argc, argv, OPTARGS, + long_options, &option_index); + if (flag == -1) + break; + + switch(flag) { + case 0: + printf("option %s", + long_options[option_index].name); + if (optarg) + printf(" with arg %s", optarg); + printf("\n"); + printf("Long option (--%s) is not (yet?) properly supported\n", + long_options[option_index].name); + ++argerr; + case 'V': + level = get_crm_log_level(); + cl_log_enable_stderr(TRUE); + set_crm_log_level(level+1); + break; + case '?': + usage(crm_system_name, LSB_EXIT_OK); + break; + case 'm': + max_failures = crm_atoi(optarg, "30"); + break; + case 'a': + pre_notify = TRUE; + post_notify = TRUE; + update_notify = TRUE; + break; + case 'p': + pre_notify = TRUE; + break; + case 'P': + post_notify = TRUE; + break; + case 'U': + update_notify = TRUE; + break; + case 'i': + intermediate_changes = TRUE; + break; + default: + printf("Argument code 0%o (%c)" + " is not (?yet?) supported\n", + flag, flag); + ++argerr; + break; + } + } + + if (optind < argc) { + printf("non-option ARGV-elements: "); + while (optind < argc) + printf("%s ", argv[optind++]); + printf("\n"); + } + + if (optind > argc) { + ++argerr; + } + + if (argerr) { + usage(crm_system_name, LSB_EXIT_GENERIC); + } + + the_cib = cib_new(); + + do { + if(attempts != 0) { + sleep(1); + } + exit_code = the_cib->cmds->signon(the_cib, cib_query); + + } while(exit_code == cib_connection && attempts++ < max_failures); + + if(exit_code != cib_ok) { + crm_err("Signon to CIB failed: %s", + cib_error2string(exit_code)); + fprintf(stderr, "Signon to CIB failed: %s\n", + cib_error2string(exit_code)); + } + + if(exit_code == cib_ok && pre_notify) { + exit_code = the_cib->cmds->add_notify_callback( + the_cib, T_CIB_PRE_NOTIFY, cibmon_pre_notify); + + if(exit_code != cib_ok) { + crm_err("Failed to set %s callback: %s", + T_CIB_PRE_NOTIFY, cib_error2string(exit_code)); + fprintf(stderr, "Failed to set %s callback: %s", + T_CIB_PRE_NOTIFY, cib_error2string(exit_code)); + } + } + if(exit_code == cib_ok && post_notify) { + exit_code = the_cib->cmds->add_notify_callback( + the_cib, T_CIB_POST_NOTIFY, cibmon_post_notify); + + if(exit_code != cib_ok) { + crm_err("Failed to set %s callback: %s", + T_CIB_POST_NOTIFY, cib_error2string(exit_code)); + fprintf(stderr, "Failed to set %s callback: %s", + T_CIB_POST_NOTIFY, cib_error2string(exit_code)); + } + } + if(exit_code == cib_ok && update_notify) { + exit_code = the_cib->cmds->add_notify_callback( + the_cib, T_CIB_UPDATE_CONFIRM, cibmon_update_confirm); + + if(exit_code != cib_ok) { + crm_err("Failed to set %s callback: %s", + T_CIB_UPDATE_CONFIRM, cib_error2string(exit_code)); + fprintf(stderr, "Failed to set %s callback: %s", + T_CIB_UPDATE_CONFIRM, cib_error2string(exit_code)); + } + } + + if(exit_code != cib_ok) { + crm_err("Setup failed, could not monitor CIB actions"); + fprintf(stderr, "Setup failed, could not monitor CIB actions\n"); + fflush(stderr); + fflush(stdout); + return -exit_code; + } + + mainloop = g_main_new(FALSE); + crm_info("Starting mainloop"); + g_main_run(mainloop); + crm_debug("%s exiting normally", crm_system_name); + fflush(stderr); + return -exit_code; +} + + +void +usage(const char *cmd, int exit_status) +{ + FILE *stream; + + stream = exit_status != 0 ? stderr : stdout; +#if 0 + fprintf(stream, "usage: %s [-?Vio] command\n" + "\twhere necessary, XML data will be expected using -X" + " or on STDIN if -X isnt specified\n", cmd); + + fprintf(stream, "Options\n"); + fprintf(stream, "\t--%s (-%c) \tid of the object being operated on\n", + XML_ATTR_ID, 'i'); + fprintf(stream, "\t--%s (-%c) \tobject type being operated on\n", + "obj_type", 'o'); + fprintf(stream, "\t--%s (-%c)\tturn on debug info." + " additional instance increase verbosity\n", "verbose", 'V'); + fprintf(stream, "\t--%s (-%c)\tthis help message\n", "help", '?'); + fprintf(stream, "\nCommands\n"); + fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_ERASE, 'E'); + fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_QUERY, 'Q'); + fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_CREATE, 'C'); + fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_REPLACE,'R'); + fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_UPDATE, 'U'); + fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_DELETE, 'D'); + fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_BUMP, 'B'); + fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_ISMASTER,'M'); + fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_SYNC, 'S'); + fprintf(stream, "\nXML data\n"); + fprintf(stream, "\t--%s (-%c) \t\n", "xml", 'X'); + fprintf(stream, "\nAdvanced Options\n"); + fprintf(stream, "\t--%s (-%c)\tsend command to specified host." + " Applies to %s and %s commands only\n", "host", 'h', + CRM_OP_CIB_QUERY, CRM_OP_CIB_SYNC); + fprintf(stream, "\t--%s (-%c)\tcommand only takes effect locally" + " on the specified host\n", "local", 'l'); + fprintf(stream, "\t--%s (-%c)\twait for call to complete before" + " returning\n", "sync-call", 's'); +#endif + fflush(stream); + + exit(exit_status); +} + +void +cib_connection_destroy(gpointer user_data) +{ + crm_err("Connection to the CIB terminated... exiting"); + g_main_quit(mainloop); + return; +} + +int update_depth = 0; +gboolean last_notify_pre = TRUE; + +void +cibmon_pre_notify(const char *event, struct ha_msg *msg) +{ + int rc = -1; + const char *op = cl_get_string(msg, F_CIB_OPERATION); + const char *id = cl_get_string(msg, F_CIB_OBJID); + const char *type = cl_get_string(msg, F_CIB_OBJTYPE); + const char *update_s = cl_get_string(msg, F_CIB_UPDATE); + const char *pre_update_s = cl_get_string(msg, F_CIB_EXISTING); + + xmlNodePtr update = string2xml(update_s); + xmlNodePtr pre_update = string2xml(pre_update_s); + char *xml_text = dump_xml_formatted(update); + + ha_msg_value_int(msg, F_CIB_RC, &rc); + + update_depth++; + last_notify_pre = TRUE; + + if(update_depth > 1 && intermediate_changes == FALSE) { + crm_trace("Ignoring intermediate update"); + return; + } + + if(update == NULL) { + crm_verbose("Performing operation %s (on section=%s)", op, type); + fprintf(msg_cib_strm, "[%s] Performing %s (to %s)\n", + event, op, crm_str(type)); + + } else { + crm_verbose("Performing %s on <%s%s%s>", + op, type, id?" id=":"", id?id:""); + fprintf(msg_cib_strm, "[%s] Performing %s to <%s%s%s>." + " Update follows\n%s\n", + event, op, crm_str(type), id?" id=":"", id?id:"", + xml_text); + } + crm_free(xml_text); + + xml_text = dump_xml_formatted(pre_update); + fprintf(msg_cib_strm, "[%s] Existing object\n%s\n", + event, xml_text); + + fflush(msg_cib_strm); + crm_free(xml_text); +} + + +void +cibmon_post_notify(const char *event, struct ha_msg *msg) +{ + int rc = -1; + const char *op = cl_get_string(msg, F_CIB_OPERATION); + const char *id = cl_get_string(msg, F_CIB_OBJID); + const char *type = cl_get_string(msg, F_CIB_OBJTYPE); + const char *update_s = cl_get_string(msg, F_CIB_UPDATE); + const char *output_s = cl_get_string(msg, F_CIB_UPDATE_RESULT); + + xmlNodePtr output = string2xml(output_s); + xmlNodePtr update = string2xml(update_s); + + char *xml_text = dump_xml_formatted(output); + + update_depth--; + if(last_notify_pre == FALSE + && update_depth > 0 + && intermediate_changes == FALSE) { + crm_trace("Ignoring intermediate update"); + return; + } + + last_notify_pre = FALSE; + ha_msg_value_int(msg, F_CIB_RC, &rc); + + if(update == NULL) { + if(rc == cib_ok) { + crm_verbose("Operation %s (to section=%s) completed", + op, crm_str(type)); + fprintf(msg_cib_strm, "[%s] %s (to %s) completed\n", + event, op, crm_str(type)); + + } else { + crm_warn("Operation %s (to section=%s) FAILED: (%d) %s", + op, crm_str(type), rc, cib_error2string(rc)); + fprintf(msg_cib_strm, "[%s] %s (to %s) FAILED: (%d) %s\n", + event, op, crm_str(type), rc, cib_error2string(rc)); + } + + } else { + if(rc == cib_ok) { + crm_verbose("Completed %s of <%s%s%s>", + op, type, id?" id=":"", id); + fprintf(msg_cib_strm, "[%s] Operation %s to <%s%s%s> completed.\n", + event, op, crm_str(type), id?" id=":"", id?id:""); + + } else { + crm_warn("%s of <%s%s%s> FAILED: (%d) %s", op, type, + id?" id=":"", id?id:"", rc, cib_error2string(rc)); + fprintf(msg_cib_strm, "[%s] Operation %s to <%s %s%s> FAILED: (%d) %s\n", + event, op, crm_str(type), id?" id=":"", id?id:"", + rc, cib_error2string(rc)); + } + } + fprintf(msg_cib_strm, "[%s] Operation %s result:\n%s\n", + event, op, xml_text); + crm_free(xml_text); + + fflush(msg_cib_strm); +} + +void +cibmon_update_confirm(const char *event, struct ha_msg *msg) +{ + int rc = -1; + const char *op = cl_get_string(msg, F_CIB_OPERATION); + const char *id = cl_get_string(msg, F_CIB_OBJID); + const char *type = cl_get_string(msg, F_CIB_OBJTYPE); + + ha_msg_value_int(msg, F_CIB_RC, &rc); + + if(id == NULL) { + if(rc == cib_ok) { + fprintf(msg_cib_strm, "[%s] %s (to section=%s) confirmed.\n", + event, op, crm_str(type)); + } else { + fprintf(msg_cib_strm, "[%s] %s (to section=%s) ABORTED: (%d) %s\n", + event, op, crm_str(type), + rc, cib_error2string(rc)); + } + + } else { + if(rc == cib_ok) { + fprintf(msg_cib_strm, "[%s] %s (to <%s%s%s>) confirmed\n", + event, op, crm_str(type), id?" id=":"", id?id:""); + } else { + fprintf(msg_cib_strm, "[%s] %s (to <%s%s%s>) ABORTED: (%d) %s\n", + event, op, crm_str(type), id?" id=":"", id?id:"", + rc, cib_error2string(rc)); + } + } + fprintf(msg_cib_strm, "\n=================================\n\n"); + + fflush(msg_cib_strm); +} diff --git a/crm/cib/messages.c b/crm/cib/messages.c index 5b645b3bbb..7f8dfd2bef 100644 --- a/crm/cib/messages.c +++ b/crm/cib/messages.c @@ -1,630 +1,627 @@ -/* $Id: messages.c,v 1.7 2004/12/09 14:47:21 andrew Exp $ */ +/* $Id: messages.c,v 1.8 2004/12/10 20:07:07 andrew Exp $ */ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include extern const char *cib_our_uname; enum cib_errors updateList( xmlNodePtr local_cib, xmlNodePtr update_command, xmlNodePtr failed, int operation, const char *section); xmlNodePtr createCibFragmentAnswer(const char *section, xmlNodePtr failed); gboolean replace_section( const char *section, xmlNodePtr tmpCib, xmlNodePtr command); gboolean check_generation(xmlNodePtr newCib, xmlNodePtr oldCib); gboolean update_results( xmlNodePtr failed, xmlNodePtr target, int operation, int return_code); enum cib_errors cib_update_counter( xmlNodePtr xml_obj, const char *field, gboolean reset); enum cib_errors cib_process_default( const char *op, int options, const char *section, xmlNodePtr input, xmlNodePtr *answer) { enum cib_errors result = cib_ok; crm_debug("Processing \"%s\" event", op); if(answer != NULL) *answer = NULL; if(op == NULL) { result = cib_operation; crm_err("No operation specified\n"); } else if(strcmp(CRM_OP_NOOP, op) == 0) { ; } else { result = cib_NOTSUPPORTED; crm_err("Action [%s] is not supported by the CIB", op); } return result; } enum cib_errors cib_process_quit( const char *op, int options, const char *section, xmlNodePtr input, xmlNodePtr *answer) { enum cib_errors result = cib_ok; crm_debug("Processing \"%s\" event", op); - cib_pre_notify(op, NULL, NULL, get_the_CIB()); + cib_pre_notify(op, get_the_CIB(), NULL); crm_warn("The CRMd has asked us to exit... complying"); exit(0); return result; } enum cib_errors cib_process_readwrite( const char *op, int options, const char *section, xmlNodePtr input, xmlNodePtr *answer) { enum cib_errors result = cib_ok; crm_debug("Processing \"%s\" event", op); if(safe_str_eq(op, CRM_OP_CIB_ISMASTER)) { if(cib_is_master == TRUE) { result = cib_ok; } else { result = cib_not_master; } return result; } - cib_pre_notify(op, NULL, NULL, NULL); + cib_pre_notify(op, get_the_CIB(), NULL); if(safe_str_eq(op, CRM_OP_CIB_MASTER)) { crm_info("We are now in R/W mode"); cib_is_master = TRUE; } else { crm_info("We are now in R/O mode"); cib_is_master = FALSE; } - cib_post_notify(op, NULL, NULL, NULL, result, NULL); + cib_post_notify(op, NULL, result, NULL); return result; } enum cib_errors cib_process_ping( const char *op, int options, const char *section, xmlNodePtr input, xmlNodePtr *answer) { enum cib_errors result = cib_ok; crm_debug("Processing \"%s\" event", op); if(answer != NULL) *answer = NULL; *answer = createPingAnswerFragment(CRM_SYSTEM_CIB, "ok"); return result; } enum cib_errors cib_process_query( const char *op, int options, const char *section, xmlNodePtr input, xmlNodePtr *answer) { xmlNodePtr cib = NULL; xmlNodePtr obj_root = NULL; enum cib_errors result = cib_ok; if(answer != NULL) *answer = NULL; crm_debug("Processing \"%s\" event", op); crm_verbose("Handling a query for section=%s of the cib", section); *answer = create_xml_node(NULL, XML_TAG_FRAGMENT); set_xml_property_copy(*answer, XML_ATTR_SECTION, section); set_xml_property_copy(*answer, "generated_on", cib_our_uname); if (safe_str_eq("all", section)) { section = NULL; } obj_root = get_object_root(section, get_the_CIB()); if(obj_root == NULL) { result = cib_NOTEXISTS; } else if(obj_root == get_the_CIB()) { add_node_copy(*answer, obj_root); } else { cib = create_xml_node(*answer, XML_TAG_CIB); add_node_copy(cib, obj_root); copy_in_properties(cib, get_the_CIB()); } return result; } enum cib_errors cib_process_erase( const char *op, int options, const char *section, xmlNodePtr input, xmlNodePtr *answer) { xmlNodePtr tmpCib = NULL; enum cib_errors result = cib_ok; crm_debug("Processing \"%s\" event", op); if(answer != NULL) *answer = NULL; tmpCib = createEmptyCib(); copy_in_properties(tmpCib, get_the_CIB()); - cib_pre_notify(op, tmpCib->name, NULL, tmpCib); + cib_pre_notify(op, get_the_CIB(), tmpCib); cib_update_counter(tmpCib, XML_ATTR_NUMUPDATES, TRUE); if(activateCibXml(tmpCib, CIB_FILENAME) < 0) { result = cib_ACTIVATION; } - cib_post_notify(op, tmpCib->name, NULL, NULL, result, get_the_CIB()); + cib_post_notify(op, NULL, result, get_the_CIB()); *answer = createCibFragmentAnswer(NULL, NULL); return result; } enum cib_errors cib_process_bump( const char *op, int options, const char *section, xmlNodePtr input, xmlNodePtr *answer) { xmlNodePtr tmpCib = NULL; enum cib_errors result = cib_ok; crm_debug("Processing \"%s\" event", op); if(answer != NULL) *answer = NULL; tmpCib = copy_xml_node_recursive(the_cib); - cib_pre_notify(op, tmpCib->name, NULL, NULL); + cib_pre_notify(op, get_the_CIB(), NULL); crm_verbose("Handling a %s for section=%s of the cib", CRM_OP_CIB_BUMP, section); cib_update_counter(tmpCib, XML_ATTR_GENERATION, FALSE); cib_update_counter(tmpCib, XML_ATTR_NUMUPDATES, TRUE); if(activateCibXml(tmpCib, CIB_FILENAME) < 0) { result = cib_ACTIVATION; } - cib_post_notify(op, tmpCib->name, NULL, NULL, result, get_the_CIB()); + cib_post_notify(op, NULL, result, get_the_CIB()); *answer = createCibFragmentAnswer(NULL, NULL); return result; } enum cib_errors cib_update_counter(xmlNodePtr xml_obj, const char *field, gboolean reset) { char *new_value = NULL; char *old_value = NULL; int int_value = -1; /* modify the timestamp */ set_node_tstamp(xml_obj); if(reset == FALSE) { old_value = xmlGetProp(xml_obj, field); } if(old_value != NULL) { crm_malloc(new_value, 128*(sizeof(char))); int_value = atoi(old_value); sprintf(new_value, "%d", ++int_value); } else { new_value = crm_strdup("1"); } crm_trace("%s %d(%s)->%s", field, int_value, crm_str(old_value), crm_str(new_value)); set_xml_property_copy(xml_obj, field, new_value); crm_free(new_value); return cib_ok; } enum cib_errors cib_process_replace( const char *op, int options, const char *section, xmlNodePtr input, xmlNodePtr *answer) { gboolean verbose = FALSE; xmlNodePtr tmpCib = NULL; xmlNodePtr cib_update = NULL; xmlNodePtr the_update = NULL; const char *section_name = section; enum cib_errors result = cib_ok; crm_debug("Processing \"%s\" event", op); if(answer != NULL) *answer = NULL; if (options & cib_verbose) { verbose = TRUE; } if(safe_str_eq("all", section)) { section = NULL; } cib_update = find_xml_node(input, XML_TAG_CIB); if (cib_update == NULL) { result = cib_NOOBJECT; } else if (section == NULL) { tmpCib = copy_xml_node_recursive(cib_update); the_update = cib_update; section_name = tmpCib->name; } else { tmpCib = copy_xml_node_recursive(get_the_CIB()); replace_section(section, tmpCib, input); the_update = get_object_root(section, cib_update); } - cib_pre_notify(op, section_name, NULL, the_update); + cib_pre_notify(op, get_object_root(section, get_the_CIB()), the_update); cib_update_counter(tmpCib, XML_ATTR_NUMUPDATES, FALSE); if (result == cib_ok && activateCibXml(tmpCib, CIB_FILENAME) < 0) { crm_warn("Replacment of section=%s failed", section); result = cib_ACTIVATION; } if (verbose || result != cib_ok) { *answer = createCibFragmentAnswer(section, NULL); } - cib_post_notify(op, section_name, NULL, the_update, - result, get_object_root(section, get_the_CIB())); + cib_post_notify(op, the_update, result, + get_object_root(section, get_the_CIB())); return result; } enum cib_errors cib_process_modify( const char *op, int options, const char *section, xmlNodePtr input, xmlNodePtr *answer) { gboolean verbose = FALSE; enum cib_errors result = cib_ok; const char *section_name = section; xmlNodePtr failed = NULL; xmlNodePtr cib_update = NULL; xmlNodePtr the_update = NULL; int cib_update_op = CIB_OP_NONE; xmlNodePtr tmpCib = NULL; char *xml_text = NULL; crm_debug("Processing \"%s\" event", op); failed = create_xml_node(NULL, XML_TAG_FAILED); if (strcmp(CRM_OP_CIB_CREATE, op) == 0) { cib_update_op = CIB_OP_ADD; } else if (strcmp(CRM_OP_CIB_UPDATE, op) == 0 || strcmp(CRM_OP_JOINACK, op) == 0 || strcmp(CRM_OP_SHUTDOWN_REQ, op) == 0) { cib_update_op = CIB_OP_MODIFY; } else if (strcmp(CRM_OP_CIB_DELETE, op) == 0) { cib_update_op = CIB_OP_DELETE; } else { crm_err("Incorrect request handler invoked for \"%s\" op", crm_str(op)); return cib_operation; } result = cib_ok; if (options & cib_verbose) { verbose = TRUE; } if(safe_str_eq("all", section)) { section = NULL; } if(input == NULL) { crm_err("Cannot perform modification with no data"); return cib_NOOBJECT; } tmpCib = copy_xml_node_recursive(get_the_CIB()); cib_update = find_xml_node(input, XML_TAG_CIB); /* should we be doing this? */ /* do logging */ + cib_pre_notify(op, get_object_root(section, get_the_CIB()), the_update); + /* make changes to a temp copy then activate */ if(section == NULL) { /* order is no longer important here */ section_name = tmpCib->name; the_update = cib_update; - cib_pre_notify(op, tmpCib->name, NULL, the_update); result = updateList(tmpCib, input, failed, cib_update_op, XML_CIB_TAG_NODES); if(result == cib_ok) { result = updateList( tmpCib, input, failed, cib_update_op, XML_CIB_TAG_RESOURCES); } if(result == cib_ok) { result = updateList( tmpCib, input, failed, cib_update_op, XML_CIB_TAG_CONSTRAINTS); } if(result == cib_ok) { result = updateList(tmpCib, input, failed, cib_update_op, XML_CIB_TAG_STATUS); } } else { the_update = get_object_root(section, cib_update); - - cib_pre_notify(op, section, NULL, the_update); result = updateList(tmpCib, input, failed, cib_update_op, section); } crm_trace("Activating temporary CIB"); cib_update_counter(tmpCib, XML_ATTR_NUMUPDATES, FALSE); if (result == cib_ok && activateCibXml(tmpCib, CIB_FILENAME) < 0) { result = cib_ACTIVATION; } else if (result != cib_ok || failed->children != NULL) { if(result == cib_ok) { result = cib_unknown; } crm_xml_info(failed, "CIB Update failures"); xml_text = dump_xml_formatted(failed); - fprintf(msg_cib_strm, "[CIB %s failures]\t%s\n", - op, xml_text); crm_free(xml_text); } if (verbose || failed->children != NULL || result != cib_ok) { *answer = createCibFragmentAnswer(section, failed); } - cib_post_notify(op, section_name, NULL, the_update, - result, get_object_root(section, get_the_CIB())); + cib_post_notify(op, the_update, result, + get_object_root(section, get_the_CIB())); free_xml(failed); return result; } gboolean replace_section(const char *section, xmlNodePtr tmpCib, xmlNodePtr fragment) { xmlNodePtr parent = NULL, cib_updates = NULL, new_section = NULL, old_section = NULL; cib_updates = find_xml_node(fragment, XML_TAG_CIB); /* find the old and new versions of the section */ new_section = get_object_root(section, cib_updates); old_section = get_object_root(section, tmpCib); if(old_section == NULL) { crm_err("The CIB is corrupt, cannot replace missing section %s", section); return FALSE; } else if(new_section == NULL) { crm_err("The CIB is corrupt, cannot set section %s to nothing", section); return FALSE; } parent = old_section->parent; /* unlink and free the old one */ unlink_xml_node(old_section); free_xml(old_section); /* add the new copy */ add_node_copy(parent, new_section); return TRUE; } enum cib_errors updateList(xmlNodePtr local_cib, xmlNodePtr update_fragment, xmlNodePtr failed, int operation, const char *section) { int rc = cib_ok; xmlNodePtr this_section = get_object_root(section, local_cib); xmlNodePtr cib_updates = NULL; xmlNodePtr xml_section = NULL; cib_updates = find_xml_node(update_fragment, XML_TAG_CIB); xml_section = get_object_root(section, cib_updates); if (section == NULL || xml_section == NULL) { crm_err("Section %s not found in message." " CIB update is corrupt, ignoring.", crm_str(section)); return cib_NOSECTION; } if(CIB_OP_NONE > operation > CIB_OP_MAX) { crm_err("Invalid operation on section %s", crm_str(section)); return cib_operation; } set_node_tstamp(this_section); xml_child_iter( xml_section, a_child, NULL, rc = cib_ok; if(operation == CIB_OP_DELETE) { rc = delete_cib_object(this_section, a_child); update_results(failed, a_child, operation, rc); } else if(operation == CIB_OP_MODIFY) { rc = update_cib_object(this_section, a_child, FALSE); update_results(failed, a_child, operation, rc); } else { rc = add_cib_object(this_section, a_child); update_results(failed, a_child, operation, rc); } ); if(rc == cib_ok && failed->children != NULL) { rc = cib_unknown; } return rc; } xmlNodePtr createCibFragmentAnswer(const char *section, xmlNodePtr failed) { xmlNodePtr cib = NULL; xmlNodePtr fragment = NULL; fragment = create_xml_node(NULL, XML_TAG_FRAGMENT); if (section == NULL || strlen(section) == 0 || strcmp("all", section) == 0) { cib = get_the_CIB(); if(cib != NULL) { add_node_copy(fragment, get_the_CIB()); } } else { xmlNodePtr obj_root = get_object_root(section, get_the_CIB()); if(obj_root != NULL) { cib = create_xml_node(fragment, XML_TAG_CIB); add_node_copy(cib, obj_root); copy_in_properties(cib, get_the_CIB()); } } if (failed != NULL && failed->children != NULL) { add_node_copy(fragment, failed); } set_xml_property_copy(fragment, XML_ATTR_SECTION, section); set_xml_property_copy(fragment, "generated_on", cib_our_uname); return fragment; } gboolean check_generation(xmlNodePtr newCib, xmlNodePtr oldCib) { if(cib_compare_generation(newCib, oldCib) >= 0) { return TRUE; } crm_warn("Generation from update is older than the existing one"); return FALSE; } gboolean update_results( xmlNodePtr failed, xmlNodePtr target, int operation, int return_code) { gboolean was_error = FALSE; const char *error_msg = NULL; const char *operation_msg = NULL; xmlNodePtr xml_node = NULL; operation_msg = cib_op2string(operation); if (return_code != cib_ok) { error_msg = cib_error2string(return_code); xml_node = create_xml_node(failed, XML_FAIL_TAG_CIB); was_error = TRUE; add_node_copy(xml_node, target); set_xml_property_copy( xml_node, XML_FAILCIB_ATTR_ID, ID(target)); set_xml_property_copy( xml_node, XML_FAILCIB_ATTR_OBJTYPE, TYPE(target)); set_xml_property_copy( xml_node, XML_FAILCIB_ATTR_OP, operation_msg); set_xml_property_copy( xml_node, XML_FAILCIB_ATTR_REASON, error_msg); crm_debug("Action %s failed: %s (cde=%d)", operation_msg, error_msg, return_code); } else { crm_debug("CIB %s passed", operation_msg); } return was_error; } diff --git a/crm/cib/notify.c b/crm/cib/notify.c index 401ca6e68f..a6bf077a39 100644 --- a/crm/cib/notify.c +++ b/crm/cib/notify.c @@ -1,117 +1,197 @@ -/* $Id: notify.c,v 1.1 2004/12/05 16:14:07 andrew Exp $ */ +/* $Id: notify.c,v 1.2 2004/12/10 20:07:07 andrew Exp $ */ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include +#include #include #include -FILE *msg_cib_strm = NULL; +extern GHashTable *client_list; +int pending_updates = 0; + +void cib_notify_client(gpointer key, gpointer value, gpointer user_data); + +void +cib_notify_client(gpointer key, gpointer value, gpointer user_data) +{ + + struct ha_msg *update_msg = user_data; + cib_client_t *client = value; + + if(safe_str_eq(client->channel_name, "cib_callback")) { + crm_trace("Notifying client %s of update", client->id); + if(msg2ipcchan(update_msg, client->channel) != HA_OK) { + crm_err("Notification of client %s failed", client->id); + } + } +} void cib_pre_notify( - const char *op, const char *type, const char *id, xmlNodePtr update) + const char *op, xmlNodePtr existing, xmlNodePtr update) { - char *xml_text = NULL; + struct ha_msg *update_msg = ha_msg_new(6); + const char *id = xmlGetProp(update, XML_ATTR_ID); + const char *type = NULL; - if(update == NULL) { - crm_verbose("Performing oeration %s (on section=%s)", op, type); + ha_msg_add(update_msg, F_TYPE, T_CIB_NOTIFY); + ha_msg_add(update_msg, F_SUBTYPE, T_CIB_PRE_NOTIFY); + ha_msg_add(update_msg, F_CIB_OPERATION, op); - } else { - crm_verbose("Performing %s on <%s %s%s>", - op, type, id?"id=":"", id); + if(id != NULL) { + ha_msg_add(update_msg, F_CIB_OBJID, id); } - - if(msg_cib_strm == NULL) { - msg_cib_strm = fopen(DEVEL_DIR"/cib.log", "w"); + + if(update != NULL) { + ha_msg_add(update_msg, F_CIB_OBJTYPE, update->name); + } else if(existing != NULL) { + ha_msg_add(update_msg, F_CIB_OBJTYPE, existing->name); } - xml_text = dump_xml_formatted(update); - fprintf(msg_cib_strm, "[Request (%s : %s : %s)]\t%s\n", - op, crm_str(type), crm_str(id), xml_text); - crm_free(xml_text); + type = cl_get_string(update_msg, F_CIB_OBJTYPE); + + if(existing != NULL) { + char *existing_s = dump_xml_unformatted(existing); + if(existing_s != NULL) { + ha_msg_add(update_msg, F_CIB_EXISTING, existing_s); + } else { + crm_debug("Update string was NULL (xml=%p)", update); + } + crm_free(existing_s); + } + if(update != NULL) { + char *update_s = dump_xml_unformatted(update); + if(update_s != NULL) { + ha_msg_add(update_msg, F_CIB_UPDATE, update_s); + } else { + crm_debug("Update string was NULL (xml=%p)", update); + } + crm_free(update_s); + } - xml_text = dump_xml_formatted(get_the_CIB()); - fprintf(msg_cib_strm, "[CIB before %s]\t%s\n", op, xml_text); - crm_free(xml_text); + g_hash_table_foreach(client_list, cib_notify_client, update_msg); + + pending_updates++; + + if(update == NULL) { + crm_verbose("Performing operation %s (on section=%s)", + op, type); - fflush(msg_cib_strm); + } else { + crm_verbose("Performing %s on <%s%s%s>", + op, type, id?" id=":"", id?id?""); + } + + ha_msg_del(update_msg); } void cib_post_notify( - const char *op, const char *type, const char *id, xmlNodePtr update, - enum cib_errors result, xmlNodePtr new_obj) + const char *op, xmlNodePtr update, enum cib_errors result, xmlNodePtr new_obj) { - char *xml_text = NULL; + struct ha_msg *update_msg = ha_msg_new(8); + const char *id = xmlGetProp(new_obj, XML_ATTR_ID); + const char *type = NULL; + + ha_msg_add(update_msg, F_TYPE, T_CIB_NOTIFY); + ha_msg_add(update_msg, F_SUBTYPE, T_CIB_POST_NOTIFY); + ha_msg_add(update_msg, F_CIB_OPERATION, op); + ha_msg_add_int(update_msg, F_CIB_RC, result); + + if(id != NULL) { + ha_msg_add(update_msg, F_CIB_OBJID, id); + } + if(update != NULL) { + ha_msg_add(update_msg, F_CIB_OBJTYPE, update->name); + } else if(new_obj != NULL) { + ha_msg_add(update_msg, F_CIB_OBJTYPE, new_obj->name); + } + + type = cl_get_string(update_msg, F_CIB_OBJTYPE); + + if(update != NULL) { + char *update_s = dump_xml_unformatted(update); + if(update_s != NULL) { + ha_msg_add(update_msg, F_CIB_UPDATE, update_s); + } else { + crm_debug("Update string was NULL (xml=%p)", update); + } + crm_free(update_s); + } + if(new_obj != NULL) { + char *new_obj_s = dump_xml_unformatted(new_obj); + if(new_obj_s != NULL) { + ha_msg_add(update_msg, F_CIB_UPDATE_RESULT, new_obj_s); + } else { + crm_debug("Update string was NULL (xml=%p)", new_obj); + } + crm_free(new_obj_s); + } + + g_hash_table_foreach(client_list, cib_notify_client, update_msg); + + pending_updates--; + + if(pending_updates == 0) { + ha_msg_mod(update_msg, F_SUBTYPE, T_CIB_UPDATE_CONFIRM); + g_hash_table_foreach(client_list, cib_notify_client, update_msg); + } + + ha_msg_del(update_msg); if(update == NULL) { if(result == cib_ok) { crm_verbose("Operation %s (on section=%s) completed", op, type); } else { crm_warn("Operation %s (on section=%s) FAILED: (%d) %s", op, type, result, cib_error2string(result)); } } else { if(result == cib_ok) { crm_verbose("Completed %s of <%s %s%s>", op, type, id?"id=":"", id); } else { crm_warn("%s of <%s %s%s> FAILED: %s", op, type, id?"id=":"", id, cib_error2string(result)); } } - - if(msg_cib_strm == NULL) { - msg_cib_strm = fopen(DEVEL_DIR"/cib.log", "w"); - } - - if(new_obj != NULL) { - xml_text = dump_xml_formatted(new_obj); - } - - fprintf(msg_cib_strm, "[Response (%s : %s : %s)]\t%s\n%s\n", - op, crm_str(type), crm_str(id), - cib_error2string(result), xml_text); - crm_free(xml_text); - - fflush(msg_cib_strm); } diff --git a/crm/cib/notify.h b/crm/cib/notify.h index c006c19a97..d3750389fd 100644 --- a/crm/cib/notify.h +++ b/crm/cib/notify.h @@ -1,38 +1,37 @@ -/* $Id: notify.h,v 1.1 2004/12/05 16:14:07 andrew Exp $ */ +/* $Id: notify.h,v 1.2 2004/12/10 20:07:07 andrew Exp $ */ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include extern FILE *msg_cib_strm; void cib_pre_notify( - const char *op, const char *type, const char *id, xmlNodePtr update); + const char *op, xmlNodePtr existing, xmlNodePtr update); void cib_post_notify( - const char *op, const char *type, const char *id, xmlNodePtr update, - enum cib_errors result, xmlNodePtr new_obj); + const char *op, xmlNodePtr update, enum cib_errors result, xmlNodePtr new_obj); diff --git a/crm/cib/primatives.c b/crm/cib/primatives.c index d01557eae0..13d94e3e1a 100644 --- a/crm/cib/primatives.c +++ b/crm/cib/primatives.c @@ -1,591 +1,587 @@ -/* $Id: primatives.c,v 1.3 2004/12/05 16:14:07 andrew Exp $ */ +/* $Id: primatives.c,v 1.4 2004/12/10 20:07:07 andrew Exp $ */ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include /* * In case of confusion, this is the memory management policy for * all functions in this file. * * All add/modify functions use copies of supplied data. * It is therefore appropriate that the callers free the supplied data * at some point after the function has finished. * * All delete functions will handle the freeing of deleted data * but not the function arguments. */ void update_node_state(xmlNodePtr existing_node, xmlNodePtr update); /* --- Resource */ int addResource(xmlNodePtr cib, xmlNodePtr anXmlNode) { const char *id = ID(anXmlNode); xmlNodePtr root; if (id == NULL || strlen(id) < 1) { return CIBRES_MISSING_ID; } crm_verbose("Adding " XML_CIB_TAG_RESOURCE " (%s)...", id); root = get_object_root(XML_CIB_TAG_RESOURCES, cib); return add_cib_object(root, anXmlNode); } xmlNodePtr findResource(xmlNodePtr cib, const char *id) { xmlNodePtr root = NULL, ret = NULL; root = get_object_root(XML_CIB_TAG_RESOURCES, cib); ret = find_entity(root, XML_CIB_TAG_RESOURCE, id, FALSE); return ret; } int updateResource(xmlNodePtr cib, xmlNodePtr anXmlNode) { const char *id = ID(anXmlNode); xmlNodePtr root; if (id == NULL || strlen(id) < 1) { return CIBRES_MISSING_ID; } crm_verbose("Updating " XML_CIB_TAG_RESOURCE " (%s)...", id); root = get_object_root(XML_CIB_TAG_RESOURCES, cib); return update_cib_object(root, anXmlNode, FALSE); } int delResource(xmlNodePtr cib, xmlNodePtr delete_spec) { const char *id = ID(delete_spec); xmlNodePtr root; if(id == NULL || strlen(id) == 0) { return CIBRES_MISSING_ID; } crm_verbose("Deleting " XML_CIB_TAG_RESOURCE " (%s)...", id); root = get_object_root(XML_CIB_TAG_RESOURCES, cib); return delete_cib_object(root, delete_spec); } /* --- Constraint */ int addConstraint(xmlNodePtr cib, xmlNodePtr anXmlNode) { const char *id = ID(anXmlNode); xmlNodePtr root; if (id == NULL || strlen(id) < 1) { return CIBRES_MISSING_ID; } crm_verbose("Adding " XML_CIB_TAG_CONSTRAINT " (%s)...", id); root = get_object_root(XML_CIB_TAG_CONSTRAINTS, cib); return add_cib_object(root, anXmlNode); } xmlNodePtr findConstraint(xmlNodePtr cib, const char *id) { xmlNodePtr root = NULL, ret = NULL; root = get_object_root(XML_CIB_TAG_CONSTRAINTS, cib); ret = find_entity(root, XML_CIB_TAG_CONSTRAINT, id, FALSE); return ret; } int updateConstraint(xmlNodePtr cib, xmlNodePtr anXmlNode) { const char *id = ID(anXmlNode); xmlNodePtr root; if (id == NULL || strlen(id) < 1) { return CIBRES_MISSING_ID; } crm_verbose("Updating " XML_CIB_TAG_CONSTRAINT " (%s)...", id); root = get_object_root(XML_CIB_TAG_CONSTRAINTS, cib); return update_cib_object(root, anXmlNode, FALSE); } int delConstraint(xmlNodePtr cib, xmlNodePtr delete_spec) { const char *id = ID(delete_spec); xmlNodePtr root; if(id == NULL || strlen(id) == 0) { return CIBRES_MISSING_ID; } crm_verbose("Deleting " XML_CIB_TAG_CONSTRAINT " (%s)...", id); root = get_object_root(XML_CIB_TAG_CONSTRAINTS, cib); return delete_cib_object(root, delete_spec); } /* --- HaNode */ int addHaNode(xmlNodePtr cib, xmlNodePtr anXmlNode) { const char *id = ID(anXmlNode); xmlNodePtr root; if (id == NULL || strlen(id) < 1) { return CIBRES_MISSING_ID; } crm_verbose("Adding " XML_CIB_TAG_NODE " (%s)...", id); root = get_object_root(XML_CIB_TAG_NODES, cib); return add_cib_object(root, anXmlNode); } xmlNodePtr findHaNode(xmlNodePtr cib, const char *id) { xmlNodePtr root = NULL, ret = NULL; root = get_object_root(XML_CIB_TAG_NODES, cib); ret = find_entity(root, XML_CIB_TAG_NODE, id, FALSE); return ret; } int updateHaNode(xmlNodePtr cib, cibHaNode *anXmlNode) { const char *id = ID(anXmlNode); xmlNodePtr root; if (id == NULL || strlen(id) < 1) { return CIBRES_MISSING_ID; } crm_verbose("Updating " XML_CIB_TAG_NODE " (%s)...", id); root = get_object_root(XML_CIB_TAG_NODES, cib); return update_cib_object(root, anXmlNode, FALSE); } int delHaNode(xmlNodePtr cib, xmlNodePtr delete_spec) { const char *id = ID(delete_spec); xmlNodePtr root; if(id == NULL || strlen(id) == 0) { return CIBRES_MISSING_ID; } crm_verbose("Deleting " XML_CIB_TAG_NODE " (%s)...", id); root = get_object_root(XML_CIB_TAG_CONSTRAINTS, cib); return delete_cib_object(root, delete_spec); } /* --- Status */ int addStatus(xmlNodePtr cib, xmlNodePtr anXmlNode) { const char *id = ID(anXmlNode); xmlNodePtr root; if (id == NULL || strlen(id) < 1) { return CIBRES_MISSING_ID; } crm_verbose("Adding " XML_CIB_TAG_NODE " (%s)...", id); root = get_object_root(XML_CIB_TAG_STATUS, cib); return add_cib_object(root, anXmlNode); } xmlNodePtr findStatus(xmlNodePtr cib, const char *id) { xmlNodePtr root = NULL, ret = NULL; root = get_object_root(XML_CIB_TAG_STATUS, cib); ret = find_entity(root, XML_CIB_TAG_STATE, id, FALSE); return ret; } int updateStatus(xmlNodePtr cib, xmlNodePtr anXmlNode) { const char *id = ID(anXmlNode); xmlNodePtr root; if (id == NULL || strlen(id) < 1) { return CIBRES_MISSING_ID; } crm_verbose("Updating " XML_CIB_TAG_NODE " (%s)...", id); root = get_object_root(XML_CIB_TAG_STATUS, cib); return update_cib_object(root, anXmlNode, FALSE); } int delStatus(xmlNodePtr cib, xmlNodePtr delete_spec) { const char *id = ID(delete_spec); xmlNodePtr root; if(id == NULL || strlen(id) == 0) { return CIBRES_MISSING_ID; } crm_verbose("Deleting " XML_CIB_TAG_STATE " (%s)...", id); root = get_object_root(XML_CIB_TAG_STATUS, cib); return delete_cib_object(root, delete_spec); } int delete_cib_object(xmlNodePtr parent, xmlNodePtr delete_spec) { const char *object_name = NULL; const char *object_id = NULL; xmlNodePtr equiv_node = NULL; int result = cib_ok; if(delete_spec != NULL) { object_name = delete_spec->name; } object_id = xmlGetProp(delete_spec, XML_ATTR_ID); - cib_pre_notify(CRM_OP_CIB_DELETE, object_name, object_id, delete_spec); if(delete_spec == NULL) { result = cib_NOOBJECT; } else if(parent == NULL) { result = cib_NOPARENT; } else if(object_id == NULL) { /* placeholder object */ equiv_node = find_xml_node(parent, object_name); } else { equiv_node = find_entity( parent, object_name, object_id, FALSE); } + cib_pre_notify(CRM_OP_CIB_DELETE, equiv_node, delete_spec); if(result != cib_ok) { ; /* nothing */ } else if(equiv_node == NULL) { result = cib_NOTEXISTS; } else if(delete_spec->children == NULL) { /* only leaves are deleted */ unlink_xml_node(equiv_node); free_xml(equiv_node); equiv_node = NULL; } else { xml_child_iter( delete_spec, child, NULL, int tmp_result = delete_cib_object(equiv_node, child); /* only the first error is likely to be interesting */ if(tmp_result != cib_ok && result == cib_ok) { result = tmp_result; } ); } - cib_post_notify(CRM_OP_CIB_DELETE, delete_spec->name, object_id, - delete_spec, result, equiv_node); + cib_post_notify(CRM_OP_CIB_DELETE, delete_spec, result, equiv_node); return result; } int add_cib_object(xmlNodePtr parent, xmlNodePtr new_obj) { enum cib_errors result = cib_ok; const char *object_name = NULL; const char *object_id = NULL; xmlNodePtr equiv_node = NULL; if(new_obj != NULL) { object_name = new_obj->name; } object_id = xmlGetProp(new_obj, XML_ATTR_ID); - cib_pre_notify(CRM_OP_CIB_CREATE, object_name, object_id, new_obj); if(new_obj == NULL) { result = cib_NOOBJECT; } else if(parent == NULL) { result = cib_NOPARENT; } else if(object_id == NULL) { /* placeholder object */ equiv_node = find_xml_node(parent, object_name); } else { equiv_node = find_entity( parent, object_name, object_id, FALSE); } + cib_pre_notify(CRM_OP_CIB_CREATE, equiv_node, new_obj); if(result != cib_ok) { ; /* do nothing */ } else if(equiv_node != NULL) { result = cib_EXISTS; } else if(add_node_copy(parent, new_obj) == NULL) { result = cib_NODECOPY; } - cib_post_notify(CRM_OP_CIB_CREATE, object_name, object_id, - new_obj, result, new_obj); + cib_post_notify(CRM_OP_CIB_CREATE, new_obj, result, new_obj); return cib_ok; } int update_cib_object(xmlNodePtr parent, xmlNodePtr new_obj, gboolean force) { const char *replace = NULL; const char *object_name = NULL; const char *object_id = NULL; xmlNodePtr equiv_node = NULL; int result = cib_ok; if(new_obj != NULL) { object_name = new_obj->name; } object_id = xmlGetProp(new_obj, XML_ATTR_ID); - cib_pre_notify(CRM_OP_CIB_UPDATE, object_name, object_id, - new_obj); if(new_obj == NULL) { result = cib_NOOBJECT; } else if(parent == NULL) { result = cib_NOPARENT; } else if(object_id == NULL) { /* placeholder object */ equiv_node = find_xml_node(parent, object_name); } else { equiv_node = find_entity(parent, object_name, object_id, FALSE); } + cib_pre_notify(CRM_OP_CIB_UPDATE, equiv_node, new_obj); if(result != cib_ok) { ; /* nothing */ } else if(equiv_node == NULL) { crm_debug("No node to update, creating %s instead", new_obj->name); if(parent == NULL) { crm_warn("Failed to add <%s id=%s> (NULL parent)", object_name, object_id); result = cib_NODECOPY; } else if(add_node_copy(parent, new_obj) == NULL) { crm_warn("Failed to add <%s id=%s>", object_name, object_id); result = cib_NODECOPY; } else { crm_debug("Added <%s id=%s>", object_name, object_id); if(object_id == NULL) { /* placeholder object */ equiv_node = find_xml_node(parent, object_name); } else { equiv_node = find_entity(parent, object_name, object_id, FALSE); } } } else { crm_verbose("Found node <%s id=%s> to update", object_name, object_id); replace = xmlGetProp(new_obj, "replace"); if(replace != NULL) { xmlNodePtr remove = find_xml_node(equiv_node, replace); if(remove != NULL) { crm_debug("Replacing node <%s> in <%s>", replace, equiv_node->name); xmlUnlinkNode(remove); remove->doc = NULL; free_xml(remove); } xmlUnsetProp(new_obj, "replace"); xmlUnsetProp(equiv_node, "replace"); } if(safe_str_eq(XML_CIB_TAG_STATE, object_name)){ update_node_state(equiv_node, new_obj); } else { copy_in_properties(equiv_node, new_obj); } crm_debug("Processing children of <%s id=%s>", object_name, object_id); xml_child_iter( new_obj, a_child, NULL, int tmp_result = 0; crm_debug("Updating child <%s id=%s>", a_child->name, xmlGetProp(a_child, XML_ATTR_ID)); tmp_result = update_cib_object(equiv_node, a_child, force); /* only the first error is likely to be interesting */ if(tmp_result != cib_ok) { crm_err("Error updating child <%s id=%s>", a_child->name, xmlGetProp(a_child, XML_ATTR_ID)); if(result == cib_ok) { result = tmp_result; } } ); } crm_debug("Finished with <%s id=%s>", object_name, object_id); - cib_post_notify(CRM_OP_CIB_UPDATE, object_name, object_id, - new_obj, result, equiv_node); + cib_post_notify(CRM_OP_CIB_UPDATE, new_obj, result, equiv_node); return result; } void update_node_state(xmlNodePtr target, xmlNodePtr update) { const char *source = NULL; xmlAttrPtr prop_iter = NULL; gboolean any_updates = FALSE; gboolean clear_stonith = FALSE; gboolean clear_shutdown = FALSE; prop_iter = update->properties; while(prop_iter != NULL) { const char *local_prop_name = prop_iter->name; const char *local_prop_value = xmlGetProp(update, local_prop_name); if(local_prop_name == NULL) { /* error */ } else if(strcmp(local_prop_name, XML_ATTR_ID) == 0) { } else if(strcmp(local_prop_name, XML_ATTR_TSTAMP) == 0) { } else if(strcmp(local_prop_name, XML_CIB_ATTR_CLEAR_SHUTDOWN) == 0) { clear_shutdown = TRUE; } else if(strcmp(local_prop_name, XML_CIB_ATTR_CLEAR_STONITH) == 0) { clear_stonith = TRUE; clear_shutdown = TRUE; } else if(strcmp(local_prop_name, "source") == 0) { source = local_prop_value; } else { any_updates = TRUE; set_xml_property_copy(target, local_prop_name, local_prop_value); } prop_iter = prop_iter->next; } if(clear_shutdown) { /* unset XML_CIB_ATTR_SHUTDOWN */ crm_verbose("Clearing %s", XML_CIB_ATTR_SHUTDOWN); xmlUnsetProp(target, XML_CIB_ATTR_SHUTDOWN); any_updates = TRUE; } if(clear_stonith) { /* unset XML_CIB_ATTR_STONITH */ crm_verbose("Clearing %s", XML_CIB_ATTR_STONITH); xmlUnsetProp(target, XML_CIB_ATTR_STONITH); any_updates = TRUE; } if(any_updates) { set_node_tstamp(target); set_xml_property_copy(target, "source", source); } } diff --git a/include/crm/cib.h b/include/crm/cib.h index 6847930053..d5095bacc6 100644 --- a/include/crm/cib.h +++ b/include/crm/cib.h @@ -1,237 +1,266 @@ -/* $Id: cib.h,v 1.9 2004/12/09 14:53:41 andrew Exp $ */ +/* $Id: cib.h,v 1.10 2004/12/10 20:07:07 andrew Exp $ */ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef CIB__H #define CIB__H #include #include #include enum cib_variant { cib_native, cib_database, cib_edir }; enum cib_state { cib_connected_command, cib_connected_query, cib_disconnected }; enum cib_conn_type { cib_command, cib_query, cib_no_connection }; enum cib_call_options { cib_none = 0x000000, cib_verbose = 0x000001, cib_discard_reply = 0x000004, cib_scope_local = 0x000010, /* cib_scope_global = 0x000020, */ cib_sync_call = 0x000040, /* cib_async_call = 0x000080, */ cib_inhibit_notify= 0x000100 }; #define cib_default_options = cib_none enum cib_errors { cib_ok = 0, cib_operation = -1, cib_create_msg = -2, cib_not_connected = -3, cib_not_authorized = -4, cib_send_failed = -5, cib_reply_failed = -6, cib_return_code = -7, cib_output_ptr = -8, cib_output_data = -9, cib_connection = -10, cib_authentication = -11, cib_missing = -12, cib_variant = -28, CIBRES_MISSING_ID = -13, CIBRES_MISSING_TYPE = -14, CIBRES_MISSING_FIELD = -15, CIBRES_OBJTYPE_MISMATCH = -16, CIBRES_CORRUPT = -17, CIBRES_OTHER = -18, cib_unknown = -19, cib_STALE = -20, cib_EXISTS = -21, cib_NOTEXISTS = -22, cib_ACTIVATION = -23, cib_NOSECTION = -24, cib_NOOBJECT = -25, cib_NOPARENT = -26, cib_NODECOPY = -27, cib_NOTSUPPORTED = -29, cib_registration_msg = -30, cib_callback_token = -31, cib_callback_register = -32, cib_msg_field_add = -33, cib_client_gone = -34, cib_not_master = -35, cib_client_corrupt = -36 }; enum cib_op { CIB_OP_NONE = 0, CIB_OP_ADD, CIB_OP_MODIFY, CIB_OP_DELETE, CIB_OP_MAX }; enum cib_section { cib_section_none, cib_section_all, cib_section_nodes, cib_section_constraints, cib_section_resources, cib_section_crmconfig, cib_section_status }; -#define T_CIB "cib" #define F_CIB_CLIENTID "cib_clientid" #define F_CIB_CALLOPTS "cib_callopt" #define F_CIB_CALLID "cib_callid" #define F_CIB_CALLDATA "cib_calldata" #define F_CIB_OPERATION "cib_op" #define F_CIB_ISREPLY "cib_isreplyto" #define F_CIB_SECTION "cib_section" #define F_CIB_HOST "cib_host" #define F_CIB_RC "cib_rc" #define F_CIB_CALLBACK_TOKEN "cib_callback_token" #define F_CIB_GLOBAL_UPDATE "cib_update" #define F_CIB_DELEGATED "cib_delegated_from" +#define F_CIB_OBJID "cib_object" +#define F_CIB_OBJTYPE "cib_object_type" +#define F_CIB_UPDATE "cib_update" +#define F_CIB_UPDATE_RESULT "cib_update_result" +#define F_CIB_EXISTING "cib_existing_object" + +#define T_CIB "cib" +#define T_CIB_NOTIFY "cib_notify" +/* notify sub-types */ +#define T_CIB_PRE_NOTIFY "cib_pre_notify" +#define T_CIB_POST_NOTIFY "cib_post_notify" +#define T_CIB_UPDATE_CONFIRM "cib_update_confirmation" typedef struct cib_s cib_t; typedef struct cib_api_operations_s { int (*variant_op)( cib_t *cib, const char *op, const char *host, const char *section, xmlNodePtr data, xmlNodePtr *output_data, int call_options); int (*signon) (cib_t *cib, enum cib_conn_type type); int (*signoff)(cib_t *cib); int (*free) (cib_t *cib); int (*set_op_callback)( cib_t *cib, void (*callback)( const struct ha_msg *msg, int callid , int rc, xmlNodePtr output)); + int (*add_notify_callback)( + cib_t *cib, const char *event, void (*callback)( + const char *event, struct ha_msg *msg)); + + int (*del_notify_callback)( + cib_t *cib, const char *event, void (*callback)( + const char *event, struct ha_msg *msg)); + int (*set_connection_dnotify)( cib_t *cib, void (*dnotify)(gpointer user_data)); IPC_Channel *(*channel)(cib_t* cib); int (*inputfd)(cib_t* cib); int (*noop)(cib_t *cib, int call_options); int (*ping)( cib_t *cib, xmlNodePtr *output_data, int call_options); int (*query)(cib_t *cib, const char *section, xmlNodePtr *output_data, int call_options); int (*query_from)( cib_t *cib, const char *host, const char *section, xmlNodePtr *output_data, int call_options); gboolean (*is_master) (cib_t *cib); int (*set_master)(cib_t *cib, int call_options); int (*set_slave) (cib_t *cib, int call_options); int (*set_slave_all)(cib_t *cib, int call_options); int (*sync)(cib_t *cib, const char *section, int call_options); int (*sync_from)( cib_t *cib, const char *host, const char *section, int call_options); int (*bump_epoch)(cib_t *cib, int call_options); int (*create)(cib_t *cib, const char *section, xmlNodePtr data, xmlNodePtr *output_data, int call_options) ; int (*modify)(cib_t *cib, const char *section, xmlNodePtr data, xmlNodePtr *output_data, int call_options) ; int (*replace)(cib_t *cib, const char *section, xmlNodePtr data, xmlNodePtr *output_data, int call_options) ; int (*delete)(cib_t *cib, const char *section, xmlNodePtr data, xmlNodePtr *output_data, int call_options) ; int (*erase)( cib_t *cib, xmlNodePtr *output_data, int call_options); int (*quit)(cib_t *cib, int call_options); gboolean (*msgready)(cib_t* cib); int (*rcvmsg)(cib_t* cib, int blocking); gboolean (*dispatch)(IPC_Channel *channel, gpointer user_data); } cib_api_operations_t; struct cib_s { enum cib_state state; enum cib_conn_type type; int call_id; void *variant_opaque; - GList *notify_callback_list; + GList *notify_list; void (*op_callback)(const struct ha_msg *msg, int call_id, int rc, xmlNodePtr output); cib_api_operations_t *cmds; }; +typedef struct cib_notify_client_s +{ + const char *event; + const char *obj_id; /* implement one day */ + const char *obj_type; /* implement one day */ + void (*callback)( + const char *event, struct ha_msg *msg); + +} cib_notify_client_t; + /* Core functions */ extern cib_t *cib_new(void); extern gboolean startCib(const char *filename); extern xmlNodePtr get_cib_copy(cib_t *cib); extern xmlNodePtr cib_get_generation(cib_t *cib); extern int cib_compare_generation(xmlNodePtr left, xmlNodePtr right); /* Utility functions */ extern xmlNodePtr get_object_root(const char *object_type,xmlNodePtr the_root); extern xmlNodePtr create_cib_fragment_adv( xmlNodePtr update, const char *section, const char *source); extern char *cib_pluralSection(const char *a_section); /* Error Interpretation*/ extern const char *cib_error2string(enum cib_errors); extern const char *cib_op2string(enum cib_op); extern xmlNodePtr createEmptyCib(void); extern gboolean verifyCibXml(xmlNodePtr cib); extern int cib_section2enum(const char *a_section); #define create_cib_fragment(update,section) create_cib_fragment_adv(update, section, __FUNCTION__) #endif diff --git a/lib/crm/cib/cib_client.c b/lib/crm/cib/cib_client.c index 400badfa7c..647d14df40 100755 --- a/lib/crm/cib/cib_client.c +++ b/lib/crm/cib/cib_client.c @@ -1,986 +1,1084 @@ /* * Copyright (c) 2004 International Business Machines * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * */ #include #include #include #include #include #include #include #include #include #include #include #include #include gboolean verify_cib_cmds(cib_t *cib); int cib_client_set_op_callback( cib_t *cib, void (*callback)(const struct ha_msg *msg, int call_id, int rc, xmlNodePtr output)); int cib_client_noop(cib_t *cib, int call_options); int cib_client_ping(cib_t *cib, xmlNodePtr *output_data, int call_options); int cib_client_query(cib_t *cib, const char *section, xmlNodePtr *output_data, int call_options); int cib_client_query_from(cib_t *cib, const char *host, const char *section, xmlNodePtr *output_data, int call_options); int cib_client_sync(cib_t *cib, const char *section, int call_options); int cib_client_sync_from( cib_t *cib, const char *host, const char *section, int call_options); gboolean cib_client_is_master(cib_t *cib); int cib_client_set_slave(cib_t *cib, int call_options); int cib_client_set_slave_all(cib_t *cib, int call_options); int cib_client_set_master(cib_t *cib, int call_options); int cib_client_bump_epoch(cib_t *cib, int call_options); int cib_client_create(cib_t *cib, const char *section, xmlNodePtr data, xmlNodePtr *output_data, int call_options) ; int cib_client_modify(cib_t *cib, const char *section, xmlNodePtr data, xmlNodePtr *output_data, int call_options) ; int cib_client_replace(cib_t *cib, const char *section, xmlNodePtr data, xmlNodePtr *output_data, int call_options) ; int cib_client_delete(cib_t *cib, const char *section, xmlNodePtr data, xmlNodePtr *output_data, int call_options) ; int cib_client_erase( cib_t *cib, xmlNodePtr *output_data, int call_options); int cib_client_quit(cib_t *cib, int call_options); +int cib_client_add_notify_callback( + cib_t *cib, const char *event, void (*callback)( + const char *event, struct ha_msg *msg)); + +int cib_client_del_notify_callback( + cib_t *cib, const char *event, void (*callback)( + const char *event, struct ha_msg *msg)); + +gint cib_GCompareFunc(gconstpointer a, gconstpointer b); + extern cib_t *cib_native_new(cib_t *cib); static enum cib_variant configured_variant = cib_native; /* define of the api functions*/ cib_t* cib_new(void) { cib_t* new_cib = NULL; if(configured_variant != cib_native) { crm_err("Only the native CIB type is currently implemented"); return NULL; } crm_malloc(new_cib, sizeof(cib_t)); new_cib->call_id = 1; new_cib->type = cib_none; new_cib->state = cib_disconnected; new_cib->op_callback = NULL; new_cib->variant_opaque = NULL; - new_cib->notify_callback_list = NULL; + new_cib->notify_list = NULL; /* the rest will get filled in by the variant constructor */ crm_malloc(new_cib->cmds, sizeof(cib_api_operations_t)); memset(new_cib->cmds, 0, sizeof(cib_api_operations_t)); - new_cib->cmds->set_op_callback = cib_client_set_op_callback; - - new_cib->cmds->sync = cib_client_sync; + new_cib->cmds->set_op_callback = cib_client_set_op_callback; + new_cib->cmds->add_notify_callback = cib_client_add_notify_callback; + new_cib->cmds->del_notify_callback = cib_client_del_notify_callback; + new_cib->cmds->noop = cib_client_noop; new_cib->cmds->ping = cib_client_ping; new_cib->cmds->query = cib_client_query; + new_cib->cmds->sync = cib_client_sync; + new_cib->cmds->query_from = cib_client_query_from; new_cib->cmds->sync_from = cib_client_sync_from; + new_cib->cmds->is_master = cib_client_is_master; + new_cib->cmds->set_master = cib_client_set_master; new_cib->cmds->set_slave = cib_client_set_slave; new_cib->cmds->set_slave_all = cib_client_set_slave_all; - new_cib->cmds->set_master = cib_client_set_master; + new_cib->cmds->bump_epoch = cib_client_bump_epoch; + new_cib->cmds->create = cib_client_create; new_cib->cmds->modify = cib_client_modify; new_cib->cmds->replace = cib_client_replace; new_cib->cmds->delete = cib_client_delete; new_cib->cmds->erase = cib_client_erase; new_cib->cmds->quit = cib_client_quit; cib_native_new(new_cib); if(verify_cib_cmds(new_cib) == FALSE) { return NULL; } return new_cib; } int cib_client_set_op_callback( cib_t *cib, void (*callback)(const struct ha_msg *msg, int call_id, int rc, xmlNodePtr output)) { if(callback == NULL) { crm_info("Un-Setting operation callback"); } else { crm_debug("Setting operation callback"); } cib->op_callback = callback; return cib_ok; } int cib_client_noop(cib_t *cib, int call_options) { if(cib == NULL) { return cib_missing; } else if(cib->state == cib_disconnected) { return cib_not_connected; } else if(cib->cmds->variant_op == NULL) { return cib_variant; } return cib->cmds->variant_op( cib, CRM_OP_NOOP, NULL, NULL, NULL, NULL, call_options); } int cib_client_ping(cib_t *cib, xmlNodePtr *output_data, int call_options) { if(cib == NULL) { return cib_missing; } else if(cib->state == cib_disconnected) { return cib_not_connected; } else if(cib->cmds->variant_op == NULL) { return cib_variant; } return cib->cmds->variant_op( cib, CRM_OP_PING, NULL,NULL,NULL, output_data, call_options); } int cib_client_query(cib_t *cib, const char *section, xmlNodePtr *output_data, int call_options) { return cib->cmds->query_from( cib, NULL, section, output_data, call_options); } int cib_client_query_from(cib_t *cib, const char *host, const char *section, xmlNodePtr *output_data, int call_options) { if(cib == NULL) { return cib_missing; } else if(cib->state == cib_disconnected) { return cib_not_connected; } else if(cib->cmds->variant_op == NULL) { return cib_variant; } return cib->cmds->variant_op(cib, CRM_OP_CIB_QUERY, host, section, NULL, output_data, call_options); } gboolean cib_client_is_master(cib_t *cib) { if(cib == NULL) { return cib_missing; } else if(cib->state == cib_disconnected) { return cib_not_connected; } else if(cib->cmds->variant_op == NULL) { return cib_variant; } return cib->cmds->variant_op( cib, CRM_OP_CIB_ISMASTER, NULL, NULL,NULL,NULL, cib_scope_local|cib_sync_call); } int cib_client_set_slave(cib_t *cib, int call_options) { if(cib == NULL) { return cib_missing; } else if(cib->state == cib_disconnected) { return cib_not_connected; } else if(cib->cmds->variant_op == NULL) { return cib_variant; } return cib->cmds->variant_op( cib, CRM_OP_CIB_SLAVE, NULL,NULL,NULL,NULL, call_options); } int cib_client_set_slave_all(cib_t *cib, int call_options) { if(cib == NULL) { return cib_missing; } else if(cib->state == cib_disconnected) { return cib_not_connected; } else if(cib->cmds->variant_op == NULL) { return cib_variant; } return cib->cmds->variant_op( cib, CRM_OP_CIB_SLAVEALL, NULL,NULL,NULL,NULL, call_options); } int cib_client_set_master(cib_t *cib, int call_options) { if(cib == NULL) { return cib_missing; } else if(cib->state == cib_disconnected) { return cib_not_connected; } else if(cib->cmds->variant_op == NULL) { return cib_variant; } crm_debug("Adding cib_scope_local to options"); return cib->cmds->variant_op( cib, CRM_OP_CIB_MASTER, NULL,NULL,NULL,NULL, call_options|cib_scope_local); } int cib_client_bump_epoch(cib_t *cib, int call_options) { if(cib == NULL) { return cib_missing; } else if(cib->state == cib_disconnected) { return cib_not_connected; } else if(cib->cmds->variant_op == NULL) { return cib_variant; } return cib->cmds->variant_op( cib, CRM_OP_CIB_BUMP, NULL, NULL, NULL, NULL, call_options); } int cib_client_sync(cib_t *cib, const char *section, int call_options) { return cib->cmds->sync_from(cib, NULL, section, call_options); } int cib_client_sync_from( cib_t *cib, const char *host, const char *section, int call_options) { enum cib_errors rc = cib_ok; xmlNodePtr stored_cib = NULL; xmlNodePtr current_cib = NULL; if(cib == NULL) { return cib_missing; } else if(cib->state == cib_disconnected) { return cib_not_connected; } else if(cib->cmds->variant_op == NULL) { return cib_variant; } crm_debug("Retrieving current CIB from %s", host); rc = cib->cmds->query_from( cib, host, section, ¤t_cib, call_options|cib_sync_call); if(rc == cib_ok) { if(call_options & cib_scope_local) { /* having scope == local makes no sense from here on */ call_options ^= cib_scope_local; } crm_debug("Storing current CIB (should trigger a store everywhere)"); crm_xml_debug(current_cib, "XML to store"); rc = cib->cmds->replace( cib, section, current_cib, &stored_cib, call_options); } free_xml(current_cib); free_xml(stored_cib); return rc; } int cib_client_create(cib_t *cib, const char *section, xmlNodePtr data, xmlNodePtr *output_data, int call_options) { if(cib == NULL) { return cib_missing; } else if(cib->state == cib_disconnected) { return cib_not_connected; } else if(cib->cmds->variant_op == NULL) { return cib_variant; } return cib->cmds->variant_op(cib, CRM_OP_CIB_CREATE, NULL, section, data, output_data, call_options); } int cib_client_modify(cib_t *cib, const char *section, xmlNodePtr data, xmlNodePtr *output_data, int call_options) { if(cib == NULL) { return cib_missing; } else if(cib->state == cib_disconnected) { return cib_not_connected; } else if(cib->cmds->variant_op == NULL) { return cib_variant; } return cib->cmds->variant_op(cib, CRM_OP_CIB_UPDATE, NULL, section, data, output_data, call_options); } int cib_client_replace(cib_t *cib, const char *section, xmlNodePtr data, xmlNodePtr *output_data, int call_options) { if(cib == NULL) { return cib_missing; } else if(cib->state == cib_disconnected) { return cib_not_connected; } else if(cib->cmds->variant_op == NULL) { return cib_variant; } return cib->cmds->variant_op(cib, CRM_OP_CIB_REPLACE, NULL, NULL, data, output_data, call_options); } int cib_client_delete(cib_t *cib, const char *section, xmlNodePtr data, xmlNodePtr *output_data, int call_options) { if(cib == NULL) { return cib_missing; } else if(cib->state == cib_disconnected) { return cib_not_connected; } else if(cib->cmds->variant_op == NULL) { return cib_variant; } return cib->cmds->variant_op(cib, CRM_OP_CIB_DELETE, NULL, section, data, output_data, call_options); } int cib_client_erase( cib_t *cib, xmlNodePtr *output_data, int call_options) { if(cib == NULL) { return cib_missing; } else if(cib->state == cib_disconnected) { return cib_not_connected; } else if(cib->cmds->variant_op == NULL) { return cib_variant; } return cib->cmds->variant_op(cib, CRM_OP_CIB_ERASE, NULL, NULL, NULL, output_data, call_options); } int cib_client_quit(cib_t *cib, int call_options) { if(cib == NULL) { return cib_missing; } else if(cib->state == cib_disconnected) { return cib_not_connected; } else if(cib->cmds->variant_op == NULL) { return cib_variant; } return cib->cmds->variant_op( cib, CRM_OP_QUIT, NULL, NULL, NULL, NULL, call_options); } +int cib_client_add_notify_callback( + cib_t *cib, const char *event, void (*callback)( + const char *event, struct ha_msg *msg)) +{ + GList *list_item = NULL; + cib_notify_client_t *new_client = NULL; + + crm_debug("Adding callback for %s events (%d)", + event, g_list_length(cib->notify_list)); + + crm_malloc(new_client, sizeof(cib_notify_client_t)); + new_client->event = event; + new_client->callback = callback; + + list_item = g_list_find_custom( + cib->notify_list, new_client, cib_GCompareFunc); + + if(list_item != NULL) { + crm_warn("Callback already present"); + + } else { + cib->notify_list = g_list_append( + cib->notify_list, new_client); + + crm_debug("Callback added (%d)", g_list_length(cib->notify_list)); + } + return cib_ok; +} + + +int cib_client_del_notify_callback( + cib_t *cib, const char *event, void (*callback)( + const char *event, struct ha_msg *msg)) +{ + GList *list_item = NULL; + cib_notify_client_t *new_client = NULL; + + crm_debug("Removing callback for %s events", event); + + crm_malloc(new_client, sizeof(cib_notify_client_t)); + new_client->event = event; + new_client->callback = callback; + + list_item = g_list_find_custom( + cib->notify_list, new_client, cib_GCompareFunc); + + if(list_item != NULL) { + cib_notify_client_t *list_client = list_item->data; + cib->notify_list = + g_list_remove(cib->notify_list, list_client); + crm_free(list_client); + crm_debug("Removed callback"); + + } else { + crm_debug("Callback not present"); + } + crm_free(new_client); + return cib_ok; +} + +gint cib_GCompareFunc(gconstpointer a, gconstpointer b) +{ + const cib_notify_client_t *a_client = a; + const cib_notify_client_t *b_client = b; + if(a_client->callback == b_client->callback + && safe_str_neq(a_client->event, b_client->event)) { + return 0; + } else if(((int)a_client->callback) < ((int)b_client->callback)) { + return -1; + } + return 1; +} + char * cib_pluralSection(const char *a_section) { char *a_section_parent = NULL; if (a_section == NULL) { a_section_parent = crm_strdup("all"); } else if(strcmp(a_section, XML_TAG_CIB) == 0) { a_section_parent = crm_strdup("all"); } else if(strcmp(a_section, XML_CIB_TAG_NODE) == 0) { a_section_parent = crm_strdup(XML_CIB_TAG_NODES); } else if(strcmp(a_section, XML_CIB_TAG_STATE) == 0) { a_section_parent = crm_strdup(XML_CIB_TAG_STATUS); } else if(strcmp(a_section, XML_CIB_TAG_CONSTRAINT) == 0) { a_section_parent = crm_strdup(XML_CIB_TAG_CONSTRAINTS); } else if(strcmp(a_section, "rsc_location") == 0) { a_section_parent = crm_strdup(XML_CIB_TAG_CONSTRAINTS); } else if(strcmp(a_section, "rsc_dependancy") == 0) { a_section_parent = crm_strdup(XML_CIB_TAG_CONSTRAINTS); } else if(strcmp(a_section, "rsc_order") == 0) { a_section_parent = crm_strdup(XML_CIB_TAG_CONSTRAINTS); } else if(strcmp(a_section, XML_CIB_TAG_RESOURCE) == 0) { a_section_parent = crm_strdup(XML_CIB_TAG_RESOURCES); } else if(strcmp(a_section, XML_CIB_TAG_NVPAIR) == 0) { a_section_parent = crm_strdup(XML_CIB_TAG_CRMCONFIG); } else { crm_err("Unknown section %s", a_section); a_section_parent = crm_strdup("all"); } crm_verbose("Plural of %s is %s", crm_str(a_section), a_section_parent); return a_section_parent; } const char * cib_error2string(enum cib_errors return_code) { const char *error_msg = NULL; switch(return_code) { case cib_msg_field_add: error_msg = "failed adding field to cib message"; break; case cib_operation: error_msg = "invalid operation"; break; case cib_create_msg: error_msg = "couldnt create cib message"; break; case cib_client_gone: error_msg = "client left before we could send reply"; break; case cib_not_connected: error_msg = "not connected"; break; case cib_not_authorized: error_msg = "not authorized"; break; case cib_send_failed: error_msg = "send failed"; break; case cib_reply_failed: error_msg = "reply failed"; break; case cib_return_code: error_msg = "no return code"; break; case cib_output_ptr: error_msg = "nowhere to store output"; break; case cib_output_data: error_msg = "corrupt output data"; break; case cib_connection: error_msg = "connection failed"; break; case cib_callback_register: error_msg = "couldnt register callback channel"; break; case cib_authentication: error_msg = ""; break; case cib_registration_msg: error_msg = "invalid registration msg"; break; case cib_callback_token: error_msg = "callback token not found"; break; case cib_missing: error_msg = "cib object missing"; break; case cib_variant: error_msg = "unknown/corrupt cib variant"; break; case CIBRES_MISSING_ID: error_msg = "The id field is missing"; break; case CIBRES_MISSING_TYPE: error_msg = "The type field is missing"; break; case CIBRES_MISSING_FIELD: error_msg = "A required field is missing"; break; case CIBRES_OBJTYPE_MISMATCH: error_msg = "CIBRES_OBJTYPE_MISMATCH"; break; case cib_EXISTS: error_msg = "The object already exists"; break; case cib_NOTEXISTS: error_msg = "The object does not exist"; break; case CIBRES_CORRUPT: error_msg = "The CIB is corrupt"; break; case cib_NOOBJECT: error_msg = "The update was empty"; break; case cib_NOPARENT: error_msg = "The parent object does not exist"; break; case cib_NODECOPY: error_msg = "Failed while copying update"; break; case CIBRES_OTHER: error_msg = "CIBRES_OTHER"; break; case cib_ok: error_msg = "ok"; break; case cib_unknown: error_msg = "Unknown error"; break; case cib_STALE: error_msg = "Discarded old update"; break; case cib_ACTIVATION: error_msg = "Activation Failed"; break; case cib_NOSECTION: error_msg = "Required section was missing"; break; case cib_NOTSUPPORTED: error_msg = "Supplied information is not supported"; break; case cib_not_master: error_msg = "Local service is not the master instance"; break; case cib_client_corrupt: error_msg = "Service client not valid"; break; } if(error_msg == NULL) { crm_err("Unknown CIB Error Code: %d", return_code); error_msg = ""; } return error_msg; } const char * cib_op2string(enum cib_op operation) { const char *operation_msg = NULL; switch(operation) { case 0: operation_msg = "none"; break; case 1: operation_msg = "add"; break; case 2: operation_msg = "modify"; break; case 3: operation_msg = "delete"; break; case CIB_OP_MAX: operation_msg = "invalid operation"; break; } if(operation_msg == NULL) { crm_err("Unknown CIB operation %d", operation); operation_msg = ""; } return operation_msg; } int cib_section2enum(const char *a_section) { if(a_section == NULL || strcmp(a_section, "all") == 0) { return cib_section_all; } else if(strcmp(a_section, XML_CIB_TAG_NODES) == 0) { return cib_section_nodes; } else if(strcmp(a_section, XML_CIB_TAG_STATUS) == 0) { return cib_section_status; } else if(strcmp(a_section, XML_CIB_TAG_CONSTRAINTS) == 0) { return cib_section_constraints; } else if(strcmp(a_section, XML_CIB_TAG_RESOURCES) == 0) { return cib_section_resources; } else if(strcmp(a_section, XML_CIB_TAG_CRMCONFIG) == 0) { return cib_section_crmconfig; } crm_err("Unknown CIB section: %s", a_section); return cib_section_none; } int cib_compare_generation(xmlNodePtr left, xmlNodePtr right) { int int_gen_l = -1; int int_gen_r = -1; const char *gen_l = xmlGetProp(left, XML_ATTR_GENERATION); const char *gen_r = xmlGetProp(right, XML_ATTR_GENERATION); if(gen_l != NULL) int_gen_l = atoi(gen_l); if(gen_r != NULL) int_gen_r = atoi(gen_r); crm_xml_trace(left, "left"); crm_xml_trace(left, "right"); if(int_gen_l < int_gen_r) { crm_trace("lt"); return -1; } else if(int_gen_l > int_gen_r) { crm_trace("gt"); return 1; } crm_trace("eq"); return 0; } xmlNodePtr get_cib_copy(cib_t *cib) { xmlNodePtr xml_cib; int options = cib_scope_local|cib_sync_call; if(cib->cmds->query(cib, NULL, &xml_cib, options) != cib_ok) { crm_err("Couldnt retrieve the CIB"); return NULL; } return xml_cib; } xmlNodePtr cib_get_generation(cib_t *cib) { xmlNodePtr the_cib = get_cib_copy(cib); xmlNodePtr generation = create_xml_node(NULL, "generation_tuple"); copy_in_properties(generation, the_cib); free_xml(the_cib); return generation; } /* * The caller should never free the return value */ xmlNodePtr get_object_root(const char *object_type, xmlNodePtr the_root) { const char *node_stack[2]; xmlNodePtr tmp_node = NULL; if(the_root == NULL) { crm_err("CIB root object was NULL"); return NULL; } else if(object_type == NULL) { crm_debug("Returning the whole CIB"); return the_root; } node_stack[0] = XML_CIB_TAG_CONFIGURATION; node_stack[1] = object_type; if(object_type == NULL || strlen(object_type) == 0 || safe_str_eq("all", object_type)) { return the_root; /* get the whole cib */ } else if(strcmp(object_type, XML_CIB_TAG_STATUS) == 0) { /* these live in a different place */ tmp_node = find_xml_node(the_root, XML_CIB_TAG_STATUS); node_stack[0] = XML_CIB_TAG_STATUS; node_stack[1] = NULL; /* } else if(strcmp(object_type, XML_CIB_TAG_CRMCONFIG) == 0) { */ /* /\* these live in a different place too *\/ */ /* tmp_node = find_xml_node(the_root, XML_CIB_TAG_CRMCONFIG); */ /* node_stack[0] = XML_CIB_TAG_CRMCONFIG; */ /* node_stack[1] = NULL; */ } else { tmp_node = find_xml_node_nested(the_root, node_stack, 2); } if (tmp_node == NULL) { crm_err("[cib] Section %s [%s [%s]] not present", the_root->name, node_stack[0], node_stack[1]?node_stack[1]:""); } return tmp_node; } xmlNodePtr create_cib_fragment_adv( xmlNodePtr update, const char *section, const char *source) { gboolean whole_cib = FALSE; xmlNodePtr fragment = create_xml_node(NULL, XML_TAG_FRAGMENT); xmlNodePtr cib = NULL; xmlNodePtr object_root = NULL; char *auto_section = cib_pluralSection(update?update->name:NULL); if(update == NULL) { crm_err("No update to create a fragment for"); crm_free(auto_section); return NULL; } else if(section == NULL) { section = auto_section; } else if(strcmp(auto_section, section) != 0) { crm_err("Values for update (tag=%s) and section (%s)" " were not consistent", update->name, section); crm_free(auto_section); return NULL; } if(strcmp(section, "all")==0 && strcmp(update->name, XML_TAG_CIB)==0) { whole_cib = TRUE; } set_xml_property_copy(fragment, XML_ATTR_SECTION, section); if(whole_cib == FALSE) { cib = createEmptyCib(); object_root = get_object_root(section, cib); xmlAddChildList(object_root, xmlCopyNodeList(update)); } else { cib = xmlCopyNodeList(update); } xmlAddChild(fragment, cib); set_xml_property_copy(cib, "debug_source", source); crm_free(auto_section); crm_debug("Verifying created fragment"); if(verifyCibXml(cib) == FALSE) { crm_err("Fragment creation failed"); crm_err("[src] %s", dump_xml_formatted(update)); crm_err("[created] %s", dump_xml_formatted(fragment)); free_xml(fragment); fragment = NULL; } return fragment; } /* * It is the callers responsibility to free both the new CIB (output) * and the new CIB (input) */ xmlNodePtr createEmptyCib(void) { xmlNodePtr cib_root = NULL, config = NULL, status = NULL; cib_root = create_xml_node(NULL, XML_TAG_CIB); config = create_xml_node(cib_root, XML_CIB_TAG_CONFIGURATION); status = create_xml_node(cib_root, XML_CIB_TAG_STATUS); set_node_tstamp(cib_root); set_node_tstamp(config); set_node_tstamp(status); set_xml_property_copy(cib_root, "version", "1"); set_xml_property_copy(cib_root, "generated", XML_BOOLEAN_TRUE); create_xml_node(config, XML_CIB_TAG_CRMCONFIG); create_xml_node(config, XML_CIB_TAG_NODES); create_xml_node(config, XML_CIB_TAG_RESOURCES); create_xml_node(config, XML_CIB_TAG_CONSTRAINTS); if (verifyCibXml(cib_root)) { return cib_root; } crm_crit("The generated CIB did not pass integrity testing!!" " All hope is lost."); return NULL; } gboolean verifyCibXml(xmlNodePtr cib) { gboolean is_valid = TRUE; xmlNodePtr tmp_node = NULL; if (cib == NULL) { crm_warn("XML Buffer was empty."); return FALSE; } tmp_node = get_object_root(XML_CIB_TAG_NODES, cib); if (tmp_node == NULL) is_valid = FALSE; tmp_node = get_object_root(XML_CIB_TAG_RESOURCES, cib); if (tmp_node == NULL) is_valid = FALSE; tmp_node = get_object_root(XML_CIB_TAG_CONSTRAINTS, cib); if (tmp_node == NULL) is_valid = FALSE; tmp_node = get_object_root(XML_CIB_TAG_STATUS, cib); if (tmp_node == NULL) is_valid = FALSE; tmp_node = get_object_root(XML_CIB_TAG_CRMCONFIG, cib); if (tmp_node == NULL) is_valid = FALSE; /* more integrity tests */ return is_valid; } gboolean verify_cib_cmds(cib_t *cib) { gboolean valid = TRUE; if(cib->cmds->variant_op == NULL) { crm_err("Operation variant_op not set"); valid = FALSE; } if(cib->cmds->signon == NULL) { crm_err("Operation signon not set"); valid = FALSE; } if(cib->cmds->signoff == NULL) { crm_err("Operation signoff not set"); valid = FALSE; } if(cib->cmds->free == NULL) { crm_err("Operation free not set"); valid = FALSE; } if(cib->cmds->set_op_callback == NULL) { crm_err("Operation set_op_callback not set"); valid = FALSE; } + if(cib->cmds->add_notify_callback == NULL) { + crm_err("Operation add_notify_callback not set"); + valid = FALSE; + } + if(cib->cmds->del_notify_callback == NULL) { + crm_err("Operation del_notify_callback not set"); + valid = FALSE; + } if(cib->cmds->set_connection_dnotify == NULL) { crm_err("Operation set_connection_dnotify not set"); valid = FALSE; } if(cib->cmds->channel == NULL) { crm_err("Operation channel not set"); valid = FALSE; } if(cib->cmds->inputfd == NULL) { crm_err("Operation inputfd not set"); valid = FALSE; } if(cib->cmds->noop == NULL) { crm_err("Operation noop not set"); valid = FALSE; } if(cib->cmds->ping == NULL) { crm_err("Operation ping not set"); valid = FALSE; } if(cib->cmds->query == NULL) { crm_err("Operation query not set"); valid = FALSE; } if(cib->cmds->query_from == NULL) { crm_err("Operation query_from not set"); valid = FALSE; } if(cib->cmds->is_master == NULL) { crm_err("Operation is_master not set"); valid = FALSE; } if(cib->cmds->set_master == NULL) { crm_err("Operation set_master not set"); valid = FALSE; } if(cib->cmds->set_slave == NULL) { crm_err("Operation set_slave not set"); valid = FALSE; } if(cib->cmds->set_slave_all == NULL) { crm_err("Operation set_slave_all not set"); valid = FALSE; } if(cib->cmds->sync == NULL) { crm_err("Operation sync not set"); valid = FALSE; } if(cib->cmds->sync_from == NULL) { crm_err("Operation sync_from not set"); valid = FALSE; } if(cib->cmds->bump_epoch == NULL) { crm_err("Operation bump_epoch not set"); valid = FALSE; } if(cib->cmds->create == NULL) { crm_err("Operation create not set"); valid = FALSE; } if(cib->cmds->modify == NULL) { crm_err("Operation modify not set"); valid = FALSE; } if(cib->cmds->replace == NULL) { crm_err("Operation replace not set"); valid = FALSE; } if(cib->cmds->delete == NULL) { crm_err("Operation delete not set"); valid = FALSE; } if(cib->cmds->erase == NULL) { crm_err("Operation erase not set"); valid = FALSE; } if(cib->cmds->quit == NULL) { crm_err("Operation quit not set"); valid = FALSE; } if(cib->cmds->msgready == NULL) { crm_err("Operation msgready not set"); valid = FALSE; } if(cib->cmds->rcvmsg == NULL) { crm_err("Operation rcvmsg not set"); valid = FALSE; } if(cib->cmds->dispatch == NULL) { crm_err("Operation dispatch not set"); valid = FALSE; } + return valid; } diff --git a/lib/crm/cib/cib_native.c b/lib/crm/cib/cib_native.c index 445d06c718..b43a25b5b6 100755 --- a/lib/crm/cib/cib_native.c +++ b/lib/crm/cib/cib_native.c @@ -1,535 +1,571 @@ /* * Copyright (c) 2004 International Business Machines * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * */ #include #include #include #include #include #include #include #include #include #include #include #include #include typedef struct cib_native_opaque_s { IPC_Channel *command_channel; IPC_Channel *callback_channel; -/* GCHSource *callback_source; */ + GCHSource *callback_source; } cib_native_opaque_t; int cib_native_perform_op( cib_t *cib, const char *op, const char *host, const char *section, xmlNodePtr data, xmlNodePtr *output_data, int call_options); int cib_native_signon(cib_t* cib, enum cib_conn_type type); int cib_native_signoff(cib_t* cib); int cib_native_free(cib_t* cib); IPC_Channel *cib_native_channel(cib_t* cib); int cib_native_inputfd(cib_t* cib); gboolean cib_native_msgready(cib_t* cib); int cib_native_rcvmsg(cib_t* cib, int blocking); gboolean cib_native_dispatch(IPC_Channel *channel, gpointer user_data); cib_t *cib_native_new (cib_t *cib); int cib_native_set_connection_dnotify( cib_t *cib, void (*dnotify)(gpointer user_data)); +void cib_native_notify(gpointer data, gpointer user_data); + +void cib_native_callback(cib_t *cib, struct ha_msg *msg); + + cib_t* cib_native_new (cib_t *cib) { cib_native_opaque_t *native = NULL; crm_malloc(cib->variant_opaque, sizeof(cib_native_opaque_t)); native = cib->variant_opaque; native->command_channel = NULL; native->callback_channel = NULL; /* assign variant specific ops*/ cib->cmds->variant_op = cib_native_perform_op; cib->cmds->signon = cib_native_signon; cib->cmds->signoff = cib_native_signoff; cib->cmds->free = cib_native_free; cib->cmds->channel = cib_native_channel; cib->cmds->inputfd = cib_native_inputfd; cib->cmds->msgready = cib_native_msgready; cib->cmds->rcvmsg = cib_native_rcvmsg; cib->cmds->dispatch = cib_native_dispatch; cib->cmds->set_connection_dnotify = cib_native_set_connection_dnotify; return cib; } int cib_native_signon(cib_t* cib, enum cib_conn_type type) { int rc = cib_ok; char *uuid_ticket = NULL; struct ha_msg *reg_msg = NULL; cib_native_opaque_t *native = cib->variant_opaque; crm_trace("Connecting command channel"); if(type == cib_command) { cib->state = cib_connected_command; native->command_channel = init_client_ipc_comms_nodispatch( "cib_rw"); } else { cib->state = cib_connected_query; native->command_channel = init_client_ipc_comms_nodispatch( "cib_ro"); } if(native->command_channel == NULL) { crm_err("Connection to command channel failed"); rc = cib_connection; } else if(native->command_channel->ch_status != IPC_CONNECT) { crm_err("Connection may have succeeded," " but authentication to command channel failed"); rc = cib_authentication; } if(rc == cib_ok) { crm_trace("Connecting callback channel"); -/* native->callback_source = init_client_ipc_comms( */ - native->callback_channel = init_client_ipc_comms( - "cib_callback", cib_native_dispatch, cib); -/* native->callback_channel = native->callback_source->ch; */ + native->callback_source = init_client_ipc_comms( + "cib_callback", cib_native_dispatch, cib, &(native->callback_channel)); if(native->callback_channel == NULL) { crm_err("Connection to callback channel failed"); rc = cib_connection; } } else if(rc == cib_ok && native->callback_channel->ch_status != IPC_CONNECT) { crm_err("Connection may have succeeded," " but authentication to callback channel failed"); rc = cib_authentication; } if(rc == cib_ok) { const char *msg_type = NULL; crm_trace("Waiting for msg on command channel"); reg_msg = msgfromIPC_noauth(native->command_channel); msg_type = cl_get_string(reg_msg, F_CIB_OPERATION); if(safe_str_neq(msg_type, CRM_OP_REGISTER) ) { crm_err("Invalid registration message: %s", msg_type); rc = cib_registration_msg; } else { const char *tmp_ticket = NULL; crm_trace("Retrieving callback channel ticket"); tmp_ticket = cl_get_string( reg_msg, F_CIB_CALLBACK_TOKEN); if(tmp_ticket == NULL) { rc = cib_callback_token; } else { uuid_ticket = crm_strdup(tmp_ticket); } } ha_msg_del(reg_msg); reg_msg = NULL; } if(rc == cib_ok) { crm_trace("Registering callback channel with ticket %s", uuid_ticket); reg_msg = ha_msg_new(2); ha_msg_add(reg_msg, F_CIB_OPERATION, CRM_OP_REGISTER); ha_msg_add(reg_msg, F_CIB_CALLBACK_TOKEN, uuid_ticket); if(msg2ipcchan(reg_msg, native->callback_channel) != HA_OK) { rc = cib_callback_register; } crm_free(uuid_ticket); ha_msg_del(reg_msg); } if(rc == cib_ok) { crm_trace("wait for the callback channel setup to complete"); reg_msg = msgfromIPC_noauth(native->callback_channel); if(reg_msg == NULL) { crm_err("Connection to callback channel not maintined"); rc = cib_connection; } ha_msg_del(reg_msg); } if(rc == cib_ok) { crm_info("Connection to CIB successful"); return cib_ok; } crm_err("Connection to CIB failed: %s", cib_error2string(rc)); cib_native_signoff(cib); return rc; } int cib_native_signoff(cib_t* cib) { cib_native_opaque_t *native = cib->variant_opaque; crm_info("Signing out of the CIB Service"); /* close channels */ if (native->command_channel != NULL) { native->command_channel->ops->destroy( native->command_channel); native->command_channel = NULL; } if (native->callback_channel != NULL) { native->callback_channel->ops->destroy( native->callback_channel); native->callback_channel = NULL; } cib->state = cib_disconnected; cib->type = cib_none; return cib_ok; } int cib_native_free (cib_t* cib) { int rc = cib_ok; crm_warn("Freeing CIB"); if(cib->state != cib_disconnected) { rc = cib_native_signoff(cib); if(rc == cib_ok) { crm_free(cib); } } return rc; } IPC_Channel * cib_native_channel(cib_t* cib) { cib_native_opaque_t *native = NULL; if(cib == NULL) { crm_err("Missing cib object"); return NULL; } native = cib->variant_opaque; if(native != NULL) { return native->callback_channel; } crm_err("couldnt find variant specific data in %p", cib); return NULL; } int cib_native_inputfd(cib_t* cib) { IPC_Channel *ch = cib_native_channel(cib); return ch->ops->get_recv_select_fd(ch); } int cib_native_perform_op( cib_t *cib, const char *op, const char *host, const char *section, xmlNodePtr data, xmlNodePtr *output_data, int call_options) { int rc = HA_OK; - size_t calldata_len = 0; - - char *calldata = NULL; const char *output = NULL; struct ha_msg *op_msg = NULL; struct ha_msg *op_reply = NULL; cib_native_opaque_t *native = cib->variant_opaque; if(cib->state == cib_disconnected) { return cib_not_connected; } if(output_data != NULL) { *output_data = NULL; } op_msg = ha_msg_new(7); if(op == NULL) { crm_err("No operation specified"); rc = cib_operation; } if(rc == HA_OK) { rc = ha_msg_add(op_msg, F_TYPE, "cib"); } if(rc == HA_OK) { rc = ha_msg_add(op_msg, F_CIB_OPERATION, op); } if(rc == HA_OK && host != NULL) { rc = ha_msg_add(op_msg, F_CIB_HOST, host); } if(rc == HA_OK && section != NULL) { rc = ha_msg_add(op_msg, F_CIB_SECTION, section); } if(rc == HA_OK) { char *tmp = crm_itoa(cib->call_id); rc = ha_msg_add(op_msg, F_CIB_CALLID, tmp); crm_free(tmp); } if(rc == HA_OK) { char *tmp = crm_itoa(call_options); crm_trace("Sending call options: %.8lx, %d, %s", (long)call_options, call_options, tmp); rc = ha_msg_add(op_msg, F_CIB_CALLOPTS, tmp); crm_free(tmp); } if(rc == HA_OK) { - calldata = dump_xml_unformatted(data); + char *calldata = dump_xml_unformatted(data); if(calldata != NULL) { - calldata_len = strlen(calldata) + 1; rc = ha_msg_add(op_msg, F_CIB_CALLDATA, calldata); } else { crm_debug("Calldata string was NULL (xml=%p)", data); } crm_free(calldata); } if (rc != HA_OK) { ha_msg_del(op_msg); crm_err("Failed to create CIB operation message"); return cib_create_msg; } cib->call_id++; crm_debug("Sending message to CIB service"); cl_log_message(op_msg); rc = msg2ipcchan(op_msg, native->command_channel); ha_msg_del(op_msg); if (rc != HA_OK) { return cib_send_failed; } if( !(call_options & cib_sync_call)) { return cib->call_id - 1; } op_reply = msgfromIPC_noauth(native->command_channel); if (op_reply == NULL) { crm_err("No reply message"); return cib_reply_failed; } rc = cib_ok; /* Start processing the reply... */ if(ha_msg_value_int(op_reply, F_CIB_RC, &rc) != HA_OK) { rc = cib_return_code; } /* eg. op may have been a privelidged action and we are in query mode */ if(rc != cib_ok) { ha_msg_del(op_reply); return rc; } if(!(call_options & cib_discard_reply)) { output = cl_get_string(op_msg, F_CIB_CALLDATA); } if(output != NULL && output_data == NULL) { rc = cib_output_ptr; } else if(output != NULL) { *output_data = string2xml(output); if(*output_data == NULL) { rc = cib_output_data; } } else { crm_debug("No output in reply to \"%s\" command %d", op, cib->call_id - 1); } ha_msg_del(op_reply); return rc; } gboolean cib_native_msgready(cib_t* cib) { IPC_Channel *ch = NULL; cib_native_opaque_t *native = NULL; if (cib == NULL) { crm_err("No CIB!"); return FALSE; } native = cib->variant_opaque; ch = cib_native_channel(cib); if (ch == NULL) { crm_err("No channel"); return FALSE; } if(native->command_channel->ops->is_message_pending( native->command_channel)) { - crm_warn("Message pending on command channel"); + crm_verbose("Message pending on command channel"); } if(native->callback_channel->ops->is_message_pending( native->callback_channel)) { - crm_info("Message pending on callback channel"); + crm_trace("Message pending on callback channel"); return TRUE; - } - crm_info("No message pending"); + } + crm_verbose("No message pending"); return FALSE; } int cib_native_rcvmsg(cib_t* cib, int blocking) { const char *type = NULL; struct ha_msg* msg = NULL; IPC_Channel *ch = cib_native_channel(cib); /* if it is not blocking mode and no message in the channel, return */ if (blocking == 0 && cib_native_msgready(cib) == FALSE) { crm_debug("No message ready and non-blocking..."); return 0; } else if (cib_native_msgready(cib) == FALSE) { crm_debug("Waiting for message from CIB service..."); ch->ops->waitin(ch); } /* get the message */ msg = msgfromIPC_noauth(ch); if (msg == NULL) { crm_warn("Received a NULL msg from CIB service."); return 0; } /* do callbacks */ type = cl_get_string(msg, F_TYPE); crm_trace("Activating %s callbacks...", type); + + if(safe_str_eq(type, T_CIB)) { + cib_native_callback(cib, msg); + + } else if(safe_str_eq(type, T_CIB_NOTIFY)) { + g_list_foreach(cib->notify_list, cib_native_notify, msg); + + } else { + crm_err("Unknown message type: %s", type); + } + ha_msg_del(msg); + + return 1; +} + +void +cib_native_callback(cib_t *cib, struct ha_msg *msg) +{ + int rc = 0; + int call_id = 0; + xmlNodePtr output = NULL; + const char *output_s = NULL; + if(cib->op_callback == NULL) { crm_debug("No OP callback set, ignoring reply"); + return; + } + + ha_msg_value_int(msg, F_CIB_CALLID, &call_id); + ha_msg_value_int(msg, F_CIB_RC, &rc); + output_s = cl_get_string(msg, F_CIB_CALLDATA); + if(output_s != NULL) { + output = string2xml(output_s); + } + + cib->op_callback(msg, call_id, rc, output); + + crm_trace("OP callback activated."); +} - } else { - int rc = 0; - int call_id = 0; - xmlNodePtr output = NULL; - const char *output_s = NULL; - - ha_msg_value_int(msg, F_CIB_CALLID, &call_id); - ha_msg_value_int(msg, F_CIB_RC, &rc); - output_s = cl_get_string(msg, F_CIB_CALLDATA); - if(output_s != NULL) { - output = string2xml(output_s); - } - cib->op_callback(msg, call_id, rc, output); +void +cib_native_notify(gpointer data, gpointer user_data) +{ + struct ha_msg *msg = user_data; + cib_notify_client_t *entry = data; + const char *event = cl_get_string(msg, F_SUBTYPE); - crm_trace("OP callback activated."); - } + if(entry == NULL) { + crm_warn("Skipping callback - NULL callback client"); + return; - ha_msg_del(msg); + } else if(entry->callback == NULL) { + crm_warn("Skipping callback - NULL callback"); + return; - return 1; + } else if(safe_str_neq(entry->event, event)) { + crm_trace("Skipping callback - event mismatch %p/%s vs. %s", + entry, entry->event, event); + return; + } + + crm_trace("Invoking callback for %p/%s event...", entry, event); + entry->callback(event, msg); } - gboolean cib_native_dispatch(IPC_Channel *channel, gpointer user_data) { int lpc = 0; cib_t *cib = user_data; crm_debug("Received callback"); if(user_data == NULL){ crm_err("user_data field must contain the CIB struct"); return FALSE; } while(cib_native_msgready(cib)) { lpc++; /* invoke the callbacks but dont block */ if(cib_native_rcvmsg(cib, 0) < 1) { break; } } crm_debug("%d CIB messages dispatched", lpc); if (channel && (channel->ch_status == IPC_DISCONNECT)) { crm_crit("Lost connection to the CIB service."); return FALSE; } return TRUE; } int cib_native_set_connection_dnotify( cib_t *cib, void (*dnotify)(gpointer user_data)) { cib_native_opaque_t *native = NULL; if (cib == NULL) { crm_err("No CIB!"); return FALSE; } native = cib->variant_opaque; -#if 0 if(dnotify == NULL) { crm_warn("Setting dnotify back to default value"); - native->callback_source->dnotify = - default_ipc_connection_destroy; + set_IPC_Channel_dnotify(native->callback_source, + default_ipc_connection_destroy); } else { crm_debug("Setting dnotify"); - native->callback_source->dnotify = dnotify; + set_IPC_Channel_dnotify(native->callback_source, dnotify); } -#endif return cib_ok; }