Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F4624502
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
14 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/lib/crm/cib/cib_native.c b/lib/crm/cib/cib_native.c
index 45c0e50128..3b50361d6f 100755
--- a/lib/crm/cib/cib_native.c
+++ b/lib/crm/cib/cib_native.c
@@ -1,574 +1,568 @@
/*
* Copyright (c) 2004 International Business Machines
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
*/
#include <portability.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdarg.h>
#include <string.h>
#include <glib.h>
#include <heartbeat.h>
#include <clplumbing/ipc.h>
#include <ha_msg.h>
#include <crm/crm.h>
#include <crm/cib.h>
#include <crm/common/ipc.h>
/* Private state of the native (IPC) CIB variant; an instance is stored in
 * cib->variant_opaque by cib_native_new(). */
typedef struct cib_native_opaque_s
{
/* channel on which requests are sent and synchronous replies are read */
IPC_Channel *command_channel;
/* channel checked by cib_native_msgready() and read by
 * cib_native_rcvmsg() for callbacks and notifications */
IPC_Channel *callback_channel;
/* value returned by init_client_ipc_comms() for the callback channel;
 * used when the disconnect notifier is (re)set */
GCHSource *callback_source;
} cib_native_opaque_t;
/* Forward declarations for the native variant.  cib_native_new() installs
 * most of these into the operation table in cib->cmds; cib_native_notify()
 * and cib_native_callback() are helpers used by cib_native_rcvmsg(). */
int cib_native_perform_op(
cib_t *cib, const char *op, const char *host, const char *section,
xmlNodePtr data, xmlNodePtr *output_data, int call_options);
int cib_native_signon(cib_t* cib, enum cib_conn_type type);
int cib_native_signoff(cib_t* cib);
int cib_native_free(cib_t* cib);
IPC_Channel *cib_native_channel(cib_t* cib);
int cib_native_inputfd(cib_t* cib);
gboolean cib_native_msgready(cib_t* cib);
int cib_native_rcvmsg(cib_t* cib, int blocking);
gboolean cib_native_dispatch(IPC_Channel *channel, gpointer user_data);
cib_t *cib_native_new (cib_t *cib);
int cib_native_set_connection_dnotify(
cib_t *cib, void (*dnotify)(gpointer user_data));
void cib_native_notify(gpointer data, gpointer user_data);
void cib_native_callback(cib_t *cib, struct ha_msg *msg);
/*
 * Attach the native (IPC) variant to an existing cib_t.
 *
 * Allocates the variant-private state, stores it in cib->variant_opaque,
 * clears all of its members and points the operation table in cib->cmds
 * at the cib_native_* implementations.
 *
 * Returns cib on success, or NULL when cib is NULL or the private state
 * is not available after the allocation.
 */
cib_t*
cib_native_new (cib_t *cib)
{
	cib_native_opaque_t *native = NULL;

	if(cib == NULL) {
		crm_err("Missing cib object");
		return NULL;
	}

	crm_malloc(cib->variant_opaque, sizeof(cib_native_opaque_t));
	native = cib->variant_opaque;
	if(native == NULL) {
		crm_err("couldnt find variant specific data in %p", cib);
		return NULL;
	}

	native->command_channel = NULL;
	native->callback_channel = NULL;
	/* cib_native_set_connection_dnotify() reads this member, so it must
	 * not be left uninitialised until the first sign-on */
	native->callback_source = NULL;

	/* assign variant specific ops*/
	cib->cmds->variant_op = cib_native_perform_op;
	cib->cmds->signon = cib_native_signon;
	cib->cmds->signoff = cib_native_signoff;
	cib->cmds->free = cib_native_free;
	cib->cmds->channel = cib_native_channel;
	cib->cmds->inputfd = cib_native_inputfd;
	cib->cmds->msgready = cib_native_msgready;
	cib->cmds->rcvmsg = cib_native_rcvmsg;
	cib->cmds->dispatch = cib_native_dispatch;
	cib->cmds->set_connection_dnotify = cib_native_set_connection_dnotify;

	return cib;
}
/*
 * Connect to the CIB service.
 *
 * Steps, each of which is skipped once rc is no longer cib_ok:
 *   1. open the command channel ("cib_rw" for cib_command, "cib_ro"
 *      otherwise) and check that it reached IPC_CONNECT;
 *   2. open the callback channel with cib_native_dispatch as its
 *      dispatch function and check that it reached IPC_CONNECT;
 *   3. read the registration message from the command channel and copy
 *      the callback token out of it;
 *   4. send that token back on the callback channel;
 *   5. wait for the acknowledgement on the callback channel.
 *
 * Returns cib_ok on success.  On any failure the error is logged,
 * cib_native_signoff() is called to tear the connection down again and
 * the corresponding cib_* error code is returned.
 */
int
cib_native_signon(cib_t* cib, enum cib_conn_type type)
{
	int rc = cib_ok;
	char *uuid_ticket = NULL;
	struct ha_msg *reg_msg = NULL;
	cib_native_opaque_t *native = cib->variant_opaque;

	crm_trace("Connecting command channel");
	/* the state is set optimistically; the failure path below resets it
	 * through cib_native_signoff() */
	if(type == cib_command) {
		cib->state = cib_connected_command;
		native->command_channel = init_client_ipc_comms_nodispatch(
			"cib_rw");
	} else {
		cib->state = cib_connected_query;
		native->command_channel = init_client_ipc_comms_nodispatch(
			"cib_ro");
	}

	if(native->command_channel == NULL) {
		crm_err("Connection to command channel failed");
		rc = cib_connection;

	} else if(native->command_channel->ch_status != IPC_CONNECT) {
		crm_err("Connection may have succeeded,"
			" but authentication to command channel failed");
		rc = cib_authentication;
	}

	if(rc == cib_ok) {
		crm_trace("Connecting callback channel");
		native->callback_source = init_client_ipc_comms(
			"cib_callback", cib_native_dispatch, cib,
			&(native->callback_channel));

		if(native->callback_channel == NULL) {
			crm_err("Connection to callback channel failed");
			rc = cib_connection;

		/* this check used to hang off the enclosing if(rc == cib_ok)
		 * as an else-branch, where it could never be reached */
		} else if(native->callback_channel->ch_status != IPC_CONNECT) {
			crm_err("Connection may have succeeded,"
				" but authentication to callback channel failed");
			rc = cib_authentication;
		}
	}

	if(rc == cib_ok) {
		const char *msg_type = NULL;

		crm_trace("Waiting for msg on command channel");
		reg_msg = msgfromIPC_noauth(native->command_channel);
		msg_type = cl_get_string(reg_msg, F_CIB_OPERATION);

		if(safe_str_neq(msg_type, CRM_OP_REGISTER) ) {
			crm_err("Invalid registration message: %s", msg_type);
			rc = cib_registration_msg;

		} else {
			const char *tmp_ticket = NULL;

			crm_trace("Retrieving callback channel ticket");
			tmp_ticket = cl_get_string(
				reg_msg, F_CIB_CALLBACK_TOKEN);

			if(tmp_ticket == NULL) {
				rc = cib_callback_token;
			} else {
				/* copy it: reg_msg is deleted just below */
				uuid_ticket = crm_strdup(tmp_ticket);
			}
		}
		ha_msg_del(reg_msg);
		reg_msg = NULL;
	}

	if(rc == cib_ok) {
		crm_trace("Registering callback channel with ticket %s",
			  uuid_ticket);
		reg_msg = ha_msg_new(2);
		ha_msg_add(reg_msg, F_CIB_OPERATION, CRM_OP_REGISTER);
		ha_msg_add(reg_msg, F_CIB_CALLBACK_TOKEN, uuid_ticket);

		if(msg2ipcchan(reg_msg, native->callback_channel) != HA_OK) {
			rc = cib_callback_register;
		}
		crm_free(uuid_ticket);
		ha_msg_del(reg_msg);
	}

	if(rc == cib_ok) {
		crm_trace("wait for the callback channel setup to complete");
		reg_msg = msgfromIPC_noauth(native->callback_channel);

		if(reg_msg == NULL) {
			crm_err("Connection to callback channel not maintined");
			rc = cib_connection;
		}
		ha_msg_del(reg_msg);
	}

	if(rc == cib_ok) {
		crm_info("Connection to CIB successful");
		return cib_ok;
	}

	crm_err("Connection to CIB failed: %s", cib_error2string(rc));
	cib_native_signoff(cib);
	return rc;
}
/*
 * Disconnect from the CIB service.
 *
 * Destroys whichever of the command and callback channels are open,
 * clears the stored pointers and marks the object as
 * cib_disconnected / cib_none.  Always returns cib_ok.
 */
int
cib_native_signoff(cib_t* cib)
{
	cib_native_opaque_t *conn = cib->variant_opaque;

	crm_info("Signing out of the CIB Service");

	/* close channels */
	if (conn->command_channel != NULL) {
		IPC_Channel *cmd_ch = conn->command_channel;

		cmd_ch->ops->destroy(cmd_ch);
		conn->command_channel = NULL;
	}

	if (conn->callback_channel != NULL) {
		IPC_Channel *cb_ch = conn->callback_channel;

		cb_ch->ops->destroy(cb_ch);
		conn->callback_channel = NULL;
	}

	cib->type = cib_none;
	cib->state = cib_disconnected;

	return cib_ok;
}
/*
 * Release a cib_t that uses the native variant.
 *
 * Signs off first when the object is still connected.  If that succeeds
 * (or was not needed) the variant-private state allocated by
 * cib_native_new() and the cib_t itself are freed.  Previously nothing
 * was freed when the object was already disconnected.
 *
 * Returns cib_ok, or the error from cib_native_signoff() in which case
 * nothing is freed.  A NULL cib is accepted and treated as a no-op.
 *
 * NOTE(review): cib->cmds is not allocated in this file and is left
 * alone here - confirm who owns it.
 */
int
cib_native_free (cib_t* cib)
{
	int rc = cib_ok;

	crm_warn("Freeing CIB");
	if(cib == NULL) {
		return cib_ok;
	}

	if(cib->state != cib_disconnected) {
		rc = cib_native_signoff(cib);
	}

	if(rc == cib_ok) {
		crm_free(cib->variant_opaque);
		crm_free(cib);
	}

	return rc;
}
/*
 * Return the callback channel of a native CIB connection.
 *
 * Yields NULL (after logging an error) when cib is NULL or carries no
 * variant-private data; otherwise returns the stored callback channel
 * pointer, which may itself be NULL.
 */
IPC_Channel *
cib_native_channel(cib_t* cib)
{
	cib_native_opaque_t *priv = NULL;

	if(cib == NULL) {
		crm_err("Missing cib object");
		return NULL;
	}

	priv = cib->variant_opaque;
	if(priv == NULL) {
		crm_err("couldnt find variant specific data in %p", cib);
		return NULL;
	}

	return priv->callback_channel;
}
/*
 * Return the file descriptor to select() on for incoming callback
 * messages, as reported by the callback channel's get_recv_select_fd().
 *
 * Returns -1 when there is no callback channel (cib is NULL, has no
 * variant data, or is not signed on) instead of dereferencing NULL.
 */
int
cib_native_inputfd(cib_t* cib)
{
	IPC_Channel *ch = cib_native_channel(cib);

	if(ch == NULL) {
		crm_err("No channel");
		return -1;
	}

	return ch->ops->get_recv_select_fd(ch);
}
/*
 * Send one CIB operation on the command channel and, for synchronous
 * calls, read the reply from that same channel.
 *
 * This span is part of a unified diff: lines beginning with '-' belong
 * to the old revision, lines beginning with '+' to the new one.
 *
 * cib          - connection object; its call_id is incremented per request
 * op           - operation name, stored in F_CIB_OPERATION (required)
 * host         - optional, stored in F_CIB_HOST when not NULL
 * section      - optional, stored in F_CIB_SECTION when not NULL
 * data         - XML payload, serialised into F_CIB_CALLDATA when the
 *                serialisation is not NULL
 * output_data  - optional; reset to NULL on entry and set from the parsed
 *                reply payload when one is found
 * call_options - bit mask; cib_discard_reply and cib_sync_call are
 *                examined here
 *
 * Returns:
 *   cib_not_connected  when cib->state is cib_disconnected
 *   cib_create_msg     when the request could not be assembled
 *                      (this includes op == NULL)
 *   cib_send_failed    when msg2ipcchan() does not return HA_OK
 *   cib_ok             immediately after sending if cib_discard_reply is set
 *   the request's id   if cib_sync_call is not set
 *   cib_reply_failed   when no reply could be read
 *   otherwise the F_CIB_RC value of the reply (cib_return_code if that
 *   field cannot be read), or cib_output_data if the reply payload does
 *   not parse.  The '-' side additionally returned early on a non-ok
 *   reply code and reported cib_output_ptr when output arrived without
 *   an output_data pointer.
 */
int
cib_native_perform_op(
cib_t *cib, const char *op, const char *host, const char *section,
xmlNodePtr data, xmlNodePtr *output_data, int call_options)
{
int rc = HA_OK;
const char *output = NULL;
struct ha_msg *op_msg = NULL;
struct ha_msg *op_reply = NULL;
cib_native_opaque_t *native = cib->variant_opaque;
if(cib->state == cib_disconnected) {
return cib_not_connected;
}
if(output_data != NULL) {
*output_data = NULL;
}
/* assemble the request; every step below only runs while rc is HA_OK */
op_msg = ha_msg_new(7);
/* a missing op leaves rc != HA_OK, so the remaining steps are skipped
 * and the function returns cib_create_msg further down */
if(op == NULL) {
crm_err("No operation specified");
rc = cib_operation;
}
if(rc == HA_OK) {
rc = ha_msg_add(op_msg, F_TYPE, "cib");
}
if(rc == HA_OK) {
rc = ha_msg_add(op_msg, F_CIB_OPERATION, op);
}
if(rc == HA_OK && host != NULL) {
rc = ha_msg_add(op_msg, F_CIB_HOST, host);
}
if(rc == HA_OK && section != NULL) {
rc = ha_msg_add(op_msg, F_CIB_SECTION, section);
}
if(rc == HA_OK) {
char *tmp = crm_itoa(cib->call_id);
rc = ha_msg_add(op_msg, F_CIB_CALLID, tmp);
crm_free(tmp);
}
if(rc == HA_OK) {
char *tmp = crm_itoa(call_options);
crm_trace("Sending call options: %.8lx, %d, %s",
(long)call_options, call_options, tmp);
rc = ha_msg_add(op_msg, F_CIB_CALLOPTS, tmp);
crm_free(tmp);
}
if(rc == HA_OK) {
char *calldata = dump_xml_unformatted(data);
if(calldata != NULL) {
rc = ha_msg_add(op_msg, F_CIB_CALLDATA, calldata);
} else {
crm_debug("Calldata string was NULL (xml=%p)", data);
}
crm_free(calldata);
}
if (rc != HA_OK) {
ha_msg_del(op_msg);
crm_err("Failed to create CIB operation message");
return cib_create_msg;
}
/* the id is consumed here, before the send, so it advances even when
 * sending fails */
cib->call_id++;
crm_debug("Sending message to CIB service");
cl_log_message(op_msg);
rc = msg2ipcchan(op_msg, native->command_channel);
ha_msg_del(op_msg);
if (rc != HA_OK) {
return cib_send_failed;
}
/* callers that do not wait get either cib_ok or the id just used */
if((call_options & cib_discard_reply)) {
return cib_ok;
} else if(!(call_options & cib_sync_call)) {
return cib->call_id - 1;
}
op_reply = msgfromIPC_noauth(native->command_channel);
if (op_reply == NULL) {
crm_err("No reply message");
return cib_reply_failed;
}
rc = cib_ok;
/* Start processing the reply... */
if(ha_msg_value_int(op_reply, F_CIB_RC, &rc) != HA_OK) {
rc = cib_return_code;
- }
-
- /* eg. op may have been a privelidged action and we are in query mode */
- if(rc != cib_ok) {
- ha_msg_del(op_reply);
- return rc;
- }
+ }
/* cib_discard_reply already returned above, so this test always passes.
 * NOTE(review): op_msg was handed to ha_msg_del() after sending; reading
 * the payload from it here looks like a use of a deleted message and
 * op_reply was presumably intended - confirm and fix. */
if(!(call_options & cib_discard_reply)) {
output = cl_get_string(op_msg, F_CIB_CALLDATA);
}
- if(output != NULL && output_data == NULL) {
- rc = cib_output_ptr;
-
+ if(output_data == NULL) {
+ /* do nothing more */
+
} else if(output != NULL) {
*output_data = string2xml(output);
if(*output_data == NULL) {
rc = cib_output_data;
}
} else {
crm_debug("No output in reply to \"%s\" command %d",
op, cib->call_id - 1);
}
ha_msg_del(op_reply);
return rc;
}
/*
 * Report whether a message is waiting on the callback channel.
 *
 * A pending message on the command channel is only logged; it does not
 * affect the result.  Returns FALSE (after logging an error) when cib is
 * NULL or no callback channel is available.
 */
gboolean
cib_native_msgready(cib_t* cib)
{
	gboolean pending = FALSE;
	IPC_Channel *cmd_ch = NULL;
	IPC_Channel *cb_ch = NULL;
	cib_native_opaque_t *priv = NULL;

	if (cib == NULL) {
		crm_err("No CIB!");
		return FALSE;
	}

	priv = cib->variant_opaque;
	if (cib_native_channel(cib) == NULL) {
		crm_err("No channel");
		return FALSE;
	}

	cmd_ch = priv->command_channel;
	if (cmd_ch->ops->is_message_pending(cmd_ch)) {
		crm_verbose("Message pending on command channel");
	}

	cb_ch = priv->callback_channel;
	if (cb_ch->ops->is_message_pending(cb_ch)) {
		crm_trace("Message pending on callback channel");
		pending = TRUE;

	} else {
		crm_verbose("No message pending");
	}

	return pending;
}
/*
 * Read one message from the callback channel and hand it to the
 * registered callbacks.
 *
 * cib      - connection object
 * blocking - 0: return immediately when nothing is pending;
 *            non-zero: wait on the channel until something arrives
 *
 * T_CIB messages go to cib_native_callback(); T_CIB_NOTIFY messages are
 * offered to every entry of cib->notify_list through cib_native_notify().
 * The message is deleted before returning, so callbacks must not keep
 * a reference to it.
 *
 * Returns 1 when a message was read (even if its type was unknown) and
 * 0 when nothing was read, including the case where no callback channel
 * exists - previously a blocking call in that situation dereferenced
 * a NULL channel.
 */
int
cib_native_rcvmsg(cib_t* cib, int blocking)
{
	const char *type = NULL;
	struct ha_msg* msg = NULL;
	IPC_Channel *ch = cib_native_channel(cib);

	if (ch == NULL) {
		crm_err("No channel");
		return 0;
	}

	/* check readiness once; in non-blocking mode give up right away */
	if (cib_native_msgready(cib) == FALSE) {
		if (blocking == 0) {
			crm_debug("No message ready and non-blocking...");
			return 0;
		}
		crm_debug("Waiting for message from CIB service...");
		ch->ops->waitin(ch);
	}

	/* get the message */
	msg = msgfromIPC_noauth(ch);
	if (msg == NULL) {
		crm_warn("Received a NULL msg from CIB service.");
		return 0;
	}

	/* do callbacks */
	type = cl_get_string(msg, F_TYPE);
	crm_trace("Activating %s callbacks...", type);

	if(safe_str_eq(type, T_CIB)) {
		cib_native_callback(cib, msg);

	} else if(safe_str_eq(type, T_CIB_NOTIFY)) {
		g_list_foreach(cib->notify_list, cib_native_notify, msg);

	} else {
		crm_err("Unknown message type: %s", type);
	}

	ha_msg_del(msg);

	return 1;
}
/*
 * Deliver an operation reply to the client's op_callback.
 *
 * Extracts the call id, the result code and the (optional) XML payload
 * from msg and passes them, together with msg itself, to
 * cib->op_callback.  Nothing is done when no callback is registered.
 *
 * NOTE(review): the XML parsed from the payload is not released here -
 * confirm whether the callback is expected to take ownership of it.
 */
void
cib_native_callback(cib_t *cib, struct ha_msg *msg)
{
	int reply_rc = 0;
	int reply_id = 0;
	const char *payload = NULL;
	xmlNodePtr payload_xml = NULL;

	if(cib->op_callback != NULL) {
		ha_msg_value_int(msg, F_CIB_CALLID, &reply_id);
		ha_msg_value_int(msg, F_CIB_RC, &reply_rc);

		payload = cl_get_string(msg, F_CIB_CALLDATA);
		if(payload != NULL) {
			payload_xml = string2xml(payload);
		}

		cib->op_callback(msg, reply_id, reply_rc, payload_xml);
		crm_trace("OP callback activated.");
		return;
	}

	crm_debug("No OP callback set, ignoring reply");
}
/*
 * g_list_foreach() helper: offer one notification to one registered
 * client.
 *
 * data      - list element, a cib_notify_client_t
 * user_data - the notification message
 *
 * The client's callback is invoked with the message's F_SUBTYPE value
 * and the message, but only when the client entry and its callback are
 * set and the client's event name equals that value.
 */
void
cib_native_notify(gpointer data, gpointer user_data)
{
	cib_notify_client_t *client = data;
	struct ha_msg *notification = user_data;
	const char *event = cl_get_string(notification, F_SUBTYPE);

	if(client == NULL) {
		crm_warn("Skipping callback - NULL callback client");
		return;
	}

	if(client->callback == NULL) {
		crm_warn("Skipping callback - NULL callback");
		return;
	}

	if(safe_str_neq(client->event, event)) {
		crm_trace("Skipping callback - event mismatch %p/%s vs. %s",
			  client, client->event, event);
		return;
	}

	crm_trace("Invoking callback for %p/%s event...", client, event);
	client->callback(event, notification);
}
/*
 * Dispatch function for the callback channel.
 *
 * channel   - the channel that became readable (may be NULL)
 * user_data - the cib_t this channel belongs to
 *
 * Drains the callback channel without blocking, stopping early when
 * cib_native_rcvmsg() reports that nothing was read.  Returns FALSE when
 * user_data is missing or the channel is in IPC_DISCONNECT state, TRUE
 * otherwise.
 */
gboolean
cib_native_dispatch(IPC_Channel *channel, gpointer user_data)
{
	int attempts = 0;
	gboolean keep_going = TRUE;
	cib_t *cib = user_data;

	crm_debug("Received callback");

	if(user_data == NULL){
		crm_err("user_data field must contain the CIB struct");
		return FALSE;
	}

	/* invoke the callbacks but dont block; the counter also includes
	 * a final attempt that read nothing */
	while(keep_going && cib_native_msgready(cib)) {
		attempts++;
		keep_going = (cib_native_rcvmsg(cib, 0) >= 1);
	}

	crm_debug("%d CIB messages dispatched", attempts);

	if (channel != NULL && channel->ch_status == IPC_DISCONNECT) {
		crm_crit("Lost connection to the CIB service.");
		return FALSE;
	}

	return TRUE;
}
/*
 * Set the function called when the callback connection is destroyed.
 *
 * cib     - connection object
 * dnotify - new notifier; NULL restores default_ipc_connection_destroy
 *
 * Returns cib_ok on success.  Returns cib_not_connected - rather than
 * FALSE, which is not a cib_* error code - when cib is NULL, has no
 * variant data, or is in the cib_disconnected state.  The state check
 * mirrors cib_native_perform_op() and keeps the callback source from
 * being used while no connection is signed on.
 */
int cib_native_set_connection_dnotify(
	cib_t *cib, void (*dnotify)(gpointer user_data))
{
	cib_native_opaque_t *native = NULL;

	if (cib == NULL) {
		crm_err("No CIB!");
		return cib_not_connected;
	}

	native = cib->variant_opaque;
	if (native == NULL) {
		crm_err("couldnt find variant specific data in %p", cib);
		return cib_not_connected;
	}

	if (cib->state == cib_disconnected) {
		return cib_not_connected;
	}

	if(dnotify == NULL) {
		crm_warn("Setting dnotify back to default value");
		set_IPC_Channel_dnotify(native->callback_source,
					default_ipc_connection_destroy);

	} else {
		crm_debug("Setting dnotify");
		set_IPC_Channel_dnotify(native->callback_source, dnotify);
	}

	return cib_ok;
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Tue, Jul 8, 6:28 PM (12 h, 15 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2002657
Default Alt Text
(14 KB)
Attached To
Mode
rP Pacemaker
Attached
Detach File
Event Timeline
Log In to Comment