diff --git a/crm/common/ipc.c b/crm/common/ipc.c index 46a2922155..6a023d444e 100644 --- a/crm/common/ipc.c +++ b/crm/common/ipc.c @@ -1,336 +1,335 @@ -/* $Id: ipc.c,v 1.19 2005/02/07 11:11:00 andrew Exp $ */ +/* $Id: ipc.c,v 1.20 2005/02/18 15:17:59 andrew Exp $ */ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifdef USE_LIBXML # include # include # include #endif #include #include #include #include /* frees msg */ gboolean send_ipc_message(IPC_Channel *ipc_client, HA_Message *msg) { gboolean all_is_good = TRUE; if (msg == NULL) { crm_err("cant send NULL message"); all_is_good = FALSE; } else if (ipc_client == NULL) { crm_err("cant send message without an IPC Channel"); all_is_good = FALSE; } else if(ipc_client->ch_status != IPC_CONNECT) { crm_err("IPC Channel is not connected"); all_is_good = FALSE; } else if(ipc_client->should_send_blocking == FALSE) { crm_verbose("Setting IPC Channel to blocking..." " least some messages get lost"); ipc_client->should_send_blocking = TRUE; } if(all_is_good && msg2ipcchan(msg, ipc_client) != HA_OK) { crm_err("Could not send IPC, message"); all_is_good = FALSE; if(ipc_client->ch_status != IPC_CONNECT) { crm_err("IPC Channel is no longer connected"); } } crm_log_message(all_is_good?LOG_DEBUG:LOG_ERR, msg); #ifdef MSG_LOG crm_log_message_adv(all_is_good?LOG_DEBUG:LOG_ERR, DEVEL_DIR"/outbound.ipc.log", msg); #endif crm_msg_del(msg); return all_is_good; } void default_ipc_connection_destroy(gpointer user_data) { return; } int init_server_ipc_comms( char *channel_name, gboolean (*channel_client_connect)(IPC_Channel *newclient,gpointer user_data), void (*channel_connection_destroy)(gpointer user_data)) { /* the clients wait channel is the other source of events. * This source delivers the clients connection events. * listen to this source at a relatively lower priority. */ char commpath[SOCKET_LEN]; IPC_WaitConnection *wait_ch; sprintf(commpath, WORKING_DIR "/%s", channel_name); wait_ch = wait_channel_init(commpath); if (wait_ch == NULL) { return 1; } G_main_add_IPC_WaitConnection( G_PRIORITY_LOW, wait_ch, NULL, FALSE, channel_client_connect, channel_name, channel_connection_destroy); crm_debug("Listening on: %s", commpath); return 0; } GCHSource* init_client_ipc_comms(const char *channel_name, gboolean (*dispatch)( IPC_Channel* source_data, gpointer user_data), void *client_data, IPC_Channel **ch) { IPC_Channel *a_ch = NULL; GCHSource *the_source = NULL; void *callback_data = client_data; a_ch = init_client_ipc_comms_nodispatch(channel_name); if(ch != NULL) { *ch = a_ch; if(callback_data == NULL) { callback_data = a_ch; } } if(a_ch == NULL) { crm_err("Setup of client connection failed," " not adding channel to mainloop"); return NULL; } if(dispatch == NULL) { crm_warn("No dispatch method specified..." "maybe you meant init_client_ipc_comms_nodispatch()?"); } else { crm_debug("Adding dispatch method to channel"); the_source = G_main_add_IPC_Channel( G_PRIORITY_LOW, a_ch, FALSE, dispatch, callback_data, default_ipc_connection_destroy); } return the_source; } IPC_Channel * init_client_ipc_comms_nodispatch(const char *channel_name) { IPC_Channel *ch; GHashTable *attrs; static char path[] = IPC_PATH_ATTR; char *commpath = NULL; int local_socket_len = 2; /* 2 = '/' + '\0' */ local_socket_len += strlen(channel_name); local_socket_len += strlen(WORKING_DIR); crm_malloc(commpath, sizeof(char)*local_socket_len); if(commpath != NULL) { sprintf(commpath, WORKING_DIR "/%s", channel_name); commpath[local_socket_len - 1] = '\0'; crm_debug("Attempting to talk on: %s", commpath); } attrs = g_hash_table_new(g_str_hash,g_str_equal); g_hash_table_insert(attrs, path, commpath); ch = ipc_channel_constructor(IPC_ANYTYPE, attrs); g_hash_table_destroy(attrs); if (ch == NULL) { - crm_crit("Could not access channel on: %s", commpath); + crm_err("Could not access channel on: %s", commpath); return NULL; } else if (ch->ops->initiate_connection(ch) != IPC_OK) { - crm_crit("Could not init comms on: %s", commpath); - fprintf(stderr, "Could not init comms on: %s\n", commpath); + crm_debug("Could not init comms on: %s", commpath); return NULL; } ch->ops->set_recv_qlen(ch, 100); ch->ops->set_send_qlen(ch, 100); ch->should_send_blocking = TRUE; crm_debug("Processing of %s complete", commpath); return ch; } IPC_WaitConnection * wait_channel_init(char daemonsocket[]) { IPC_WaitConnection *wait_ch; mode_t mask; char path[] = IPC_PATH_ATTR; GHashTable * attrs; attrs = g_hash_table_new(g_str_hash,g_str_equal); g_hash_table_insert(attrs, path, daemonsocket); mask = umask(0); wait_ch = ipc_wait_conn_constructor(IPC_ANYTYPE, attrs); if (wait_ch == NULL) { cl_perror("Can't create wait channel of type %s", IPC_ANYTYPE); exit(1); } mask = umask(mask); g_hash_table_destroy(attrs); return wait_ch; } gboolean subsystem_msg_dispatch(IPC_Channel *sender, void *user_data) { int lpc = 0; IPC_Message *msg = NULL; ha_msg_input_t *new_input = NULL; gboolean all_is_well = TRUE; const char *sys_to; const char *task; while(sender->ops->is_message_pending(sender)) { gboolean process = FALSE; if (sender->ch_status == IPC_DISCONNECT) { /* The message which was pending for us is that * the IPC status is now IPC_DISCONNECT */ break; } if (sender->ops->recv(sender, &msg) != IPC_OK) { perror("Receive failure:"); return !all_is_well; } if (msg == NULL) { crm_err("No message this time"); continue; } lpc++; new_input = new_ipc_msg_input(msg); msg->msg_done(msg); crm_log_message(LOG_MSG, new_input->msg); sys_to = cl_get_string(new_input->msg, F_CRM_SYS_TO); task = cl_get_string(new_input->msg, F_CRM_TASK); if(safe_str_eq(task, CRM_OP_HELLO)) { process = TRUE; } else if(sys_to == NULL) { crm_err("Value of %s was NULL!!", F_CRM_SYS_TO); } else if(task == NULL) { crm_err("Value of %s was NULL!!", F_CRM_TASK); } else { process = TRUE; } if(process){ gboolean (*process_function) (HA_Message *msg, crm_data_t *data, IPC_Channel *sender) = NULL; process_function = user_data; #ifdef MSG_LOG crm_log_message_adv( LOG_MSG, NULL, new_input->msg); #endif if(FALSE == process_function( new_input->msg, new_input->xml, sender)) { crm_warn("Received a message destined for %s" " by mistake", sys_to); } } else { #ifdef MSG_LOG crm_log_message_adv( LOG_ERR, NULL, new_input->msg); #endif } delete_ha_msg_input(new_input); msg = NULL; } /* clean up after a break */ if(msg != NULL) msg->msg_done(msg); crm_verbose("Processed %d messages", lpc); if (sender->ch_status == IPC_DISCONNECT) { crm_err("The server has left us: Shutting down...NOW"); exit(1); /* shutdown properly later */ return !all_is_well; } return all_is_well; } diff --git a/crm/crmd/cib.c b/crm/crmd/cib.c index 1a1a19b524..d923bf5c4d 100644 --- a/crm/crmd/cib.c +++ b/crm/crmd/cib.c @@ -1,285 +1,285 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include /* for access */ #include #include #include /* for calls to open */ #include /* for calls to open */ #include /* for calls to open */ #include /* for getpwuid */ #include /* for initgroups */ #include /* for getrlimit */ #include /* for getrlimit */ #include #include #include #include #include #include #include #include struct crm_subsystem_s *cib_subsystem = NULL; void crmd_update_confirm(const char *event, HA_Message *msg); int cib_retries = 0; /* A_CIB_STOP, A_CIB_START, A_CIB_RESTART, */ enum crmd_fsa_input do_cib_control(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { enum crmd_fsa_input result = I_NULL; struct crm_subsystem_s *this_subsys = cib_subsystem; long long stop_actions = A_CIB_STOP; long long start_actions = A_CIB_START; if(action & stop_actions) { if(fsa_cib_conn != NULL && fsa_cib_conn->state != cib_disconnected) { fsa_cib_conn->cmds->signoff(fsa_cib_conn); } } if(action & start_actions) { if(cur_state != S_STOPPING) { if(fsa_cib_conn == NULL) { fsa_cib_conn = cib_new(); } if(fsa_cib_conn->cmds->signon( fsa_cib_conn, cib_command) != cib_ok) { - crm_err("Could not connect to the CIB service"); + crm_warn("Could not connect to the CIB service"); } else if(fsa_cib_conn->cmds->add_notify_callback( fsa_cib_conn, T_CIB_UPDATE_CONFIRM, crmd_update_confirm) != cib_ok) { crm_err("Could not set notify callback"); } else if(fsa_cib_conn->cmds->set_connection_dnotify( fsa_cib_conn, crmd_cib_connection_destroy)!=cib_ok){ crm_err("Could not set dnotify callback"); } else { set_bit_inplace( fsa_input_register, R_CIB_CONNECTED); } if(is_set(fsa_input_register, R_CIB_CONNECTED) == FALSE) { crm_warn("Could complete CIB registration %d" " time... retry", cib_retries); if(++cib_retries < 30) { startTimer(wait_timer); crmd_fsa_stall(); } else { crm_err("Could complete CIB" " registration %d times..." " hard error", cib_retries); register_fsa_error( C_FSA_INTERNAL, I_ERROR, NULL); } } else { cib_retries = 0; } } else { crm_info("Ignoring request to start %s after shutdown", this_subsys->name); } } return result; } /* A_CIB_INVOKE, A_CIB_BUMPGEN, A_UPDATE_NODESTATUS */ enum crmd_fsa_input do_cib_invoke(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { HA_Message *answer = NULL; enum crmd_fsa_input result = I_NULL; ha_msg_input_t *cib_msg = fsa_typed_data(fsa_dt_ha_msg); const char *sys_from = cl_get_string(cib_msg->msg, F_CRM_SYS_FROM); if(action & A_CIB_INVOKE) { if(safe_str_eq(sys_from, CRM_SYSTEM_CRMD)) { action = A_CIB_INVOKE_LOCAL; } else if(safe_str_eq(sys_from, CRM_SYSTEM_DC)) { action = A_CIB_INVOKE_LOCAL; } } if(action & A_CIB_INVOKE || action & A_CIB_INVOKE_LOCAL) { int call_options = 0; enum cib_errors rc = cib_ok; crm_data_t *cib_frag = NULL; const char *section = NULL; const char *op = cl_get_string(cib_msg->msg, F_CRM_TASK); section = cl_get_string(cib_msg->msg, F_CIB_SECTION); ha_msg_value_int(cib_msg->msg, F_CIB_CALLOPTS, &call_options); crm_log_message(LOG_MSG, cib_msg->msg); crm_xml_devel(cib_msg->xml, "[CIB update]"); if(op == NULL) { crm_err("Invalid CIB Message"); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); return I_NULL; } rc = fsa_cib_conn->cmds->variant_op( fsa_cib_conn, op, NULL, section, cib_msg->xml, &cib_frag, call_options); if(rc != cib_ok || (action & A_CIB_INVOKE)) { answer = create_reply(cib_msg->msg, cib_msg->xml); ha_msg_add(answer,XML_ATTR_RESULT,cib_error2string(rc)); } if(action & A_CIB_INVOKE) { if(relay_message(answer, TRUE) == FALSE) { crm_err("Confused what to do with cib result"); crm_log_message(LOG_ERR, answer); crm_msg_del(answer); result = I_ERROR; } } else if(rc != cib_ok) { ha_msg_input_t *input = NULL; crm_err("Internal CRM/CIB command from %s: %s", msg_data->origin, cib_error2string(rc)); crm_log_message(LOG_ERR, cib_msg->msg); crm_xml_err(cib_msg->xml, "Command data:"); crm_log_message(LOG_ERR, answer); input = new_ha_msg_input(answer); register_fsa_input(C_FSA_INTERNAL, I_ERROR, input); crm_msg_del(answer); delete_ha_msg_input(input); } return result; } else { crm_err("Unexpected action %s in %s", fsa_action2string(action), __FUNCTION__); } return I_NULL; } /* frees fragment as part of delete_ha_msg_input() */ void update_local_cib_adv( crm_data_t *msg_data, gboolean do_now, const char *raised_from) { HA_Message *msg = NULL; ha_msg_input_t *fsa_input = NULL; int call_options = cib_sync_call; CRM_DEV_ASSERT(msg_data != NULL); crm_malloc(fsa_input, sizeof(ha_msg_input_t)); msg = create_request(CRM_OP_CIB_UPDATE, msg_data, NULL, CRM_SYSTEM_CIB, CRM_SYSTEM_CRMD, NULL); ha_msg_add(msg, F_CIB_SECTION, crm_element_value(msg_data, XML_ATTR_SECTION)); ha_msg_add_int(msg, F_CIB_CALLOPTS, call_options); ha_msg_add(msg, "call_origin", raised_from); fsa_input->msg = msg; fsa_input->xml = msg_data; CRM_DEV_ASSERT(cib_ok == fsa_cib_conn->cmds->is_master(fsa_cib_conn)); if(do_now == FALSE) { crm_debug("Registering event with FSA"); register_fsa_input_adv(C_FSA_INTERNAL, I_CIB_OP, fsa_input, 0, FALSE, raised_from); } else { fsa_data_t *op_data = NULL; crm_debug("Invoking CIB handler directly"); crm_malloc(op_data, sizeof(fsa_data_t)); op_data->fsa_cause = C_FSA_INTERNAL; op_data->fsa_input = I_CIB_OP; op_data->origin = raised_from; op_data->data = fsa_input; op_data->data_type = fsa_dt_ha_msg; do_cib_invoke(A_CIB_INVOKE_LOCAL, C_FSA_INTERNAL, fsa_state, I_CIB_OP, op_data); crm_free(op_data); crm_debug("CIB handler completed"); } crm_debug("deleting input"); #if 0 delete_ha_msg_input(fsa_input); #else crm_msg_del(fsa_input->msg); crm_free(fsa_input); /* BUG: it should be possible to free this but for some reason I cant */ /* free_xml(fsa_input->xml); */ #endif crm_debug("deleted input"); } void crmd_update_confirm(const char *event, HA_Message *msg) { int rc = -1; const char *op = cl_get_string(msg, F_CIB_OPERATION); ha_msg_value_int(msg, F_CIB_RC, &rc); if(rc != cib_ok) { crm_trace("Ignoring failed CIB update"); return; } if(safe_str_eq(op, CRM_OP_CIB_ERASE)) { /* regenerate everyone's state and our node entry */ register_fsa_input(C_UNKNOWN, I_ELECTION_DC, NULL); } } diff --git a/lib/crm/cib/cib_native.c b/lib/crm/cib/cib_native.c index 7d994e0ab8..5403cc509c 100755 --- a/lib/crm/cib/cib_native.c +++ b/lib/crm/cib/cib_native.c @@ -1,586 +1,586 @@ /* * Copyright (c) 2004 International Business Machines * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * */ #include #include #include #include #include #include #include #include #include #include #include #include #include typedef struct cib_native_opaque_s { IPC_Channel *command_channel; IPC_Channel *callback_channel; GCHSource *callback_source; } cib_native_opaque_t; int cib_native_perform_op( cib_t *cib, const char *op, const char *host, const char *section, crm_data_t *data, crm_data_t **output_data, int call_options); int cib_native_signon(cib_t* cib, enum cib_conn_type type); int cib_native_signoff(cib_t* cib); int cib_native_free(cib_t* cib); IPC_Channel *cib_native_channel(cib_t* cib); int cib_native_inputfd(cib_t* cib); gboolean cib_native_msgready(cib_t* cib); int cib_native_rcvmsg(cib_t* cib, int blocking); gboolean cib_native_dispatch(IPC_Channel *channel, gpointer user_data); cib_t *cib_native_new (cib_t *cib); int cib_native_set_connection_dnotify( cib_t *cib, void (*dnotify)(gpointer user_data)); void cib_native_notify(gpointer data, gpointer user_data); void cib_native_callback(cib_t *cib, struct ha_msg *msg); cib_t* cib_native_new (cib_t *cib) { cib_native_opaque_t *native = NULL; crm_malloc(cib->variant_opaque, sizeof(cib_native_opaque_t)); native = cib->variant_opaque; native->command_channel = NULL; native->callback_channel = NULL; /* assign variant specific ops*/ cib->cmds->variant_op = cib_native_perform_op; cib->cmds->signon = cib_native_signon; cib->cmds->signoff = cib_native_signoff; cib->cmds->free = cib_native_free; cib->cmds->channel = cib_native_channel; cib->cmds->inputfd = cib_native_inputfd; cib->cmds->msgready = cib_native_msgready; cib->cmds->rcvmsg = cib_native_rcvmsg; cib->cmds->dispatch = cib_native_dispatch; cib->cmds->set_connection_dnotify = cib_native_set_connection_dnotify; return cib; } int cib_native_signon(cib_t* cib, enum cib_conn_type type) { int rc = cib_ok; char *uuid_ticket = NULL; struct ha_msg *reg_msg = NULL; cib_native_opaque_t *native = cib->variant_opaque; crm_trace("Connecting command channel"); if(type == cib_command) { cib->state = cib_connected_command; native->command_channel = init_client_ipc_comms_nodispatch( cib_channel_rw); } else { cib->state = cib_connected_query; native->command_channel = init_client_ipc_comms_nodispatch( cib_channel_ro); } if(native->command_channel == NULL) { - crm_err("Connection to command channel failed"); + crm_debug("Connection to command channel failed"); rc = cib_connection; } else if(native->command_channel->ch_status != IPC_CONNECT) { crm_err("Connection may have succeeded," " but authentication to command channel failed"); rc = cib_authentication; } if(rc == cib_ok) { crm_trace("Connecting callback channel"); native->callback_source = init_client_ipc_comms( cib_channel_callback, cib_native_dispatch, cib, &(native->callback_channel)); if(native->callback_channel == NULL) { - crm_err("Connection to callback channel failed"); + crm_debug("Connection to callback channel failed"); rc = cib_connection; } else if(native->callback_source == NULL) { crm_err("Callback source not recorded"); rc = cib_connection; } } else if(rc == cib_ok && native->callback_channel->ch_status != IPC_CONNECT) { crm_err("Connection may have succeeded," " but authentication to callback channel failed"); rc = cib_authentication; } if(rc == cib_ok) { const char *msg_type = NULL; crm_trace("Waiting for msg on command channel"); reg_msg = msgfromIPC_noauth(native->command_channel); msg_type = cl_get_string(reg_msg, F_CIB_OPERATION); if(safe_str_neq(msg_type, CRM_OP_REGISTER) ) { crm_err("Invalid registration message: %s", msg_type); rc = cib_registration_msg; } else { const char *tmp_ticket = NULL; crm_trace("Retrieving callback channel ticket"); tmp_ticket = cl_get_string( reg_msg, F_CIB_CALLBACK_TOKEN); if(tmp_ticket == NULL) { rc = cib_callback_token; } else { uuid_ticket = crm_strdup(tmp_ticket); } } crm_msg_del(reg_msg); reg_msg = NULL; } if(rc == cib_ok) { crm_trace("Registering callback channel with ticket %s", crm_str(uuid_ticket)); reg_msg = ha_msg_new(2); ha_msg_add(reg_msg, F_CIB_OPERATION, CRM_OP_REGISTER); ha_msg_add(reg_msg, F_CIB_CALLBACK_TOKEN, uuid_ticket); CRM_DEV_ASSERT(native->command_channel->should_send_blocking); if(msg2ipcchan(reg_msg, native->callback_channel) != HA_OK) { rc = cib_callback_register; } crm_free(uuid_ticket); crm_msg_del(reg_msg); } if(rc == cib_ok) { crm_trace("wait for the callback channel setup to complete"); reg_msg = msgfromIPC_noauth(native->callback_channel); if(reg_msg == NULL) { crm_err("Connection to callback channel not maintined"); rc = cib_connection; } crm_msg_del(reg_msg); } if(rc == cib_ok) { crm_info("Connection to CIB successful"); return cib_ok; } crm_err("Connection to CIB failed: %s", cib_error2string(rc)); cib_native_signoff(cib); return rc; } int cib_native_signoff(cib_t* cib) { cib_native_opaque_t *native = cib->variant_opaque; crm_info("Signing out of the CIB Service"); /* close channels */ if (native->command_channel != NULL) { native->command_channel->ops->destroy( native->command_channel); native->command_channel = NULL; } if (native->callback_channel != NULL) { G_main_del_IPC_Channel(native->callback_source); #ifdef BUG native->callback_channel->ops->destroy( native->callback_channel); #endif native->callback_channel = NULL; } cib->state = cib_disconnected; cib->type = cib_none; return cib_ok; } int cib_native_free (cib_t* cib) { int rc = cib_ok; crm_warn("Freeing CIB"); if(cib->state != cib_disconnected) { rc = cib_native_signoff(cib); if(rc == cib_ok) { crm_free(cib); } } return rc; } IPC_Channel * cib_native_channel(cib_t* cib) { cib_native_opaque_t *native = NULL; if(cib == NULL) { crm_err("Missing cib object"); return NULL; } native = cib->variant_opaque; if(native != NULL) { return native->callback_channel; } crm_err("couldnt find variant specific data in %p", cib); return NULL; } int cib_native_inputfd(cib_t* cib) { IPC_Channel *ch = cib_native_channel(cib); return ch->ops->get_recv_select_fd(ch); } int cib_native_perform_op( cib_t *cib, const char *op, const char *host, const char *section, crm_data_t *data, crm_data_t **output_data, int call_options) { int rc = HA_OK; struct ha_msg *op_msg = NULL; struct ha_msg *op_reply = NULL; cib_native_opaque_t *native = cib->variant_opaque; if(cib->state == cib_disconnected) { return cib_not_connected; } if(output_data != NULL) { *output_data = NULL; } if(op == NULL) { crm_err("No operation specified"); rc = cib_operation; } op_msg = ha_msg_new(7); if (op_msg == NULL) { crm_err("No memory to create HA_Message"); return cib_create_msg; } if(rc == HA_OK) { rc = ha_msg_add(op_msg, F_TYPE, "cib"); } if(rc == HA_OK) { rc = ha_msg_add(op_msg, F_CIB_OPERATION, op); } if(rc == HA_OK && host != NULL) { CRM_DEV_ASSERT(cl_is_allocated(host) == 1); rc = ha_msg_add(op_msg, F_CIB_HOST, host); } if(rc == HA_OK && section != NULL) { rc = ha_msg_add(op_msg, F_CIB_SECTION, section); } if(rc == HA_OK) { rc = ha_msg_add_int(op_msg, F_CIB_CALLID, cib->call_id); } if(rc == HA_OK) { crm_trace("Sending call options: %.8lx, %d", (long)call_options, call_options); rc = ha_msg_add_int(op_msg, F_CIB_CALLOPTS, call_options); } if(rc == HA_OK && data != NULL) { add_message_xml(op_msg, F_CIB_CALLDATA, data); } if (rc != HA_OK) { crm_err("Failed to create CIB operation message"); crm_log_message(LOG_ERR, op_msg); crm_msg_del(op_msg); return cib_create_msg; } cib->call_id++; crm_debug("Sending %s message to CIB service", op); crm_log_message(LOG_MSG, op_msg); rc = msg2ipcchan(op_msg, native->command_channel); if (rc != HA_OK) { crm_err("Sending message to CIB service FAILED: %d", rc); CRM_DEV_ASSERT(native->command_channel->should_send_blocking); crm_log_message(LOG_ERR, op_msg); crm_msg_del(op_msg); return cib_send_failed; } else { crm_debug("Message sent"); } crm_msg_del(op_msg); op_msg = NULL; if((call_options & cib_discard_reply)) { crm_debug("Discarding reply"); return cib_ok; } else if(!(call_options & cib_sync_call)) { crm_debug("Async call, returning"); return cib->call_id - 1; } crm_debug("Waiting for a syncronous reply"); op_reply = msgfromIPC_noauth(native->command_channel); if (op_reply == NULL) { crm_err("No reply message"); return cib_reply_failed; } crm_debug("Syncronous reply recieved"); crm_log_message(LOG_MSG, op_reply); rc = cib_ok; /* Start processing the reply... */ if(ha_msg_value_int(op_reply, F_CIB_RC, &rc) != HA_OK) { rc = cib_return_code; } if(output_data == NULL) { /* do nothing more */ } else if(!(call_options & cib_discard_reply)) { *output_data = get_message_xml(op_reply, F_CIB_CALLDATA); if(*output_data == NULL) { crm_debug("No output in reply to \"%s\" command %d", op, cib->call_id - 1); } } crm_msg_del(op_reply); return rc; } gboolean cib_native_msgready(cib_t* cib) { IPC_Channel *ch = NULL; cib_native_opaque_t *native = NULL; if (cib == NULL) { crm_err("No CIB!"); return FALSE; } native = cib->variant_opaque; ch = cib_native_channel(cib); if (ch == NULL) { crm_err("No channel"); return FALSE; } if(native->command_channel->ops->is_message_pending( native->command_channel)) { crm_verbose("Message pending on command channel"); } if(native->callback_channel->ops->is_message_pending( native->callback_channel)) { crm_trace("Message pending on callback channel"); return TRUE; } crm_verbose("No message pending"); return FALSE; } int cib_native_rcvmsg(cib_t* cib, int blocking) { const char *type = NULL; struct ha_msg* msg = NULL; IPC_Channel *ch = cib_native_channel(cib); /* if it is not blocking mode and no message in the channel, return */ if (blocking == 0 && cib_native_msgready(cib) == FALSE) { crm_debug("No message ready and non-blocking..."); return 0; } else if (cib_native_msgready(cib) == FALSE) { crm_debug("Waiting for message from CIB service..."); ch->ops->waitin(ch); } /* get the message */ msg = msgfromIPC_noauth(ch); if (msg == NULL) { crm_warn("Received a NULL msg from CIB service."); return 0; } /* do callbacks */ type = cl_get_string(msg, F_TYPE); crm_trace("Activating %s callbacks...", type); if(safe_str_eq(type, T_CIB)) { cib_native_callback(cib, msg); } else if(safe_str_eq(type, T_CIB_NOTIFY)) { g_list_foreach(cib->notify_list, cib_native_notify, msg); } else { crm_err("Unknown message type: %s", type); } crm_msg_del(msg); return 1; } void cib_native_callback(cib_t *cib, struct ha_msg *msg) { int rc = 0; int call_id = 0; crm_data_t *output = NULL; if(cib->op_callback == NULL) { crm_debug("No OP callback set, ignoring reply"); return; } ha_msg_value_int(msg, F_CIB_CALLID, &call_id); ha_msg_value_int(msg, F_CIB_RC, &rc); output = get_message_xml(msg, F_CIB_CALLDATA); cib->op_callback(msg, call_id, rc, output); crm_trace("OP callback activated."); } void cib_native_notify(gpointer data, gpointer user_data) { struct ha_msg *msg = user_data; cib_notify_client_t *entry = data; const char *event = NULL; if(msg == NULL) { crm_warn("Skipping callback - NULL message"); return; } event = cl_get_string(msg, F_SUBTYPE); if(entry == NULL) { crm_warn("Skipping callback - NULL callback client"); return; } else if(entry->callback == NULL) { crm_warn("Skipping callback - NULL callback"); return; } else if(safe_str_neq(entry->event, event)) { crm_trace("Skipping callback - event mismatch %p/%s vs. %s", entry, entry->event, event); return; } crm_trace("Invoking callback for %p/%s event...", entry, event); entry->callback(event, msg); crm_trace("Callback invoked..."); } gboolean cib_native_dispatch(IPC_Channel *channel, gpointer user_data) { int lpc = 0; cib_t *cib = user_data; crm_debug("Received callback"); if(user_data == NULL){ crm_err("user_data field must contain the CIB struct"); return FALSE; } while(cib_native_msgready(cib)) { lpc++; /* invoke the callbacks but dont block */ if(cib_native_rcvmsg(cib, 0) < 1) { break; } } crm_debug("%d CIB messages dispatched", lpc); if (channel && (channel->ch_status == IPC_DISCONNECT)) { crm_crit("Lost connection to the CIB service."); return FALSE; } return TRUE; } int cib_native_set_connection_dnotify( cib_t *cib, void (*dnotify)(gpointer user_data)) { cib_native_opaque_t *native = NULL; if (cib == NULL) { crm_err("No CIB!"); return FALSE; } native = cib->variant_opaque; if(dnotify == NULL) { crm_warn("Setting dnotify back to default value"); set_IPC_Channel_dnotify(native->callback_source, default_ipc_connection_destroy); } else { crm_debug("Setting dnotify"); set_IPC_Channel_dnotify(native->callback_source, dnotify); } return cib_ok; }