diff --git a/crm/admin/cibadmin.c b/crm/admin/cibadmin.c index 2655058f1f..fdaca23d53 100644 --- a/crm/admin/cibadmin.c +++ b/crm/admin/cibadmin.c @@ -1,574 +1,574 @@ -/* $Id: cibadmin.c,v 1.25 2005/02/20 14:38:54 andrew Exp $ */ +/* $Id: cibadmin.c,v 1.26 2005/02/21 13:13:44 andrew Exp $ */ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include /* someone complaining about _ha_msg_mod not being found */ #include int exit_code = cib_ok; int message_timer_id = -1; int message_timeout_ms = 30*1000; GMainLoop *mainloop = NULL; const char *crm_system_name = "cibadmin"; IPC_Channel *crmd_channel = NULL; const char *host = NULL; void usage(const char *cmd, int exit_status); enum cib_errors do_init(void); int do_work(const char *xml_text, int command_options, crm_data_t **output); gboolean admin_msg_callback(IPC_Channel * source_data, void *private_data); crm_data_t *handleCibMod(const char *xml); gboolean admin_message_timeout(gpointer data); void cib_connection_destroy(gpointer user_data); void cibadmin_op_callback( const HA_Message *msg, int call_id, int rc, crm_data_t *output); int command_options = 0; const char *cib_action = NULL; typedef struct str_list_s { int num_items; char *value; struct str_list_s *next; } str_list_t; char *id = NULL; char *this_msg_reference = NULL; char *obj_type = NULL; char *clear = NULL; char *status = NULL; char *migrate_from = NULL; char *migrate_res = NULL; char *subtype = NULL; char *reset = NULL; int request_id = 0; int operation_status = 0; cib_t *the_cib = NULL; #define OPTARGS "V?i:o:QDUCEX:t:Srwlsh:MB" int main(int argc, char **argv) { int option_index = 0; int argerr = 0; int flag; char *admin_input_xml = NULL; crm_data_t *output = NULL; static struct option long_options[] = { /* Top-level Options */ {CRM_OP_CIB_ERASE, 0, 0, 'E'}, {CRM_OP_CIB_QUERY, 0, 0, 'Q'}, {CRM_OP_CIB_CREATE, 0, 0, 'C'}, {CRM_OP_CIB_REPLACE, 0, 0, 'R'}, {CRM_OP_CIB_UPDATE, 0, 0, 'U'}, {CRM_OP_CIB_DELETE, 0, 0, 'D'}, {CRM_OP_CIB_BUMP, 0, 0, 'B'}, {CRM_OP_CIB_SYNC, 0, 0, 'S'}, {CRM_OP_CIB_SLAVE, 0, 0, 'r'}, {CRM_OP_CIB_MASTER, 0, 0, 'w'}, {CRM_OP_CIB_ISMASTER,0, 0, 'M'}, {"local", 0, 0, 'l'}, {"sync-call", 0, 0, 's'}, {"host", 0, 0, 'h'}, {F_CRM_DATA, 1, 0, 'X'}, {"verbose", 0, 0, 'V'}, {"help", 0, 0, '?'}, {"reference", 1, 0, 0}, {XML_ATTR_TIMEOUT, 1, 0, 't'}, /* common options */ {XML_ATTR_ID, 1, 0, 'i'}, {"obj_type", 1, 0, 'o'}, {0, 0, 0, 0} }; crm_log_init(crm_system_name); if(argc < 2) { usage(crm_system_name, LSB_EXIT_EINVAL); } while (1) { flag = getopt_long(argc, argv, OPTARGS, long_options, &option_index); if (flag == -1) break; switch(flag) { case 0: printf("option %s", long_options[option_index].name); if (optarg) printf(" with arg %s", optarg); printf("\n"); if (safe_str_eq("reference", long_options[option_index].name)) { this_msg_reference = crm_strdup(optarg); } else { printf("Long option (--%s) is not (yet?) properly supported\n", long_options[option_index].name); ++argerr; } break; case 't': message_timeout_ms = atoi(optarg); if(message_timeout_ms < 1) { message_timeout_ms = 30*1000; } break; case 'E': cib_action = CRM_OP_CIB_ERASE; break; case 'Q': cib_action = CRM_OP_CIB_QUERY; break; case 'S': cib_action = CRM_OP_CIB_SYNC; break; case 'U': cib_action = CRM_OP_CIB_UPDATE; break; case 'R': cib_action = CRM_OP_CIB_REPLACE; break; case 'C': cib_action = CRM_OP_CIB_CREATE; break; case 'D': cib_action = CRM_OP_CIB_DELETE; break; case 'M': cib_action = CRM_OP_CIB_ISMASTER; command_options |= cib_scope_local; break; case 'B': cib_action = CRM_OP_CIB_BUMP; break; case 'r': cib_action = CRM_OP_CIB_SLAVE; break; case 'w': cib_action = CRM_OP_CIB_MASTER; command_options |= cib_scope_local; break; case 'V': command_options = command_options | cib_verbose; cl_log_enable_stderr(TRUE); alter_debug(DEBUG_INC); break; case '?': usage(crm_system_name, LSB_EXIT_OK); break; case 'i': crm_verbose("Option %c => %s", flag, optarg); id = crm_strdup(optarg); break; case 'o': crm_verbose("Option %c => %s", flag, optarg); obj_type = crm_strdup(optarg); break; case 'X': admin_input_xml = crm_strdup(optarg); break; case 'h': host = crm_strdup(optarg); break; case 'l': command_options |= cib_scope_local; break; case 's': command_options |= cib_sync_call; break; default: printf("Argument code 0%o (%c)" " is not (?yet?) supported\n", flag, flag); ++argerr; break; } } if (optind < argc) { printf("non-option ARGV-elements: "); while (optind < argc) printf("%s ", argv[optind++]); printf("\n"); } if (optind > argc) { ++argerr; } if(cib_action == NULL) { usage(crm_system_name, cib_operation); } if (argerr) { usage(crm_system_name, LSB_EXIT_GENERIC); } exit_code = do_init(); if(exit_code != cib_ok) { crm_err("Init failed, could not perform requested operations"); fprintf(stderr, "Init failed, could not perform requested operations\n"); return -exit_code; } exit_code = do_work(admin_input_xml, command_options, &output); if (exit_code > 0) { /* wait for the reply by creating a mainloop and running it until * the callbacks are invoked... */ IPC_Channel *ch = the_cib->cmds->channel(the_cib); request_id = exit_code; if(ch == NULL) { crm_err("Connection to CIB is corrupt"); return 2; } mainloop = g_main_new(FALSE); crm_devel("Setting operation timeout to %dms", message_timeout_ms); message_timer_id = Gmain_timeout_add( message_timeout_ms, admin_message_timeout, NULL); crm_devel("%s waiting for reply from the local CIB", crm_system_name); crm_info("Starting mainloop"); g_main_run(mainloop); } else if(exit_code < 0) { crm_err("Call failed: %s", cib_error2string(exit_code)); fprintf(stderr, "Call failed: %s\n", cib_error2string(exit_code)); operation_status = exit_code; } if(output != NULL) { char *buffer = dump_xml_formatted(output); fprintf(stdout, "%s", crm_str(buffer)); crm_free(buffer); } crm_devel("%s exiting normally", crm_system_name); return -exit_code; } crm_data_t* handleCibMod(const char *xml) { const char *attr_name = NULL; const char *attr_value = NULL; crm_data_t *fragment = NULL; crm_data_t *cib_object = NULL; if(xml == NULL) { cib_object = file2xml(stdin); } else { cib_object = string2xml(xml); } if(cib_object == NULL) { return NULL; } attr_name = XML_ATTR_ID; attr_value = crm_element_value(cib_object, attr_name); if(attr_name == NULL || strlen(attr_name) == 0) { crm_err("No value for %s specified.", attr_name); return NULL; } crm_trace("Object creation complete"); /* create the cib request */ fragment = create_cib_fragment(cib_object, NULL); return fragment; } int do_work(const char *admin_input_xml, int call_options, crm_data_t **output) { /* construct the request */ crm_data_t *msg_data = NULL; char *obj_type_parent = NULL; obj_type_parent = cib_pluralSection(obj_type); if(strcmp(CRM_OP_CIB_QUERY, cib_action) == 0) { crm_verbose("Querying the CIB for section: %s", obj_type_parent); return the_cib->cmds->query_from( the_cib, host, obj_type_parent, output, call_options); } else if (strcmp(CRM_OP_CIB_ERASE, cib_action) == 0) { crm_trace("CIB Erase op in progress"); return the_cib->cmds->erase(the_cib, output, call_options); } else if (strcmp(CRM_OP_CIB_CREATE, cib_action) == 0) { enum cib_errors rc = cib_ok; crm_trace("Performing %s op...", cib_action); msg_data = handleCibMod(admin_input_xml); rc = the_cib->cmds->create( the_cib, obj_type_parent, msg_data, output, call_options); free_xml(msg_data); return rc; } else if (strcmp(CRM_OP_CIB_UPDATE, cib_action) == 0) { enum cib_errors rc = cib_ok; crm_trace("Performing %s op...", cib_action); msg_data = handleCibMod(admin_input_xml); rc = the_cib->cmds->modify( the_cib, obj_type_parent, msg_data, output, call_options); free_xml(msg_data); return rc; } else if (strcmp(CRM_OP_CIB_DELETE, cib_action) == 0) { enum cib_errors rc = cib_ok; crm_trace("Performing %s op...", cib_action); msg_data = handleCibMod(admin_input_xml); rc = the_cib->cmds->delete( the_cib, obj_type_parent, msg_data, output, call_options); free_xml(msg_data); return rc; } else if (strcmp(CRM_OP_CIB_SYNC, cib_action) == 0) { crm_trace("Performing %s op...", cib_action); return the_cib->cmds->sync_from( the_cib, host, obj_type_parent, call_options); } else if (strcmp(CRM_OP_CIB_SLAVE, cib_action) == 0 && (call_options ^ cib_scope_local) ) { crm_trace("Performing %s op on all nodes...", cib_action); return the_cib->cmds->set_slave_all(the_cib, call_options); } else if (strcmp(CRM_OP_CIB_MASTER, cib_action) == 0) { crm_trace("Performing %s op on all nodes...", cib_action); return the_cib->cmds->set_master(the_cib, call_options); } else if(cib_action != NULL) { crm_trace("Passing \"%s\" to variant_op...", cib_action); return the_cib->cmds->variant_op( the_cib, cib_action, host, obj_type_parent, NULL, output, call_options); } else { crm_err("You must specify an operation"); } return cib_operation; } enum cib_errors do_init(void) { enum cib_errors rc = cib_ok; #ifdef USE_LIBXML /* docs say only do this once, but in their code they do it every time! */ xmlInitParser(); #endif the_cib = cib_new(); - rc = the_cib->cmds->signon(the_cib, cib_command); + rc = the_cib->cmds->signon(the_cib, crm_system_name, cib_command); if(rc != cib_ok) { crm_err("Signon to CIB failed: %s", cib_error2string(rc)); fprintf(stderr, "Signon to CIB failed: %s\n", cib_error2string(rc)); } else { rc = the_cib->cmds->set_op_callback( the_cib, cibadmin_op_callback); if(rc != cib_ok) { crm_err("Failed to set callback: %s", cib_error2string(rc)); fprintf(stderr,"Failed to set callback: %s\n", cib_error2string(rc)); } } return rc; } void usage(const char *cmd, int exit_status) { FILE *stream; stream = exit_status != 0 ? stderr : stdout; fprintf(stream, "usage: %s [-?Vio] command\n" "\twhere necessary, XML data will be expected using -X" " or on STDIN if -X isnt specified\n", cmd); fprintf(stream, "Options\n"); fprintf(stream, "\t--%s (-%c) \tid of the object being operated on\n", XML_ATTR_ID, 'i'); fprintf(stream, "\t--%s (-%c) \tobject type being operated on\n", "obj_type", 'o'); fprintf(stream, "\t--%s (-%c)\tturn on debug info." " additional instance increase verbosity\n", "verbose", 'V'); fprintf(stream, "\t--%s (-%c)\tthis help message\n", "help", '?'); fprintf(stream, "\nCommands\n"); fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_ERASE, 'E'); fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_QUERY, 'Q'); fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_CREATE, 'C'); fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_REPLACE,'R'); fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_UPDATE, 'U'); fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_DELETE, 'D'); fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_BUMP, 'B'); fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_ISMASTER,'M'); fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_SYNC, 'S'); fprintf(stream, "\nXML data\n"); fprintf(stream, "\t--%s (-%c) \t\n", F_CRM_DATA, 'X'); fprintf(stream, "\nAdvanced Options\n"); fprintf(stream, "\t--%s (-%c)\tsend command to specified host." " Applies to %s and %s commands only\n", "host", 'h', CRM_OP_CIB_QUERY, CRM_OP_CIB_SYNC); fprintf(stream, "\t--%s (-%c)\tcommand only takes effect locally" " on the specified host\n", "local", 'l'); fprintf(stream, "\t--%s (-%c)\twait for call to complete before" " returning\n", "sync-call", 's'); fflush(stream); exit(exit_status); } gboolean admin_message_timeout(gpointer data) { if(safe_str_eq(cib_action, CRM_OP_CIB_SLAVE)) { exit_code = cib_ok; fprintf(stdout, "CIB service(s) are in slave mode.\n"); } else { exit_code = cib_reply_failed; fprintf(stderr, "No messages received in %d seconds.. aborting\n", (int)message_timeout_ms/1000); crm_err("No messages received in %d seconds", (int)message_timeout_ms/1000); } g_main_quit(mainloop); return FALSE; } void cib_connection_destroy(gpointer user_data) { crm_err("Connection to the CIB terminated... exiting"); g_main_quit(mainloop); return; } void cibadmin_op_callback( const HA_Message *msg, int call_id, int rc, crm_data_t *output) { char *admin_input_xml = NULL; crm_info("our callback was invoked"); crm_log_message(LOG_MSG, msg); exit_code = rc; if(output != NULL) { admin_input_xml = dump_xml_formatted(output); } if(safe_str_eq(cib_action, CRM_OP_CIB_ISMASTER) && rc != cib_ok) { crm_info("Local CIB is _not_ the master instance"); fprintf(stderr, "Local CIB is _not_ the master instance\n"); } else if(safe_str_eq(cib_action, CRM_OP_CIB_ISMASTER)) { crm_info("Local CIB _is_ the master instance\n"); fprintf(stderr, "Local CIB _is_ the master instance\n"); } else if(rc != 0) { crm_warn("Call %s failed (%d): %s\n", cib_action, rc, cib_error2string(rc)); fprintf(stderr, "Call %s failed (%d): %s\n", cib_action, rc, cib_error2string(rc)); fprintf(stdout, "%s\n", crm_str(admin_input_xml)); } else if(safe_str_eq(cib_action, CRM_OP_CIB_QUERY) && output==NULL) { crm_err("Output expected in query response"); crm_log_message(LOG_ERR, msg); } else if(output == NULL) { crm_info("Call passed"); } else { crm_info("Call passed"); fprintf(stdout, "%s\n", crm_str(admin_input_xml)); } crm_free(admin_input_xml); if(call_id == request_id) { g_main_quit(mainloop); } else { crm_info("Message was not the response we were looking for (%d vs. %d", call_id, request_id); } } diff --git a/crm/admin/crmadmin.c b/crm/admin/crmadmin.c index 4ad997eaed..dfafd5d73b 100644 --- a/crm/admin/crmadmin.c +++ b/crm/admin/crmadmin.c @@ -1,844 +1,844 @@ -/* $Id: crmadmin.c,v 1.29 2005/02/20 14:38:54 andrew Exp $ */ +/* $Id: crmadmin.c,v 1.30 2005/02/21 13:13:45 andrew Exp $ */ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include int message_timer_id = -1; int message_timeout_ms = 30*1000; GMainLoop *mainloop = NULL; IPC_Channel *crmd_channel = NULL; char *admin_uuid = NULL; void usage(const char *cmd, int exit_status); ll_cluster_t *do_init(void); int do_work(ll_cluster_t * hb_cluster); gboolean admin_msg_callback(IPC_Channel * source_data, void *private_data); char *pluralSection(const char *a_section); crm_data_t *handleCibMod(void); int do_find_resource(const char *rsc, crm_data_t *xml_node); int do_find_resource_list(crm_data_t *xml_node); int do_find_node_list(crm_data_t *xml_node); gboolean admin_message_timeout(gpointer data); gboolean is_node_online(crm_data_t *node_state); enum debug { debug_none, debug_dec, debug_inc }; gboolean BE_VERBOSE = FALSE; int expected_responses = 1; gboolean DO_HEALTH = FALSE; gboolean DO_RESET = FALSE; gboolean DO_RESOURCE = FALSE; gboolean DO_ELECT_DC = FALSE; gboolean DO_WHOIS_DC = FALSE; gboolean DO_NODE_LIST = FALSE; gboolean BE_SILENT = FALSE; gboolean DO_RESOURCE_LIST = FALSE; gboolean DO_OPTION = FALSE; enum debug DO_DEBUG = debug_none; const char *crmd_operation = NULL; crm_data_t *msg_options = NULL; const char *admin_verbose = XML_BOOLEAN_FALSE; char *id = NULL; char *this_msg_reference = NULL; char *disconnect = NULL; char *dest_node = NULL; char *rsc_name = NULL; char *crm_option = NULL; int operation_status = 0; const char *sys_to = NULL; const char *crm_system_name = "crmadmin"; #define OPTARGS "V?K:S:HE:DW:d:i:RNst:o:" int main(int argc, char **argv) { int option_index = 0; int argerr = 0; int flag; ll_cluster_t *hb_cluster = NULL; static struct option long_options[] = { /* Top-level Options */ {"verbose", 0, 0, 'V'}, {"help", 0, 0, '?'}, {"silent", 0, 0, 's'}, {"reference", 1, 0, 0}, {XML_ATTR_TIMEOUT, 1, 0, 't'}, /* daemon options */ {"kill", 1, 0, 'K'}, /* stop a node */ {"die", 0, 0, 0}, /* kill a node, no respawn */ {"crm_debug_inc", 1, 0, 'i'}, {"crm_debug_dec", 1, 0, 'd'}, {"status", 1, 0, 'S'}, {"health", 0, 0, 'H'}, {"election", 0, 0, 'E'}, {"dc_lookup", 0, 0, 'D'}, {"resources", 0, 0, 'R'}, {"nodes", 0, 0, 'N'}, {"whereis", 1, 0, 'W'}, {"option", 1, 0, 'o'}, {0, 0, 0, 0} }; crm_system_name = basename(argv[0]); crm_log_level = 0; crm_log_init(crm_system_name); crm_log_level = 0; if(argc < 2) { usage(crm_system_name, LSB_EXIT_EINVAL); } while (1) { flag = getopt_long(argc, argv, OPTARGS, long_options, &option_index); if (flag == -1) break; switch(flag) { case 0: printf("option %s", long_options[option_index].name); if (optarg) printf(" with arg %s", optarg); printf("\n"); if (strcmp("reference", long_options[option_index].name) == 0) { this_msg_reference = crm_strdup(optarg); } else if (strcmp("die", long_options[option_index].name) == 0) { DO_RESET = TRUE; crmd_operation = CRM_OP_DIE; } else { printf( "?? Long option (--%s) is not yet properly supported ??\n", long_options[option_index].name); ++argerr; } break; /* a sample test for multiple instance if (digit_optind != 0 && digit_optind != this_option_optind) printf ("digits occur in two different argv-elements.\n"); digit_optind = this_option_optind; printf ("option %c\n", c); */ case 'V': BE_VERBOSE = TRUE; admin_verbose = XML_BOOLEAN_TRUE; cl_log_enable_stderr(TRUE); alter_debug(DEBUG_INC); break; case 't': message_timeout_ms = atoi(optarg); if(message_timeout_ms < 1) { message_timeout_ms = 30*1000; } break; case '?': usage(crm_system_name, LSB_EXIT_OK); break; case 'D': DO_WHOIS_DC = TRUE; break; case 'W': DO_RESOURCE = TRUE; crm_verbose("Option %c => %s", flag, optarg); rsc_name = crm_strdup(optarg); break; case 'K': DO_RESET = TRUE; crm_verbose("Option %c => %s", flag, optarg); dest_node = crm_strdup(optarg); crmd_operation = CRM_OP_LOCAL_SHUTDOWN; break; case 'o': DO_OPTION = TRUE; crm_verbose("Option %c => %s", flag, optarg); crm_option = crm_strdup(optarg); break; case 's': BE_SILENT = TRUE; break; case 'i': DO_DEBUG = debug_inc; crm_verbose("Option %c => %s", flag, optarg); dest_node = crm_strdup(optarg); break; case 'd': DO_DEBUG = debug_dec; crm_verbose("Option %c => %s", flag, optarg); dest_node = crm_strdup(optarg); break; case 'S': DO_HEALTH = TRUE; crm_verbose("Option %c => %s", flag, optarg); dest_node = crm_strdup(optarg); break; case 'E': DO_ELECT_DC = TRUE; break; case 'N': DO_NODE_LIST = TRUE; break; case 'R': DO_RESOURCE_LIST = TRUE; break; case 'H': DO_HEALTH = TRUE; break; default: printf("Argument code 0%o (%c) is not (?yet?) supported\n", flag, flag); ++argerr; break; } } if (optind < argc) { printf("non-option ARGV-elements: "); while (optind < argc) printf("%s ", argv[optind++]); printf("\n"); } if (optind > argc) { ++argerr; } if (argerr) { usage(crm_system_name, LSB_EXIT_GENERIC); } hb_cluster = do_init(); if (hb_cluster != NULL) { int res = do_work(hb_cluster); if (res > 0) { /* wait for the reply by creating a mainloop and running it until * the callbacks are invoked... */ mainloop = g_main_new(FALSE); crm_verbose("%s waiting for reply from the local CRM", crm_system_name); message_timer_id = Gmain_timeout_add( message_timeout_ms, admin_message_timeout, NULL); g_main_run(mainloop); return_to_orig_privs(); } else if(res == 0) { crm_verbose("%s: no reply expected", crm_system_name); } else { crm_err("No message to send"); operation_status = -1; } } else { crm_err("Init failed, could not perform requested operations"); operation_status = -2; } crm_verbose("%s exiting normally", crm_system_name); return operation_status; } int do_work(ll_cluster_t * hb_cluster) { int ret = 1; /* construct the request */ crm_data_t *msg_data = NULL; gboolean all_is_good = TRUE; msg_options = create_xml_node(NULL, XML_TAG_OPTIONS); set_xml_property_copy(msg_options, XML_ATTR_VERBOSE, admin_verbose); set_xml_property_copy(msg_options, XML_ATTR_TIMEOUT, "0"); if (DO_HEALTH == TRUE) { crm_verbose("Querying the system"); sys_to = CRM_SYSTEM_DC; if (dest_node != NULL) { sys_to = CRM_SYSTEM_CRMD; crmd_operation = CRM_OP_PING; if (BE_VERBOSE) { expected_responses = -1;/* wait until timeout instead */ } set_xml_property_copy( msg_options, XML_ATTR_TIMEOUT, "0"); } else { crm_info("Cluster-wide health not available yet"); all_is_good = FALSE; } } else if(DO_ELECT_DC) { /* tell the local node to initiate an election */ sys_to = CRM_SYSTEM_CRMD; crmd_operation = CRM_OP_VOTE; set_xml_property_copy( msg_options, XML_ATTR_TIMEOUT, "0"); dest_node = NULL; ret = 0; /* no return message */ } else if(DO_WHOIS_DC) { sys_to = CRM_SYSTEM_DC; crmd_operation = CRM_OP_PING; set_xml_property_copy( msg_options, XML_ATTR_TIMEOUT, "0"); dest_node = NULL; } else if(DO_RESOURCE || DO_RESOURCE_LIST || DO_NODE_LIST || DO_OPTION){ cib_t * the_cib = cib_new(); crm_data_t *output = NULL; int call_options = cib_sync_call; enum cib_errors rc = the_cib->cmds->signon( - the_cib, cib_command); + the_cib, crm_system_name, cib_command); if(rc != cib_ok) { return -1; } else if(DO_RESOURCE) { output = get_cib_copy(the_cib); do_find_resource(rsc_name, output); } else if(DO_RESOURCE_LIST) { output = get_cib_copy(the_cib); do_find_resource_list(output); } else if(DO_NODE_LIST) { output = get_cib_copy(the_cib); do_find_node_list(output); } else if(DO_OPTION) { char *name = NULL; char *value = NULL; crm_data_t *xml_option = NULL; crm_data_t *fragment = NULL; if(decodeNVpair(crm_option, '=', &name, &value)==FALSE){ crm_err("%s needs to be of the form" " =", crm_option); return -1; } xml_option = create_xml_node(NULL, XML_CIB_TAG_NVPAIR); set_xml_property_copy( xml_option, XML_NVPAIR_ATTR_NAME, name); set_xml_property_copy( xml_option, XML_NVPAIR_ATTR_VALUE, value); fragment = create_cib_fragment(xml_option, NULL); free_xml(xml_option); crm_free(name); crm_free(value); rc = the_cib->cmds->modify( the_cib, XML_CIB_TAG_CRMCONFIG, fragment, NULL, call_options|cib_discard_reply); free_xml(fragment); } free_xml(output); the_cib->cmds->signoff(the_cib); return rc; } else if(DO_RESET) { /* tell dest_node to initiate the shutdown proceedure * * if dest_node is NULL, the request will be sent to the * local node */ sys_to = CRM_SYSTEM_CRMD; set_xml_property_copy( msg_options, XML_ATTR_TIMEOUT, "0"); ret = 0; /* no return message */ } else if(DO_DEBUG == debug_inc) { /* tell dest_node to increase its debug level * * if dest_node is NULL, the request will be sent to the * local node */ sys_to = CRM_SYSTEM_CRMD; crmd_operation = CRM_OP_DEBUG_UP; ret = 0; /* no return message */ } else if(DO_DEBUG == debug_dec) { /* tell dest_node to increase its debug level * * if dest_node is NULL, the request will be sent to the * local node */ sys_to = CRM_SYSTEM_CRMD; crmd_operation = CRM_OP_DEBUG_DOWN; ret = 0; /* no return message */ } else { crm_err("Unknown options"); all_is_good = FALSE; } if(all_is_good == FALSE) { crm_err("Creation of request failed. No message to send"); return -1; } /* send it */ if (crmd_channel == NULL) { crm_err("The IPC connection is not valid, cannot send anything"); return -1; } if(sys_to == NULL) { if (dest_node != NULL) sys_to = CRM_SYSTEM_CRMD; else sys_to = CRM_SYSTEM_DC; } { HA_Message *cmd = create_request( crmd_operation, msg_data, dest_node, sys_to, crm_system_name, admin_uuid); if(this_msg_reference != NULL) { ha_msg_mod(cmd, XML_ATTR_REFERENCE, this_msg_reference); } send_ipc_message(crmd_channel, cmd); } return ret; } ll_cluster_t * do_init(void) { int facility; ll_cluster_t *hb_cluster = NULL; #ifdef USE_LIBXML /* docs say only do this once, but in their code they do it every time! */ xmlInitParser (); #endif /* change the logging facility to the one used by heartbeat daemon */ hb_cluster = ll_cluster_new("heartbeat"); crm_verbose("Switching to Heartbeat logger"); if (( facility = hb_cluster->llc_ops->get_logfacility(hb_cluster)) > 0) { cl_log_set_facility(facility); } crm_malloc(admin_uuid, sizeof(char) * 11); if(admin_uuid != NULL) { snprintf(admin_uuid, 10, "%d", getpid()); admin_uuid[10] = '\0'; } init_client_ipc_comms( CRM_SYSTEM_CRMD, admin_msg_callback, NULL, &crmd_channel); if(crmd_channel != NULL) { send_hello_message( crmd_channel, admin_uuid, crm_system_name,"0", "1"); return hb_cluster; } return NULL; } gboolean admin_msg_callback(IPC_Channel * server, void *private_data) { int lpc = 0; IPC_Message *msg = NULL; ha_msg_input_t *new_input = NULL; gboolean hack_return_good = TRUE; static int received_responses = 0; char *filename; int filename_len = 0; const char *result = NULL; g_source_remove(message_timer_id); while (server->ch_status != IPC_DISCONNECT && server->ops->is_message_pending(server) == TRUE) { if(new_input != NULL) { delete_ha_msg_input(new_input); } if (server->ops->recv(server, &msg) != IPC_OK) { perror("Receive failure:"); return !hack_return_good; } if (msg == NULL) { crm_trace("No message this time"); continue; } lpc++; new_input = new_ipc_msg_input(msg); msg->msg_done(msg); crm_log_message(LOG_MSG, new_input->msg); if (new_input->xml == NULL) { crm_info( "XML in IPC message was not valid... " "discarding."); continue; } else if (validate_crm_message( new_input->msg, crm_system_name, admin_uuid, XML_ATTR_RESPONSE) == FALSE) { crm_info( "Message was not a CRM response. Discarding."); continue; } result = cl_get_string(new_input->msg, XML_ATTR_RESULT); if(result == NULL || strcmp(result, "ok") == 0) { result = "pass"; } else { result = "fail"; } received_responses++; if(DO_HEALTH) { const char *state = crm_element_value( new_input->xml, "crmd_state"); printf("Status of %s@%s: %s (%s)\n", crm_element_value(new_input->xml,XML_PING_ATTR_SYSFROM), cl_get_string(new_input->msg, F_CRM_HOST_FROM), state, crm_element_value(new_input->xml,XML_PING_ATTR_STATUS)); if(BE_SILENT && state != NULL) { fprintf(stderr, "%s\n", state); } } else if(DO_WHOIS_DC) { const char *dc = cl_get_string( new_input->msg, F_CRM_HOST_FROM); printf("Designated Controller is: %s\n", dc); if(BE_SILENT && dc != NULL) { fprintf(stderr, "%s\n", dc); } } if (this_msg_reference != NULL) { /* in testing mode... */ /* 31 = "test-_.xml" + an_int_as_string + '\0' */ filename_len = 31 + strlen(this_msg_reference); crm_malloc(filename, sizeof(char) * filename_len); if(filename != NULL) { sprintf(filename, "%s-%s_%d.xml", result, this_msg_reference, received_responses); filename[filename_len - 1] = '\0'; if (0 > write_xml_file(new_input->xml, filename)) { crm_crit("Could not save response to" " %s", filename); } } } } if (server->ch_status == IPC_DISCONNECT) { crm_verbose("admin_msg_callback: received HUP"); return !hack_return_good; } if (received_responses >= expected_responses) { crm_verbose( "Recieved expected number (%d) of messages from Heartbeat." " Exiting normally.", expected_responses); g_main_quit(mainloop); return !hack_return_good; } message_timer_id = Gmain_timeout_add( message_timeout_ms, admin_message_timeout, NULL); return hack_return_good; } gboolean admin_message_timeout(gpointer data) { fprintf(stderr, "No messages received in %d seconds.. aborting\n", (int)message_timeout_ms/1000); crm_err("No messages received in %d seconds", (int)message_timeout_ms/1000); g_main_quit(mainloop); return FALSE; } int do_find_resource(const char *rsc, crm_data_t *xml_node) { int found = 0; crm_data_t *nodestates = get_object_root(XML_CIB_TAG_STATUS, xml_node); const char *path2[] = { XML_CIB_TAG_LRM, XML_LRM_TAG_RESOURCES }; xml_child_iter( nodestates, a_node, XML_CIB_TAG_STATE, crm_data_t *rscstates = NULL; if(is_node_online(a_node) == FALSE) { crm_devel("Skipping offline node: %s", crm_element_value(a_node, XML_ATTR_ID)); continue; } rscstates = find_xml_node_nested(a_node, path2, DIMOF(path2)); xml_child_iter( rscstates, rsc_state, XML_LRM_TAG_RESOURCE, const char *id = crm_element_value(rsc_state,XML_ATTR_ID); const char *target = crm_element_value(rsc_state,XML_LRM_ATTR_TARGET); const char *last_op = crm_element_value(rsc_state,XML_LRM_ATTR_LASTOP); const char *op_code = crm_element_value(rsc_state,XML_LRM_ATTR_OPSTATUS); crm_devel("checking %s:%s for %s", target, id, rsc); if(safe_str_neq(rsc, id)){ crm_trace("no match"); continue; } if(safe_str_eq("stop", last_op)) { crm_devel("resource %s is stopped on: %s\n", rsc, target); } else if(safe_str_eq(op_code, "-1")) { crm_devel("resource %s is pending on: %s\n", rsc, target); } else if(safe_str_neq(op_code, "0")) { crm_devel("resource %s is failed on: %s\n", rsc, target); } else { crm_devel("resource %s is running on: %s\n", rsc, target); printf("resource %s is running on: %s\n", rsc, target); if(BE_SILENT) { fprintf(stderr, "%s ", target); } found++; } ); if(BE_SILENT) { fprintf(stderr, "\n"); } ); if(found == 0) { printf("resource %s is NOT running\n", rsc); } return found; } gboolean is_node_online(crm_data_t *node_state) { const char *uname = crm_element_value(node_state,XML_ATTR_UNAME); const char *join_state = crm_element_value(node_state,XML_CIB_ATTR_JOINSTATE); const char *crm_state = crm_element_value(node_state,XML_CIB_ATTR_CRMDSTATE); const char *ha_state = crm_element_value(node_state,XML_CIB_ATTR_HASTATE); const char *ccm_state = crm_element_value(node_state,XML_CIB_ATTR_INCCM); if(safe_str_eq(join_state, CRMD_JOINSTATE_MEMBER) && safe_str_eq(ha_state, ACTIVESTATUS) && safe_str_eq(ccm_state, XML_BOOLEAN_YES) && safe_str_eq(crm_state, ONLINESTATUS)) { crm_devel("Node %s is online", uname); return TRUE; } crm_devel("Node %s: %s %s %s", uname, join_state, ccm_state, crm_state); crm_devel("Node %s is offline", uname); return FALSE; } int do_find_resource_list(crm_data_t *xml_node) { int found = 0; crm_data_t *rscs = get_object_root(XML_CIB_TAG_RESOURCES, xml_node); xml_child_iter( rscs, rsc, XML_CIB_TAG_RESOURCE, printf("%s resource: %s (%s)\n", crm_element_value(rsc, "class"), crm_element_value(rsc, XML_ATTR_ID), crm_element_value(rsc, XML_ATTR_TYPE)); found++; ); if(found == 0) { printf("NO resources configured\n"); } return found; } int do_find_node_list(crm_data_t *xml_node) { int found = 0; crm_data_t *nodes = get_object_root(XML_CIB_TAG_NODES, xml_node); xml_child_iter( nodes, node, XML_CIB_TAG_NODE, printf("%s node: %s (%s)\n", crm_element_value(node, XML_ATTR_TYPE), crm_element_value(node, XML_ATTR_UNAME), crm_element_value(node, XML_ATTR_ID)); found++; ); if(found == 0) { printf("NO nodes configured\n"); } return found; } void usage(const char *cmd, int exit_status) { FILE *stream; stream = exit_status ? stderr : stdout; fprintf(stream, "usage: %s [-?vs] [command] [command args]\n", cmd); fprintf(stream, "Options\n"); fprintf(stream, "\t--%s (-%c)\t: " "turn on debug info. additional instances increase verbosity\n", "verbose", 'V'); fprintf(stream, "\t--%s (-%c)\t: be very very quiet\n", "silent", 's'); fprintf(stream, "\t--%s (-%c)\t: this help message\n", "help", '?'); fprintf(stream, "\nCommands\n"); fprintf(stream, "\t--%s (-%c) \t: " "increment the CRMd debug level on \n", CRM_OP_DEBUG_UP,'i'); fprintf(stream, "\t--%s (-%c) \t: " "decrement the CRMd debug level on \n", CRM_OP_DEBUG_DOWN,'d'); fprintf(stream, "\t--%s (-%c) \t: " "shutdown the CRMd on \n", "kill", 'K'); fprintf(stream, "\t--%s (-%c) \t: " "request the status of \n", "status", 'S'); fprintf(stream, "\t--%s (-%c)\t\t: " "request the status of all nodes\n", "health", 'H'); fprintf(stream, "\t--%s (-%c) \t: " "initiate an election from \n", "election", 'E'); fprintf(stream, "\t--%s (-%c)\t: " "request the uname of the DC\n", "dc_lookup", 'D'); fprintf(stream, "\t--%s (-%c)\t\t: " "request the uname of all member nodes\n", "nodes", 'N'); fprintf(stream, "\t--%s (-%c)\t: " "request the names of all resources\n", "resources", 'R'); fprintf(stream, "\t--%s (-%c) \t: " "request the location of \n", "whereis", 'W'); /* fprintf(stream, "\t--%s (-%c)\t\n", "disconnect", 'D'); */ fflush(stream); exit(exit_status); } diff --git a/crm/cib/callbacks.c b/crm/cib/callbacks.c index 33fac5e0ef..686f657cba 100644 --- a/crm/cib/callbacks.c +++ b/crm/cib/callbacks.c @@ -1,1027 +1,1050 @@ -/* $Id: callbacks.c,v 1.23 2005/02/19 18:11:03 andrew Exp $ */ +/* $Id: callbacks.c,v 1.24 2005/02/21 13:13:45 andrew Exp $ */ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include gint cib_GCompareFunc(gconstpointer a, gconstpointer b); gboolean cib_msg_timeout(gpointer data); void cib_GHFunc(gpointer key, gpointer value, gpointer user_data); GHashTable *peer_hash = NULL; int next_client_id = 0; gboolean cib_is_master = FALSE; gboolean cib_have_quorum = FALSE; GHashTable *client_list = NULL; extern const char *cib_our_uname; extern ll_cluster_t *hb_conn; /* technically bump does modify the cib... * but we want to split the "bump" from the "sync" */ cib_operation_t cib_server_ops[] = { {NULL, FALSE, FALSE, FALSE, FALSE, cib_process_default}, {CRM_OP_NOOP, FALSE, FALSE, FALSE, FALSE, cib_process_default}, {CRM_OP_RETRIVE_CIB, FALSE, FALSE, FALSE, FALSE, cib_process_query}, {CRM_OP_CIB_SLAVE, FALSE, TRUE, FALSE, FALSE, cib_process_readwrite}, {CRM_OP_CIB_SLAVEALL,TRUE, TRUE, FALSE, FALSE, cib_process_readwrite}, {CRM_OP_CIB_MASTER, FALSE, TRUE, FALSE, FALSE, cib_process_readwrite}, {CRM_OP_CIB_ISMASTER,FALSE, TRUE, FALSE, FALSE, cib_process_readwrite}, {CRM_OP_CIB_BUMP, FALSE, TRUE, TRUE, FALSE, cib_process_bump}, {CRM_OP_CIB_REPLACE, TRUE, TRUE, TRUE, TRUE, cib_process_replace}, {CRM_OP_CIB_CREATE, TRUE, TRUE, TRUE, TRUE, cib_process_modify}, {CRM_OP_CIB_UPDATE, TRUE, TRUE, TRUE, TRUE, cib_process_modify}, {CRM_OP_JOINACK, TRUE, TRUE, TRUE, TRUE, cib_process_modify}, {CRM_OP_SHUTDOWN_REQ,TRUE, TRUE, TRUE, TRUE, cib_process_modify}, {CRM_OP_CIB_DELETE, TRUE, TRUE, TRUE, TRUE, cib_process_modify}, {CRM_OP_CIB_QUERY, FALSE, FALSE, TRUE, FALSE, cib_process_query}, {CRM_OP_QUIT, FALSE, TRUE, FALSE, FALSE, cib_process_quit}, {CRM_OP_PING, FALSE, FALSE, FALSE, FALSE, cib_process_ping}, {CRM_OP_CIB_ERASE, TRUE, TRUE, TRUE, FALSE, cib_process_erase} }; int send_via_callback_channel(HA_Message *msg, const char *token); enum cib_errors cib_process_command( const HA_Message *request, HA_Message **reply, gboolean privileged); gboolean cib_common_callback( IPC_Channel *channel, gpointer user_data, gboolean privileged); enum cib_errors cib_get_operation_id(const HA_Message * msg, int *operation); gboolean cib_process_disconnect(IPC_Channel *channel, cib_client_t *cib_client); gboolean cib_client_connect(IPC_Channel *channel, gpointer user_data) { gboolean auth_failed = FALSE; gboolean can_connect = TRUE; gboolean (*client_callback)(IPC_Channel *channel, gpointer user_data) = NULL; cib_client_t *new_client = NULL; crm_devel("Connecting channel"); if (channel == NULL) { crm_err("Channel was NULL"); can_connect = FALSE; } else if (channel->ch_status == IPC_DISCONNECT) { crm_err("Channel was disconnected"); can_connect = FALSE; } else if(user_data == NULL) { crm_err("user_data must contain channel name"); can_connect = FALSE; } else { crm_malloc(new_client, sizeof(cib_client_t)); new_client->id = NULL; new_client->callback_id = NULL; new_client->source = NULL; new_client->channel = channel; new_client->channel_name = user_data; new_client->delegated_calls = NULL; crm_devel("Created channel %p for channel %s", new_client, new_client->channel_name); client_callback = NULL; /* choose callback and do auth based on channel_name */ if(safe_str_eq(new_client->channel_name, cib_channel_callback)) { client_callback = cib_null_callback; } else { uuid_t client_id; uuid_generate(client_id); crm_malloc(new_client->id, sizeof(char)*36); uuid_unparse(client_id, new_client->id); new_client->id[35] = EOS; uuid_generate(client_id); crm_malloc(new_client->callback_id, sizeof(char)*36); uuid_unparse(client_id, new_client->callback_id); new_client->callback_id[35] = EOS; client_callback = cib_ro_callback; if(safe_str_eq(new_client->channel_name, cib_channel_rw)) { client_callback = cib_rw_callback; } } } if(auth_failed) { crm_err("Connection to %s channel failed authentication", (char *)user_data); can_connect = FALSE; } if(can_connect == FALSE) { if(new_client) { crm_free(new_client->id); crm_free(new_client->callback_id); } crm_free(new_client); return FALSE; } channel->ops->set_recv_qlen(channel, 100); channel->ops->set_send_qlen(channel, 100); if(client_callback != NULL) { new_client->source = G_main_add_IPC_Channel( G_PRIORITY_LOW, channel, FALSE, client_callback, new_client, default_ipc_connection_destroy); } if(client_callback != cib_null_callback) { /* send msg to client with uuid to use when signing up for * callback channel */ HA_Message *reg_msg = ha_msg_new(3); ha_msg_add(reg_msg, F_CIB_OPERATION, CRM_OP_REGISTER); ha_msg_add(reg_msg, F_CIB_CLIENTID, new_client->id); ha_msg_add( reg_msg, F_CIB_CALLBACK_TOKEN, new_client->callback_id); msg2ipcchan(reg_msg, channel); crm_msg_del(reg_msg); /* make sure we can find ourselves later for sync calls * redirected to the master instance */ g_hash_table_insert(client_list, new_client->id, new_client); } crm_devel("Channel %s connected for client %s", new_client->channel_name, new_client->id); return TRUE; } gboolean cib_rw_callback(IPC_Channel *channel, gpointer user_data) { return cib_common_callback(channel, user_data, TRUE); } gboolean cib_ro_callback(IPC_Channel *channel, gpointer user_data) { return cib_common_callback(channel, user_data, FALSE); } gboolean cib_null_callback(IPC_Channel *channel, gpointer user_data) { gboolean did_disconnect = TRUE; HA_Message *op_request = NULL; cib_client_t *cib_client = user_data; cib_client_t *hash_client = NULL; const char *type = NULL; const char *uuid_ticket = NULL; + const char *client_name = NULL; + gboolean register_failed = FALSE; if(cib_client == NULL) { crm_err("Discarding IPC message from unknown source" " on callback channel."); return FALSE; } while(channel->ops->is_message_pending(channel)) { if (channel->ch_status == IPC_DISCONNECT) { /* The message which was pending for us is that * the IPC status is now IPC_DISCONNECT */ break; } op_request = msgfromIPC_noauth(channel); type = cl_get_string(op_request, F_CIB_OPERATION); if(safe_str_neq(type, CRM_OP_REGISTER) ) { crm_warn("Discarding IPC message from %s on callback channel", cib_client->id); crm_msg_del(op_request); continue; } - + uuid_ticket = cl_get_string(op_request, F_CIB_CALLBACK_TOKEN); - hash_client = g_hash_table_lookup(client_list, uuid_ticket); + client_name = cl_get_string(op_request, F_CIB_CLIENTNAME); - if(hash_client != NULL) { - crm_err("Duplicate registration request... disconnecting"); + CRM_DEV_ASSERT(uuid_ticket != NULL); + if(crm_assert_failed) { + register_failed = crm_assert_failed; + } + + CRM_DEV_ASSERT(client_name != NULL); + if(crm_assert_failed) { + register_failed = crm_assert_failed; + } + + if(register_failed == FALSE) { + hash_client = g_hash_table_lookup(client_list, uuid_ticket); + if(hash_client != NULL) { + crm_err("Duplicate registration request..." + " disconnecting"); + register_failed = TRUE; + } + } + + if(register_failed) { + crm_err("Registration request failed... disconnecting"); crm_msg_del(op_request); return FALSE; } + cib_client->id = crm_strdup(uuid_ticket); + cib_client->name = crm_strdup(client_name); - cib_client->id = crm_strdup(uuid_ticket); g_hash_table_insert(client_list, cib_client->id, cib_client); crm_info("Registered %s on %s channel", cib_client->id, cib_client->channel_name); crm_msg_del(op_request); op_request = ha_msg_new(2); ha_msg_add(op_request, F_CIB_OPERATION, CRM_OP_REGISTER); ha_msg_add(op_request, F_CIB_CLIENTID, cib_client->id); msg2ipcchan(op_request, channel); crm_msg_del(op_request); } did_disconnect = cib_process_disconnect(channel, cib_client); if(did_disconnect) { crm_info("Client disconnected"); } return did_disconnect; } gboolean cib_common_callback( IPC_Channel *channel, gpointer user_data, gboolean privileged) { int rc = cib_ok; int lpc = 0; int call_type = 0; int call_options = 0; const char *op = NULL; const char *host = NULL; HA_Message *op_request = NULL; HA_Message *op_reply = NULL; gboolean needs_processing = FALSE; cib_client_t *cib_client = user_data; if(cib_client == NULL) { crm_err("Receieved call from unknown source. Discarding."); return FALSE; } crm_verbose("Callback for %s on %s channel", cib_client->id, cib_client->channel_name); while(channel->ops->is_message_pending(channel)) { if (channel->ch_status == IPC_DISCONNECT) { /* The message which was pending for us is that * the IPC status is now IPC_DISCONNECT */ break; } op_request = msgfromIPC(channel); if (op_request == NULL) { perror("Receive failure:"); break; } crm_verbose("Processing IPC message from %s on %s channel", cib_client->id, cib_client->channel_name); crm_log_message(LOG_MSG, op_request); crm_log_message_adv(LOG_DEV, "cib.client-in.log", op_request); lpc++; rc = cib_ok; if(HA_OK != ha_msg_add( op_request, F_CIB_CLIENTID, cib_client->id)) { crm_err("Couldnt add F_CIB_CLIENTID to message"); rc = cib_msg_field_add; } if(rc == cib_ok) { ha_msg_value_int( op_request, F_CIB_CALLOPTS, &call_options); crm_devel("Call options: %.8lx", (long)call_options); host = cl_get_string(op_request, F_CIB_HOST); op = cl_get_string(op_request, F_CIB_OPERATION); rc = cib_get_operation_id(op_request, &call_type); } if(rc == cib_ok && cib_server_ops[call_type].needs_privileges && privileged == FALSE) { rc = cib_not_authorized; } needs_processing = FALSE; if(rc != cib_ok) { /* TODO: construct error reply */ crm_err("Pre-processing of command failed: %s", cib_error2string(rc)); } else if(host == NULL && cib_is_master && !(call_options & cib_scope_local)) { crm_devel("Processing master %s op locally", op); needs_processing = TRUE; } else if( (host == NULL && (call_options & cib_scope_local)) || safe_str_eq(host, cib_our_uname)) { crm_devel("Processing %s op locally", op); needs_processing = TRUE; } else { /* send via HA to other nodes */ ha_msg_add(op_request, F_CIB_DELEGATED, cib_our_uname); crm_log_message(LOG_MSG, op_request); if(host != NULL) { crm_devel("Forwarding %s op to %s", op, host); hb_conn->llc_ops->send_ordered_nodemsg( hb_conn, op_request, host); } else { crm_info("Forwarding %s op to master instance", op); hb_conn->llc_ops->sendclustermsg( hb_conn, op_request); } if(call_options & cib_discard_reply) { crm_trace("Client not interested in reply"); } else if(call_options & cib_sync_call) { /* keep track of the request so we can time it * out if required */ HA_Message *saved = ha_msg_copy(op_request); crm_devel("Registering delegated call from %s", cib_client->id); cib_client->delegated_calls = g_list_append( cib_client->delegated_calls, saved); } crm_msg_del(op_request); op_request = NULL; continue; } if(needs_processing) { crm_verbose("Processing %s op", op); rc = cib_process_command( op_request, &op_reply, privileged); crm_devel("Performing local processing: op=%s origin=%s/%s,%s (update=%s)", op, cib_our_uname, cib_client->id, cl_get_string(op_request, F_CIB_CALLID), (rc==cib_ok && cib_server_ops[call_type].modifies_cib)?"true":"false"); crm_devel("Processing complete"); } crm_devel("processing response cases"); if(rc != cib_ok) { crm_err("Input message"); crm_log_message(LOG_ERR, op_request); crm_err("Output message"); crm_log_message(LOG_ERR, op_reply); - crm_log_message_adv(LOG_ERR, DEVEL_DIR"/cib.out.log", op_reply); + crm_log_message_adv(LOG_ERR, "cib.out.log", op_reply); } if(op_reply == NULL) { crm_trace("No reply is required for op %s", crm_str(op)); } else if(call_options & cib_sync_call) { crm_devel("Sending sync reply to %s op", crm_str(op)); crm_log_message(LOG_MSG, op_reply); if(msg2ipcchan(op_reply, channel) != HA_OK) { crm_err("Sync reply failed: %s", cib_error2string(cib_reply_failed)); if(rc == cib_ok) { rc = cib_reply_failed; } } } else { enum cib_errors local_rc = cib_ok; /* send reply via client's callback channel */ crm_devel("Sending async reply %p to %s op", op_reply, crm_str(op)); crm_log_message(LOG_MSG, op_reply); local_rc = send_via_callback_channel( op_reply, cib_client->callback_id); if(local_rc != cib_ok) { crm_err("ASync reply failed: %s", cib_error2string(local_rc)); if(rc != cib_ok) { local_rc = cib_reply_failed; } } } crm_devel("Cleaning up reply"); crm_msg_del(op_reply); op_reply = NULL; crm_devel("Processing forward cases"); if(rc == cib_ok && cib_server_ops[call_type].modifies_cib && !(call_options & cib_scope_local)) { /* send via HA to other nodes */ crm_info("Forwarding %s op to all instances", op); ha_msg_add(op_request, F_CIB_GLOBAL_UPDATE, XML_BOOLEAN_TRUE); cl_log_message(LOG_DEV, op_request); CRM_DEV_ASSERT(hb_conn->llc_ops->sendclustermsg( hb_conn, op_request) == HA_OK); } else { if(call_options & cib_scope_local ) { crm_devel("Request not broadcast : local scope"); } if(cib_server_ops[call_type].modifies_cib == FALSE) { crm_devel("Request not broadcast : R/O call"); } if(rc != cib_ok) { crm_devel("Request not broadcast : call failed : %s", cib_error2string(rc)); } } crm_devel("Cleaning up request"); crm_msg_del(op_request); op_request = NULL; } crm_verbose("Processed %d messages", lpc); return cib_process_disconnect(channel, cib_client); } enum cib_errors cib_process_command( const HA_Message *request, HA_Message **reply, gboolean privileged) { crm_data_t *output = NULL; crm_data_t *input = NULL; int call_type = 0; int call_options = 0; enum cib_errors rc = cib_ok; const char *op = NULL; const char *call_id = NULL; const char *section = NULL; const char *tmp = NULL; if(reply) { *reply = NULL; } /* Start processing the request... */ op = cl_get_string(request, F_CIB_OPERATION); call_id = cl_get_string(request, F_CIB_CALLID); ha_msg_value_int(request, F_CIB_CALLOPTS, &call_options); crm_trace("Processing call id: %s", call_id); rc = cib_get_operation_id(request, &call_type); if(rc == cib_ok && cib_server_ops[call_type].needs_privileges && privileged == FALSE) { /* abort */ rc = cib_not_authorized; } if(rc == cib_ok && cib_server_ops[call_type].needs_section) { section = cl_get_string(request, F_CIB_SECTION); crm_trace("Unpacked section as: %s", section); } if(rc == cib_ok && cib_server_ops[call_type].needs_data) { crm_trace("Unpacking data in %s", F_CIB_CALLDATA); input = get_message_xml(request, F_CIB_CALLDATA); } if(rc == cib_ok) { rc = cib_server_ops[call_type].fn( op, call_options, section, input, &output); } crm_devel("Processing reply cases"); if((call_options & cib_discard_reply) || reply == NULL) { return rc; } crm_devel("Creating the reply"); /* make the basic reply */ *reply = ha_msg_new(8); ha_msg_add(*reply, F_TYPE, T_CIB); ha_msg_add(*reply, F_CIB_OPERATION, op); ha_msg_add(*reply, F_CIB_CALLID, call_id); ha_msg_add_int(*reply, F_CIB_RC, rc); tmp = cl_get_string(request, F_CIB_CLIENTID); ha_msg_add(*reply, F_CIB_CLIENTID, tmp); tmp = cl_get_string(request, F_CIB_CALLOPTS); ha_msg_add(*reply, F_CIB_CALLOPTS, tmp); /* attach the output if necessary */ if(output != NULL) { add_message_xml(*reply, F_CIB_CALLDATA, output); } crm_devel("Cleaning up"); free_xml(output); free_xml(input); return rc; } int send_via_callback_channel(HA_Message *msg, const char *token) { cib_client_t *hash_client = NULL; GList *list_item = NULL; crm_devel("Delivering msg %p to client %s", msg, token); if(msg == NULL) { crm_err("No message to send"); return cib_reply_failed; } else if(token == NULL) { crm_err("No client id token, cant send message"); return cib_missing; } hash_client = g_hash_table_lookup(client_list, token); if(hash_client == NULL) { crm_err("Cannot find client for token %s", token); return cib_client_gone; } else if(hash_client->channel == NULL) { crm_err("Cannot find channel for client %s", token); return cib_client_corrupt; } list_item = g_list_find_custom( hash_client->delegated_calls, msg, cib_GCompareFunc); if(list_item != NULL) { /* remove it - no need to time it out */ HA_Message *orig_msg = list_item->data; crm_devel("Removing msg from delegated list"); hash_client->delegated_calls = g_list_remove( hash_client->delegated_calls, orig_msg); CRM_DEV_ASSERT(orig_msg != msg); crm_msg_del(orig_msg); } crm_devel("Delivering reply to client %s", token); cl_log_message(LOG_DEV, msg); if(msg2ipcchan(msg, hash_client->channel) != HA_OK) { crm_err("Delivery of reply to client %s failed", token); return cib_reply_failed; } return cib_ok; } gint cib_GCompareFunc(gconstpointer a, gconstpointer b) { const HA_Message *a_msg = a; const HA_Message *b_msg = b; int msg_a_id = 0; int msg_b_id = 0; ha_msg_value_int(a_msg, F_CIB_CALLID, &msg_a_id); ha_msg_value_int(b_msg, F_CIB_CALLID, &msg_b_id); if(msg_a_id == msg_b_id) { return 0; } else if(msg_a_id < msg_b_id) { return -1; } return 1; } gboolean cib_msg_timeout(gpointer data) { crm_trace("Checking if any clients have timed out messages"); g_hash_table_foreach(client_list, cib_GHFunc, NULL); return TRUE; } void cib_GHFunc(gpointer key, gpointer value, gpointer user_data) { cib_client_t *client = value; GListPtr list = client->delegated_calls; HA_Message *msg = NULL; while(list != NULL) { int seen = 0; int timeout = 5; /* 1 iteration == 1 seconds */ HA_Message *reply = NULL; msg = list->data; ha_msg_value_int(msg, F_CIB_SEENCOUNT, &seen); ha_msg_value_int(msg, F_CIB_TIMEOUT, &timeout); crm_trace("Timeout %d, seen %d", timeout, seen); if(timeout > 0 && seen < timeout) { int seen2 = 0; crm_trace("Updating seen count for msg from client %s", client->id); seen++; ha_msg_mod_int(msg, F_CIB_SEENCOUNT, seen); ha_msg_value_int(msg, F_CIB_SEENCOUNT, &seen2); list = list->next; continue; } crm_warn("Sending operation timeout msg to client %s", client->id); reply = ha_msg_new(4); ha_msg_add(reply, F_TYPE, T_CIB); ha_msg_add(reply, F_CIB_OPERATION, cl_get_string(msg, F_CIB_OPERATION)); ha_msg_add(reply, F_CIB_CALLID, cl_get_string(msg, F_CIB_CALLID)); ha_msg_add_int(reply, F_CIB_RC, cib_master_timeout); msg2ipcchan(reply, client->channel); list = list->next; client->delegated_calls = g_list_remove( client->delegated_calls, msg); crm_msg_del(reply); crm_msg_del(msg); } } gboolean cib_process_disconnect(IPC_Channel *channel, cib_client_t *cib_client) { if (channel->ch_status == IPC_DISCONNECT && cib_client != NULL) { crm_info("Cleaning up after %s channel disconnect from client (%p) %s", cib_client->channel_name, cib_client, crm_str(cib_client->id)); if(cib_client->id != NULL) { g_hash_table_remove(client_list, cib_client->id); } if(cib_client->source != NULL) { crm_devel("deleting the IPC Channel"); G_main_del_IPC_Channel(cib_client->source); cib_client->source = NULL; } crm_devel("Freeing the cib client %s", crm_str(cib_client->id)); #if 0 /* todo - put this back in once i recheck its safe */ crm_free(cib_client->callback_id); + crm_free(cib_client->name); crm_free(cib_client->id); #endif crm_free(cib_client); crm_devel("Freed the cib client"); return FALSE; } else if (channel->ch_status == IPC_DISCONNECT) { crm_warn("Unknown client disconnected"); return FALSE; } return TRUE; } gboolean cib_ha_dispatch(IPC_Channel *channel, gpointer user_data) { int lpc = 0; ll_cluster_t *hb_cluster = (ll_cluster_t*)user_data; while(hb_cluster->llc_ops->msgready(hb_cluster)) { lpc++; /* invoke the callbacks but dont block */ hb_cluster->llc_ops->rcvmsg(hb_cluster, 0); } crm_trace("%d HA messages dispatched", lpc); if (channel && (channel->ch_status == IPC_DISCONNECT)) { crm_crit("Lost connection to heartbeat service... exiting"); exit(100); return FALSE; } return TRUE; } void cib_peer_callback(const HA_Message * msg, void* private_data) { int is_done = 1; int call_type = 0; int call_options = 0; gboolean process = TRUE; gboolean needs_reply = TRUE; gboolean local_notify = FALSE; enum cib_errors rc = cib_ok; HA_Message *op_reply = NULL; const char *originator = cl_get_string(msg, F_ORIG); const char *request_to = cl_get_string(msg, F_CIB_HOST); const char *reply_to = cl_get_string(msg, F_CIB_ISREPLY); const char *update = cl_get_string(msg, F_CIB_GLOBAL_UPDATE); const char *delegated = cl_get_string(msg, F_CIB_DELEGATED); const char *client_id = NULL; if(safe_str_eq(originator, cib_our_uname)) { crm_devel("Discarding message %s from ourselves", cl_get_string(msg, F_SEQ)); return; } if(cib_get_operation_id(msg, &call_type) != cib_ok) { crm_err("Invalid operation... discarding msg %s", cl_get_string(msg, F_SEQ)); return; } crm_trace("%s Processing msg %s", cib_our_uname, cl_get_string(msg, F_SEQ)); if(request_to != NULL && strlen(request_to) == 0) { request_to = NULL; } if(cib_server_ops[call_type].modifies_cib || (reply_to == NULL && cib_is_master) || request_to != NULL) { is_done = 0; } crm_info("Processing message from peer to %s...", request_to); crm_log_message(LOG_DEV, msg); if(safe_str_eq(update, XML_BOOLEAN_TRUE) && safe_str_eq(reply_to, cib_our_uname)) { crm_devel("Processing global update that originated from us"); needs_reply = FALSE; local_notify = TRUE; } else if(safe_str_eq(update, XML_BOOLEAN_TRUE)) { crm_devel("Processing global update"); needs_reply = FALSE; } else if(request_to != NULL && safe_str_eq(request_to, cib_our_uname)) { crm_devel("Processing request sent to us"); } else if(delegated != NULL && cib_is_master == TRUE) { crm_devel("Processing request sent to master instance"); } else if(reply_to != NULL && safe_str_eq(reply_to, cib_our_uname)) { crm_devel("Forward reply sent from %s to local clients", originator); process = FALSE; needs_reply = FALSE; local_notify = TRUE; } else if(delegated != NULL) { crm_devel("Ignoring msg for master instance"); return; } else if(request_to != NULL) { /* this is for a specific instance and we're not it */ crm_devel("Ignoring msg for instance on %s", crm_str(request_to)); return; } else if(reply_to == NULL && cib_is_master == FALSE) { /* this is for the master instance and we're not it */ crm_devel("Ignoring reply to %s", crm_str(reply_to)); return; } else { crm_warn("Nothing for us to do?"); return; } crm_devel("Finished determining processing actions"); ha_msg_value_int(msg, F_CIB_CALLOPTS, &call_options); crm_trace("Retrieved call options: %d", call_options); if(process) { crm_devel("Performing local processing: op=%s origin=%s/%s,%s (update=%s)", cl_get_string(msg, F_CIB_OPERATION), originator, cl_get_string(msg, F_CIB_CLIENTID), cl_get_string(msg, F_CIB_CALLID), update); rc = cib_process_command(msg, &op_reply, TRUE); } if(local_notify) { /* send callback to originating child */ cib_client_t *client_obj = NULL; HA_Message *client_reply = NULL; crm_trace("find the client"); if(process == FALSE) { client_reply = ha_msg_copy(msg); } else { client_reply = ha_msg_copy(op_reply); } client_id = cl_get_string(msg, F_CIB_CLIENTID); if(client_id != NULL) { client_obj = g_hash_table_lookup( client_list, client_id); } else { crm_err("No client to sent the response to." " F_CIB_CLIENTID not set."); } crm_devel("Sending callback to originator of delegated request"); if(client_obj != NULL) { if(is_done == 0) { crm_devel("Sending local modify response"); } else { crm_devel("Sending master response"); } if(call_options & cib_sync_call) { crm_devel("Sending sync response: %d", call_options); send_via_callback_channel( client_reply, client_obj->id); } else { crm_devel("Sending async response"); send_via_callback_channel( client_reply, client_obj->callback_id); } } else { crm_warn("Client %s may have left us", crm_str(client_id)); } crm_msg_del(client_reply); } if(needs_reply == FALSE) { /* nothing more to do... * this was a non-originating slave update */ crm_devel("Completed slave update"); crm_msg_del(op_reply); return; } crm_trace("add the originator to message"); /* from now on we are the server */ if(rc == cib_ok && cib_server_ops[call_type].modifies_cib && !(call_options & cib_scope_local)) { /* this (successful) call modified the CIB _and_ the * change needs to be broadcast... * send via HA to other nodes */ HA_Message *op_bcast = ha_msg_copy(msg); crm_devel("Sending update request to everyone"); ha_msg_add(op_bcast, F_CIB_ISREPLY, originator); ha_msg_add(op_bcast, F_CIB_GLOBAL_UPDATE, XML_BOOLEAN_TRUE); crm_log_message(LOG_DEV, op_bcast); hb_conn->llc_ops->sendclustermsg(hb_conn, op_bcast); crm_msg_del(op_bcast); } else { /* send reply via HA to originating node */ crm_devel("Sending request result to originator only"); ha_msg_add(op_reply, F_CIB_ISREPLY, originator); crm_log_message(LOG_DEV, op_reply); hb_conn->llc_ops->send_ordered_nodemsg( hb_conn, op_reply, originator); } crm_msg_del(op_reply); return; } enum cib_errors cib_get_operation_id(const HA_Message * msg, int *operation) { int lpc = 0; int max_msg_types = DIMOF(cib_server_ops); const char *op = cl_get_string(msg, F_CIB_OPERATION); for (lpc = 0; lpc < max_msg_types; lpc++) { if (safe_str_eq(op, cib_server_ops[lpc].operation)) { *operation = lpc; return cib_ok; } } crm_err("Operation %s is not valid", op); *operation = -1; return cib_operation; } void cib_client_status_callback(const char * node, const char * client, const char * status, void * private) { crm_notice("Status update: Client %s/%s now has status [%s]\n", node, client, status); g_hash_table_replace(peer_hash, crm_strdup(node), crm_strdup(status)); return; } gboolean cib_ccm_dispatch(int fd, gpointer user_data) { int rc = 0; oc_ev_t *ccm_token = (oc_ev_t*)user_data; crm_devel("received callback"); rc = oc_ev_handle_event(ccm_token); if(0 == rc) { return TRUE; } else { crm_err("CCM connection appears to have failed: rc=%d.", rc); return FALSE; } } void cib_ccm_msg_callback( oc_ed_t event, void *cookie, size_t size, const void *data) { crm_devel("received callback"); crm_info("event=%s", event==OC_EV_MS_NEW_MEMBERSHIP?"NEW MEMBERSHIP": event==OC_EV_MS_NOT_PRIMARY?"NOT PRIMARY": event==OC_EV_MS_PRIMARY_RESTORED?"PRIMARY RESTORED": event==OC_EV_MS_EVICTED?"EVICTED": "NO QUORUM MEMBERSHIP"); if(event==OC_EV_MS_NEW_MEMBERSHIP || event==OC_EV_MS_NOT_PRIMARY || event==OC_EV_MS_PRIMARY_RESTORED) { cib_have_quorum = TRUE; } else { cib_have_quorum = FALSE; } oc_ev_callback_done(cookie); return; } diff --git a/crm/cib/callbacks.h b/crm/cib/callbacks.h index 1b4bd12906..c8e11a5b92 100644 --- a/crm/cib/callbacks.h +++ b/crm/cib/callbacks.h @@ -1,79 +1,80 @@ -/* $Id: callbacks.h,v 1.5 2005/01/26 13:30:55 andrew Exp $ */ +/* $Id: callbacks.h,v 1.6 2005/02/21 13:13:45 andrew Exp $ */ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include /* #include */ #include #include #include extern gboolean cib_is_master; extern gboolean cib_have_quorum; extern GHashTable *client_list; extern GHashTable *peer_hash; typedef struct cib_client_s { char *id; + char *name; char *callback_id; const char *channel_name; IPC_Channel *channel; GCHSource *source; GList *delegated_calls; } cib_client_t; typedef struct cib_operation_s { const char* operation; gboolean modifies_cib; gboolean needs_privileges; gboolean needs_section; gboolean needs_data; enum cib_errors (*fn)( const char *, int, const char *, crm_data_t*, crm_data_t**); } cib_operation_t; extern cib_operation_t cib_server_ops[]; extern gboolean cib_client_connect(IPC_Channel *channel, gpointer user_data); extern gboolean cib_null_callback (IPC_Channel *channel, gpointer user_data); extern gboolean cib_rw_callback (IPC_Channel *channel, gpointer user_data); extern gboolean cib_ro_callback (IPC_Channel *channel, gpointer user_data); extern gboolean cib_ha_dispatch (IPC_Channel *channel, gpointer user_data); extern void cib_peer_callback(const HA_Message * msg, void* private_data); extern void cib_client_status_callback(const char * node, const char * client, const char * status, void * private); extern gboolean cib_ccm_dispatch(int fd, gpointer user_data); extern void cib_ccm_msg_callback( oc_ed_t event, void *cookie, size_t size, const void *data); diff --git a/crm/cib/cibmon.c b/crm/cib/cibmon.c index 14f99314ee..26d8bb213b 100644 --- a/crm/cib/cibmon.c +++ b/crm/cib/cibmon.c @@ -1,459 +1,460 @@ -/* $Id: cibmon.c,v 1.14 2005/02/19 18:11:03 andrew Exp $ */ +/* $Id: cibmon.c,v 1.15 2005/02/21 13:13:45 andrew Exp $ */ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include /* someone complaining about _ha_msg_mod not being found */ #include #define UPDATE_PREFIX "cib.updates:" int exit_code = cib_ok; GMainLoop *mainloop = NULL; const char *crm_system_name = "cibmon"; void usage(const char *cmd, int exit_status); void cib_connection_destroy(gpointer user_data); void cibmon_pre_notify(const char *event, HA_Message *msg); void cibmon_post_notify(const char *event, HA_Message *msg); void cibmon_update_confirm(const char *event, HA_Message *msg); cib_t *the_cib = NULL; #define OPTARGS "V?pPUam:i" gboolean intermediate_changes = FALSE; gboolean pre_notify = FALSE; gboolean post_notify = FALSE; gboolean update_notify = FALSE; int max_failures = 30; int main(int argc, char **argv) { int option_index = 0; int argerr = 0; int flag; int level = 0; int attempts = 0; static struct option long_options[] = { /* Top-level Options */ {"verbose", 0, 0, 'V'}, {"help", 0, 0, '?'}, {"pre", 0, 0, 'p'}, {"post", 0, 0, 'P'}, {"update", 0, 0, 'U'}, {"all", 0, 0, 'a'}, {"intermediate", 0, 0, 'i'}, {"max-conn-fail",1, 0, 'm'}, {0, 0, 0, 0} }; crm_log_init(crm_system_name); cl_set_corerootdir(HA_COREDIR); cl_cdtocoredir(); #ifdef USE_LIBXML /* docs say only do this once, but in their code they do it every time! */ xmlInitParser(); #endif while (1) { flag = getopt_long(argc, argv, OPTARGS, long_options, &option_index); if (flag == -1) break; switch(flag) { case 0: printf("option %s", long_options[option_index].name); if (optarg) { printf(" with arg %s", optarg); } printf("\n"); printf("Long option (--%s) is not" " (yet?) properly supported\n", long_options[option_index].name); ++argerr; break; case 'V': level = get_crm_log_level(); cl_log_enable_stderr(TRUE); set_crm_log_level(level+1); break; case '?': usage(crm_system_name, LSB_EXIT_OK); break; case 'm': max_failures = crm_atoi(optarg, "30"); break; case 'a': pre_notify = TRUE; post_notify = TRUE; update_notify = TRUE; break; case 'p': pre_notify = TRUE; break; case 'P': post_notify = TRUE; break; case 'U': update_notify = TRUE; break; case 'i': intermediate_changes = TRUE; break; default: printf("Argument code 0%o (%c)" " is not (?yet?) supported\n", flag, flag); ++argerr; break; } } if (optind < argc) { printf("non-option ARGV-elements: "); while (optind < argc) printf("%s ", argv[optind++]); printf("\n"); } if (optind > argc) { ++argerr; } if (argerr) { usage(crm_system_name, LSB_EXIT_GENERIC); } the_cib = cib_new(); do { if(attempts != 0) { sleep(1); } - exit_code = the_cib->cmds->signon(the_cib, cib_query); + exit_code = the_cib->cmds->signon( + the_cib, crm_system_name, cib_query); } while(exit_code == cib_connection && attempts++ < max_failures); if(exit_code != cib_ok) { crm_err("Signon to CIB failed: %s", cib_error2string(exit_code)); } if(exit_code == cib_ok) { exit_code = the_cib->cmds->set_connection_dnotify( the_cib, cib_connection_destroy); } if(exit_code == cib_ok && pre_notify) { exit_code = the_cib->cmds->add_notify_callback( the_cib, T_CIB_PRE_NOTIFY, cibmon_pre_notify); if(exit_code != cib_ok) { crm_err("Failed to set %s callback: %s", T_CIB_PRE_NOTIFY, cib_error2string(exit_code)); } } if(exit_code == cib_ok && post_notify) { exit_code = the_cib->cmds->add_notify_callback( the_cib, T_CIB_POST_NOTIFY, cibmon_post_notify); if(exit_code != cib_ok) { crm_err("Failed to set %s callback: %s", T_CIB_POST_NOTIFY, cib_error2string(exit_code)); } } if(exit_code == cib_ok && update_notify) { exit_code = the_cib->cmds->add_notify_callback( the_cib, T_CIB_UPDATE_CONFIRM, cibmon_update_confirm); if(exit_code != cib_ok) { crm_err("Failed to set %s callback: %s", T_CIB_UPDATE_CONFIRM, cib_error2string(exit_code)); } } if(exit_code != cib_ok) { crm_err("Setup failed, could not monitor CIB actions"); return -exit_code; } mainloop = g_main_new(FALSE); crm_info("Starting mainloop"); g_main_run(mainloop); crm_devel("%s exiting normally", crm_system_name); fflush(stderr); return -exit_code; } void usage(const char *cmd, int exit_status) { FILE *stream; stream = exit_status != 0 ? stderr : stdout; #if 0 fprintf(stream, "usage: %s [-?Vio] command\n" "\twhere necessary, XML data will be expected using -X" " or on STDIN if -X isnt specified\n", cmd); fprintf(stream, "Options\n"); fprintf(stream, "\t--%s (-%c) \tid of the object being operated on\n", XML_ATTR_ID, 'i'); fprintf(stream, "\t--%s (-%c) \tobject type being operated on\n", "obj_type", 'o'); fprintf(stream, "\t--%s (-%c)\tturn on debug info." " additional instance increase verbosity\n", "verbose", 'V'); fprintf(stream, "\t--%s (-%c)\tthis help message\n", "help", '?'); fprintf(stream, "\nCommands\n"); fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_ERASE, 'E'); fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_QUERY, 'Q'); fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_CREATE, 'C'); fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_REPLACE,'R'); fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_UPDATE, 'U'); fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_DELETE, 'D'); fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_BUMP, 'B'); fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_ISMASTER,'M'); fprintf(stream, "\t--%s (-%c)\t\n", CRM_OP_CIB_SYNC, 'S'); fprintf(stream, "\nXML data\n"); fprintf(stream, "\t--%s (-%c) \t\n", F_CRM_DATA, 'X'); fprintf(stream, "\nAdvanced Options\n"); fprintf(stream, "\t--%s (-%c)\tsend command to specified host." " Applies to %s and %s commands only\n", "host", 'h', CRM_OP_CIB_QUERY, CRM_OP_CIB_SYNC); fprintf(stream, "\t--%s (-%c)\tcommand only takes effect locally" " on the specified host\n", "local", 'l'); fprintf(stream, "\t--%s (-%c)\twait for call to complete before" " returning\n", "sync-call", 's'); #endif fflush(stream); exit(exit_status); } void cib_connection_destroy(gpointer user_data) { crm_err("Connection to the CIB terminated... exiting"); g_main_quit(mainloop); return; } int update_depth = 0; gboolean last_notify_pre = TRUE; void cibmon_pre_notify(const char *event, HA_Message *msg) { int rc = -1; const char *op = NULL; const char *id = NULL; const char *type = NULL; crm_data_t *update = NULL; crm_data_t *pre_update = NULL; if(msg == NULL) { crm_err("NULL update"); return; } op = cl_get_string(msg, F_CIB_OPERATION); id = cl_get_string(msg, F_CIB_OBJID); type = cl_get_string(msg, F_CIB_OBJTYPE); update = get_message_xml(msg, F_CIB_UPDATE); pre_update = get_message_xml(msg, F_CIB_EXISTING); ha_msg_value_int(msg, F_CIB_RC, &rc); update_depth++; last_notify_pre = TRUE; if(update_depth > 1 && intermediate_changes == FALSE) { crm_trace("[%s] Ignoring intermediate update", event); return; } if(update != NULL) { crm_devel(UPDATE_PREFIX"[%s] Performing %s on <%s%s%s>", event, op, type, id?" id=":"", id?id:""); print_xml_formatted(LOG_INSANE, UPDATE_PREFIX, update, "Update"); } else if(update == NULL) { crm_info(UPDATE_PREFIX"[%s] Performing operation %s (on section=%s)", event, op, crm_str(type)); } print_xml_formatted(LOG_DEV, UPDATE_PREFIX, pre_update, "Existing Object"); } void cibmon_post_notify(const char *event, HA_Message *msg) { int rc = -1; const char *op = NULL; const char *id = NULL; const char *type = NULL; crm_data_t *update = NULL; crm_data_t *output = NULL; crm_data_t *generation = NULL; if(msg == NULL) { crm_err("NULL update"); return; } op = cl_get_string(msg, F_CIB_OPERATION); id = cl_get_string(msg, F_CIB_OBJID); type = cl_get_string(msg, F_CIB_OBJTYPE); update = get_message_xml(msg, F_CIB_UPDATE); output = get_message_xml(msg, F_CIB_UPDATE_RESULT); generation = get_message_xml(msg, "cib_generation"); update_depth--; if(last_notify_pre == FALSE && update_depth > 0 && intermediate_changes == FALSE) { crm_trace("Ignoring intermediate update"); return; } last_notify_pre = FALSE; ha_msg_value_int(msg, F_CIB_RC, &rc); if(update == NULL) { if(rc == cib_ok) { crm_verbose(UPDATE_PREFIX"[%s] %s (to %s) completed", event, op, crm_str(type)); } else { crm_warn(UPDATE_PREFIX"[%s] %s (to %s) FAILED: (%d) %s", event, op, crm_str(type), rc, cib_error2string(rc)); } } else { if(rc == cib_ok) { crm_verbose(UPDATE_PREFIX"[%s] Operation %s to <%s%s%s> completed.", event, op, crm_str(type), id?" id=":"", id?id:""); } else { crm_warn(UPDATE_PREFIX"[%s] Operation %s to <%s %s%s> FAILED: (%d) %s", event, op, crm_str(type), id?" id=":"", id?id:"", rc, cib_error2string(rc)); } } if(update == NULL) { print_xml_formatted( rc==cib_ok?LOG_DEBUG:LOG_WARNING, UPDATE_PREFIX, update, "Update"); } print_xml_formatted( rc==cib_ok?LOG_DEV:LOG_WARNING, UPDATE_PREFIX, output, "Resulting Object"); if(update_depth == 0) { print_xml_formatted( rc==cib_ok?LOG_DEBUG:LOG_WARNING, UPDATE_PREFIX, generation, "CIB Generation"); } } void cibmon_update_confirm(const char *event, HA_Message *msg) { int rc = -1; const char *op = NULL; const char *id = NULL; const char *type = NULL; if(msg == NULL) { crm_err("NULL update"); return; } op = cl_get_string(msg, F_CIB_OPERATION); id = cl_get_string(msg, F_CIB_OBJID); type = cl_get_string(msg, F_CIB_OBJTYPE); ha_msg_value_int(msg, F_CIB_RC, &rc); if(id == NULL) { if(rc == cib_ok) { crm_info(UPDATE_PREFIX"[%s] %s (to section=%s) confirmed.\n", event, op, crm_str(type)); } else { crm_warn(UPDATE_PREFIX"[%s] %s (to section=%s) ABORTED: (%d) %s\n", event, op, crm_str(type), rc, cib_error2string(rc)); } } else { if(rc == cib_ok) { crm_info(UPDATE_PREFIX"[%s] %s (to <%s%s%s>) confirmed\n", event, op, crm_str(type), id?" id=":"", id?id:""); } else { crm_warn(UPDATE_PREFIX"[%s] %s (to <%s%s%s>) ABORTED: (%d) %s\n", event, op, crm_str(type), id?" id=":"", id?id:"", rc, cib_error2string(rc)); } } crm_devel(UPDATE_PREFIX"================================="); } diff --git a/crm/cib/notify.c b/crm/cib/notify.c index 0fc70edb46..4e15e37f8e 100644 --- a/crm/cib/notify.c +++ b/crm/cib/notify.c @@ -1,223 +1,226 @@ -/* $Id: notify.c,v 1.13 2005/02/19 18:11:03 andrew Exp $ */ +/* $Id: notify.c,v 1.14 2005/02/21 13:13:45 andrew Exp $ */ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include extern GHashTable *client_list; int pending_updates = 0; void cib_notify_client(gpointer key, gpointer value, gpointer user_data); void attach_cib_generation(HA_Message *msg, const char *field, crm_data_t *a_cib); void cib_notify_client(gpointer key, gpointer value, gpointer user_data) { HA_Message *update_msg = user_data; cib_client_t *client = value; CRM_DEV_ASSERT(client != NULL); CRM_DEV_ASSERT(update_msg != NULL); if(client != NULL && safe_str_eq(client->channel_name, cib_channel_callback)) { - crm_trace("Notifying client %s of update", client->id); + crm_trace("Notifying client %s/%s of update", + client->name, client->id); if(client->channel->should_send_blocking == FALSE) { - crm_warn("Client channel %s was not set to \"send blocking\"", client->id); + crm_warn("Client channel %s/%s was not set to" + " \"send blocking\"", client->name,client->id); client->channel->should_send_blocking = TRUE; } if(msg2ipcchan(update_msg, client->channel) != HA_OK) { - crm_err("Notification of client %s failed", client->id); + crm_warn("Notification of client %s/%s failed", + client->name, client->id); } } } void cib_pre_notify( const char *op, crm_data_t *existing, crm_data_t *update) { HA_Message *update_msg = ha_msg_new(6); const char *type = NULL; const char *id = NULL; if(update != NULL) { id = crm_element_value(update, XML_ATTR_ID); } ha_msg_add(update_msg, F_TYPE, T_CIB_NOTIFY); ha_msg_add(update_msg, F_SUBTYPE, T_CIB_PRE_NOTIFY); ha_msg_add(update_msg, F_CIB_OPERATION, op); if(id != NULL) { ha_msg_add(update_msg, F_CIB_OBJID, id); } if(update != NULL) { ha_msg_add(update_msg, F_CIB_OBJTYPE, crm_element_name(update)); } else if(existing != NULL) { ha_msg_add(update_msg, F_CIB_OBJTYPE, crm_element_name(existing)); } type = cl_get_string(update_msg, F_CIB_OBJTYPE); attach_cib_generation(update_msg, "cib_generation", the_cib); if(existing != NULL) { add_message_xml(update_msg, F_CIB_EXISTING, existing); } if(update != NULL) { add_message_xml(update_msg, F_CIB_UPDATE, update); } g_hash_table_foreach(client_list, cib_notify_client, update_msg); pending_updates++; if(update == NULL) { crm_verbose("Performing operation %s (on section=%s)", op, type); } else { crm_verbose("Performing %s on <%s%s%s>", op, type, id?" id=":"", id?id:""); } crm_msg_del(update_msg); } void cib_post_notify( const char *op, crm_data_t *update, enum cib_errors result, crm_data_t *new_obj) { HA_Message *update_msg = ha_msg_new(8); char *type = NULL; char *id = NULL; if(update != NULL && crm_element_value(new_obj, XML_ATTR_ID) != NULL){ id = crm_element_value_copy(new_obj, XML_ATTR_ID); } ha_msg_add(update_msg, F_TYPE, T_CIB_NOTIFY); ha_msg_add(update_msg, F_SUBTYPE, T_CIB_POST_NOTIFY); ha_msg_add(update_msg, F_CIB_OPERATION, op); ha_msg_add_int(update_msg, F_CIB_RC, result); if(id != NULL) { ha_msg_add(update_msg, F_CIB_OBJID, id); } if(update != NULL) { crm_trace("Setting type to update->name: %s", crm_element_name(update)); ha_msg_add(update_msg, F_CIB_OBJTYPE, crm_element_name(update)); type = crm_strdup(crm_element_name(update)); } else if(new_obj != NULL) { crm_trace("Setting type to new_obj->name: %s", crm_element_name(new_obj)); ha_msg_add(update_msg, F_CIB_OBJTYPE, crm_element_name(new_obj)); type = crm_strdup(crm_element_name(new_obj)); } else { crm_trace("Not Setting type"); } attach_cib_generation(update_msg, "cib_generation", the_cib); if(update != NULL) { add_message_xml(update_msg, F_CIB_UPDATE, update); } if(new_obj != NULL) { add_message_xml(update_msg, F_CIB_UPDATE_RESULT, new_obj); } crm_devel("Notifying clients"); g_hash_table_foreach(client_list, cib_notify_client, update_msg); pending_updates--; if(pending_updates == 0) { ha_msg_mod(update_msg, F_SUBTYPE, T_CIB_UPDATE_CONFIRM); crm_devel("Sending confirmation to clients"); g_hash_table_foreach(client_list, cib_notify_client, update_msg); } if(update == NULL) { if(result == cib_ok) { crm_verbose("Operation %s (on section=%s) completed", op, crm_str(type)); } else { crm_warn("Operation %s (on section=%s) FAILED: (%d) %s", op, crm_str(type), result, cib_error2string(result)); } } else { if(result == cib_ok) { crm_verbose("Completed %s of <%s %s%s>", op, crm_str(type), id?"id=":"", id?id:""); } else { crm_warn("%s of <%s %s%s> FAILED: %s", op,crm_str(type), id?"id=":"", id?id:"", cib_error2string(result)); } } crm_free(id); crm_free(type); crm_msg_del(update_msg); crm_devel("Notify complete"); } void attach_cib_generation(HA_Message *msg, const char *field, crm_data_t *a_cib) { crm_data_t *generation = create_xml_node( NULL, XML_CIB_TAG_GENERATION_TUPPLE); if(the_cib != NULL) { copy_in_properties(generation, a_cib); } add_message_xml(msg, field, a_cib); free_xml(generation); } diff --git a/crm/crmd/cib.c b/crm/crmd/cib.c index c6485c82a2..d6ce0909c5 100644 --- a/crm/crmd/cib.c +++ b/crm/crmd/cib.c @@ -1,285 +1,285 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include /* for access */ #include #include #include /* for calls to open */ #include /* for calls to open */ #include /* for calls to open */ #include /* for getpwuid */ #include /* for initgroups */ #include /* for getrlimit */ #include /* for getrlimit */ #include #include #include #include #include #include #include #include struct crm_subsystem_s *cib_subsystem = NULL; void crmd_update_confirm(const char *event, HA_Message *msg); int cib_retries = 0; /* A_CIB_STOP, A_CIB_START, A_CIB_RESTART, */ enum crmd_fsa_input do_cib_control(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { enum crmd_fsa_input result = I_NULL; struct crm_subsystem_s *this_subsys = cib_subsystem; long long stop_actions = A_CIB_STOP; long long start_actions = A_CIB_START; if(action & stop_actions) { if(fsa_cib_conn != NULL && fsa_cib_conn->state != cib_disconnected) { fsa_cib_conn->cmds->signoff(fsa_cib_conn); } } if(action & start_actions) { if(cur_state != S_STOPPING) { if(fsa_cib_conn == NULL) { fsa_cib_conn = cib_new(); } - if(fsa_cib_conn->cmds->signon( - fsa_cib_conn, cib_command) != cib_ok) { + if(cib_ok != fsa_cib_conn->cmds->signon( + fsa_cib_conn, CRM_SYSTEM_CRMD, cib_command)){ crm_warn("Could not connect to the CIB service"); } else if(fsa_cib_conn->cmds->add_notify_callback( fsa_cib_conn, T_CIB_UPDATE_CONFIRM, crmd_update_confirm) != cib_ok) { crm_err("Could not set notify callback"); } else if(fsa_cib_conn->cmds->set_connection_dnotify( fsa_cib_conn, crmd_cib_connection_destroy)!=cib_ok){ crm_err("Could not set dnotify callback"); } else { set_bit_inplace( fsa_input_register, R_CIB_CONNECTED); } if(is_set(fsa_input_register, R_CIB_CONNECTED) == FALSE) { crm_warn("Could complete CIB registration %d" " time... retry", cib_retries); if(++cib_retries < 30) { startTimer(wait_timer); crmd_fsa_stall(); } else { crm_err("Could complete CIB" " registration %d times..." " hard error", cib_retries); register_fsa_error( C_FSA_INTERNAL, I_ERROR, NULL); } } else { cib_retries = 0; } } else { crm_info("Ignoring request to start %s after shutdown", this_subsys->name); } } return result; } /* A_CIB_INVOKE, A_CIB_BUMPGEN, A_UPDATE_NODESTATUS */ enum crmd_fsa_input do_cib_invoke(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data) { HA_Message *answer = NULL; enum crmd_fsa_input result = I_NULL; ha_msg_input_t *cib_msg = fsa_typed_data(fsa_dt_ha_msg); const char *sys_from = cl_get_string(cib_msg->msg, F_CRM_SYS_FROM); if(action & A_CIB_INVOKE) { if(safe_str_eq(sys_from, CRM_SYSTEM_CRMD)) { action = A_CIB_INVOKE_LOCAL; } else if(safe_str_eq(sys_from, CRM_SYSTEM_DC)) { action = A_CIB_INVOKE_LOCAL; } } if(action & A_CIB_INVOKE || action & A_CIB_INVOKE_LOCAL) { int call_options = 0; enum cib_errors rc = cib_ok; crm_data_t *cib_frag = NULL; const char *section = NULL; const char *op = cl_get_string(cib_msg->msg, F_CRM_TASK); section = cl_get_string(cib_msg->msg, F_CIB_SECTION); ha_msg_value_int(cib_msg->msg, F_CIB_CALLOPTS, &call_options); crm_log_message(LOG_MSG, cib_msg->msg); crm_xml_devel(cib_msg->xml, "[CIB update]"); if(op == NULL) { crm_err("Invalid CIB Message"); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); return I_NULL; } cib_frag = NULL; rc = fsa_cib_conn->cmds->variant_op( fsa_cib_conn, op, NULL, section, cib_msg->xml, &cib_frag, call_options); if(rc != cib_ok || (action & A_CIB_INVOKE)) { answer = create_reply(cib_msg->msg, cib_frag); ha_msg_add(answer,XML_ATTR_RESULT,cib_error2string(rc)); } if(action & A_CIB_INVOKE) { if(relay_message(answer, TRUE) == FALSE) { crm_err("Confused what to do with cib result"); crm_log_message(LOG_ERR, answer); crm_msg_del(answer); result = I_ERROR; } } else if(rc != cib_ok) { ha_msg_input_t *input = NULL; crm_err("Internal CRM/CIB command from %s() failed: %s", msg_data->origin, cib_error2string(rc)); crm_log_message_adv(LOG_ERR, "CIB Input", cib_msg->msg); crm_log_message_adv(LOG_ERR, "CIB Reply", answer); input = new_ha_msg_input(answer); register_fsa_input(C_FSA_INTERNAL, I_ERROR, input); crm_msg_del(answer); delete_ha_msg_input(input); } return result; } else { crm_err("Unexpected action %s in %s", fsa_action2string(action), __FUNCTION__); } return I_NULL; } /* frees fragment as part of delete_ha_msg_input() */ void update_local_cib_adv( crm_data_t *msg_data, gboolean do_now, const char *raised_from) { HA_Message *msg = NULL; ha_msg_input_t *fsa_input = NULL; int call_options = cib_sync_call; CRM_DEV_ASSERT(msg_data != NULL); crm_malloc(fsa_input, sizeof(ha_msg_input_t)); msg = create_request(CRM_OP_CIB_UPDATE, msg_data, NULL, CRM_SYSTEM_CIB, CRM_SYSTEM_CRMD, NULL); ha_msg_add(msg, F_CIB_SECTION, crm_element_value(msg_data, XML_ATTR_SECTION)); ha_msg_add_int(msg, F_CIB_CALLOPTS, call_options); ha_msg_add(msg, "call_origin", raised_from); fsa_input->msg = msg; fsa_input->xml = msg_data; CRM_DEV_ASSERT(cib_ok == fsa_cib_conn->cmds->is_master(fsa_cib_conn)); if(do_now == FALSE) { crm_devel("Registering event with FSA"); register_fsa_input_adv(C_FSA_INTERNAL, I_CIB_OP, fsa_input, 0, FALSE, raised_from); } else { fsa_data_t *op_data = NULL; crm_devel("Invoking CIB handler directly"); crm_malloc(op_data, sizeof(fsa_data_t)); op_data->fsa_cause = C_FSA_INTERNAL; op_data->fsa_input = I_CIB_OP; op_data->origin = raised_from; op_data->data = fsa_input; op_data->data_type = fsa_dt_ha_msg; do_cib_invoke(A_CIB_INVOKE_LOCAL, C_FSA_INTERNAL, fsa_state, I_CIB_OP, op_data); crm_free(op_data); crm_devel("CIB handler completed"); } crm_devel("deleting input"); #if 0 delete_ha_msg_input(fsa_input); #else crm_msg_del(fsa_input->msg); crm_free(fsa_input); /* BUG: it should be possible to free this but for some reason I cant */ /* free_xml(fsa_input->xml); */ #endif crm_devel("deleted input"); } void crmd_update_confirm(const char *event, HA_Message *msg) { int rc = -1; const char *op = cl_get_string(msg, F_CIB_OPERATION); ha_msg_value_int(msg, F_CIB_RC, &rc); if(rc != cib_ok) { crm_trace("Ignoring failed CIB update"); return; } if(safe_str_eq(op, CRM_OP_CIB_ERASE)) { /* regenerate everyone's state and our node entry */ register_fsa_input(C_UNKNOWN, I_ELECTION_DC, NULL); } } diff --git a/crm/tengine/main.c b/crm/tengine/main.c index 854f9a9de7..c7e2365f3b 100644 --- a/crm/tengine/main.c +++ b/crm/tengine/main.c @@ -1,228 +1,228 @@ -/* $Id: main.c,v 1.19 2005/02/19 18:16:12 andrew Exp $ */ +/* $Id: main.c,v 1.20 2005/02/21 13:13:45 andrew Exp $ */ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define SYS_NAME CRM_SYSTEM_TENGINE #define OPTARGS "hVc" GMainLoop* mainloop = NULL; const char* crm_system_name = SYS_NAME; extern cib_t *te_cib_conn; void usage(const char* cmd, int exit_status); int init_start(void); gboolean tengine_shutdown(int nsig, gpointer unused); extern void te_update_confirm(const char *event, HA_Message *msg); int main(int argc, char ** argv) { gboolean allow_cores = TRUE; int argerr = 0; int flag; crm_log_init(crm_system_name); set_crm_log_level(LOG_DEBUG); G_main_add_SignalHandler( G_PRIORITY_HIGH, SIGTERM, tengine_shutdown, NULL, NULL); crm_devel("Begining option processing"); while ((flag = getopt(argc, argv, OPTARGS)) != EOF) { switch(flag) { case 'V': alter_debug(DEBUG_INC); break; case 'h': /* Help message */ usage(crm_system_name, LSB_EXIT_OK); break; case 'c': allow_cores = TRUE; break; default: ++argerr; break; } } crm_devel("Option processing complete"); if (optind > argc) { ++argerr; } if (argerr) { usage(crm_system_name,LSB_EXIT_GENERIC); } /* read local config file */ crm_devel("Starting..."); return init_start(); } int init_start(void) { int init_ok = TRUE; init_client_ipc_comms( CRM_SYSTEM_CRMD, subsystem_msg_dispatch, (void*)process_te_message, &crm_ch); if(crm_ch != NULL) { send_hello_message(crm_ch, "1234", CRM_SYSTEM_TENGINE, "0", "1"); } else { init_ok = FALSE; crm_err("Could not connect to the CRMd"); } if(init_ok) { crm_trace("Creating CIB connection"); te_cib_conn = cib_new(); if(te_cib_conn == NULL) { init_ok = FALSE; } } if(init_ok) { crm_trace("Connecting to the CIB"); - if(te_cib_conn->cmds->signon( - te_cib_conn, cib_command) != cib_ok) { + if(cib_ok != te_cib_conn->cmds->signon( + te_cib_conn, crm_system_name, cib_command)) { init_ok = FALSE; } } if(init_ok) { crm_trace("Setting CIB notification callback"); if(te_cib_conn->cmds->add_notify_callback( te_cib_conn, T_CIB_UPDATE_CONFIRM, te_update_confirm) != cib_ok) { crm_err("Could not set CIB notification callback"); init_ok = FALSE; } } if(init_ok && ST_OK != stonithd_signon(crm_system_name)) { crm_err("Could not sign up to stonithd"); /* init_ok = FALSE; */ } if(init_ok && ST_OK != stonithd_set_stonith_ops_callback( tengine_stonith_callback, NULL)) { crm_err("Could not set stonith callback"); stonithd_signoff(); /* init_ok = FALSE; */ } if(init_ok) { IPC_Channel *fence_ch = stonithd_input_IPC_channel(); if(fence_ch == NULL) { } else if(NULL == G_main_add_IPC_Channel( G_PRIORITY_LOW, fence_ch, FALSE, tengine_stonith_dispatch, NULL, tengine_stonith_connection_destroy)) { crm_err("Failed to add Fencing channel to our mainloop"); init_ok = FALSE; } } if(init_ok) { /* Create the mainloop and run it... */ crm_info("Starting %s", crm_system_name); mainloop = g_main_new(FALSE); g_main_run(mainloop); return_to_orig_privs(); crm_info("Exiting %s", crm_system_name); } else { crm_warn("Initialization errors, %s not starting.", crm_system_name); } if(init_ok) { return 0; } return 1; } void usage(const char* cmd, int exit_status) { FILE* stream; stream = exit_status ? stderr : stdout; fprintf(stream, "usage: %s [-srkh]" "[-c configure file]\n", cmd); /* fprintf(stream, "\t-d\tsets debug level\n"); */ /* fprintf(stream, "\t-s\tgets daemon status\n"); */ /* fprintf(stream, "\t-r\trestarts daemon\n"); */ /* fprintf(stream, "\t-k\tstops daemon\n"); */ /* fprintf(stream, "\t-h\thelp message\n"); */ fflush(stream); exit(exit_status); } gboolean tengine_shutdown(int nsig, gpointer unused) { static int shuttingdown = 0; if (!shuttingdown) { shuttingdown = 1; } if (mainloop != NULL && g_main_is_running(mainloop)) { g_main_quit(mainloop); }else{ exit(LSB_EXIT_OK); } return TRUE; } diff --git a/include/crm/cib.h b/include/crm/cib.h index c44531ce66..be6470f9ca 100644 --- a/include/crm/cib.h +++ b/include/crm/cib.h @@ -1,280 +1,282 @@ -/* $Id: cib.h,v 1.16 2005/02/16 18:22:42 andrew Exp $ */ +/* $Id: cib.h,v 1.17 2005/02/21 13:13:45 andrew Exp $ */ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef CIB__H #define CIB__H #include #include #include #include #define cib_feature_revision 1 #define cib_feature_revision_s "1" enum cib_variant { cib_native, cib_database, cib_edir }; enum cib_state { cib_connected_command, cib_connected_query, cib_disconnected }; enum cib_conn_type { cib_command, cib_query, cib_no_connection }; enum cib_call_options { cib_none = 0x000000, cib_verbose = 0x000001, cib_discard_reply = 0x000004, cib_scope_local = 0x000010, /* cib_scope_global = 0x000020, */ cib_sync_call = 0x000040, /* cib_async_call = 0x000080, */ cib_inhibit_notify= 0x000100 }; #define cib_default_options = cib_none enum cib_errors { cib_ok = 0, cib_operation = -1, cib_create_msg = -2, cib_not_connected = -3, cib_not_authorized = -4, cib_send_failed = -5, cib_reply_failed = -6, cib_return_code = -7, cib_output_ptr = -8, cib_output_data = -9, cib_connection = -10, cib_authentication = -11, cib_missing = -12, cib_variant = -28, CIBRES_MISSING_ID = -13, CIBRES_MISSING_TYPE = -14, CIBRES_MISSING_FIELD = -15, CIBRES_OBJTYPE_MISMATCH = -16, CIBRES_CORRUPT = -17, CIBRES_OTHER = -18, cib_unknown = -19, cib_STALE = -20, cib_EXISTS = -21, cib_NOTEXISTS = -22, cib_ACTIVATION = -23, cib_NOSECTION = -24, cib_NOOBJECT = -25, cib_NOPARENT = -26, cib_NODECOPY = -27, cib_NOTSUPPORTED = -29, cib_registration_msg = -30, cib_callback_token = -31, cib_callback_register = -32, cib_msg_field_add = -33, cib_client_gone = -34, cib_not_master = -35, cib_client_corrupt = -36, cib_master_timeout = -37, cib_revision_unsupported= -38, cib_revision_unknown = -39, cib_missing_data = -40 }; enum cib_op { CIB_OP_NONE = 0, CIB_OP_ADD, CIB_OP_MODIFY, CIB_OP_DELETE, CIB_OP_MAX }; enum cib_section { cib_section_none, cib_section_all, cib_section_nodes, cib_section_constraints, cib_section_resources, cib_section_crmconfig, cib_section_status }; #define F_CIB_CLIENTID "cib_clientid" #define F_CIB_CALLOPTS "cib_callopt" #define F_CIB_CALLID "cib_callid" #define F_CIB_CALLDATA "cib_calldata" #define F_CIB_OPERATION "cib_op" #define F_CIB_ISREPLY "cib_isreplyto" #define F_CIB_SECTION "cib_section" #define F_CIB_HOST "cib_host" #define F_CIB_RC "cib_rc" #define F_CIB_DELEGATED "cib_delegated_from" #define F_CIB_OBJID "cib_object" #define F_CIB_OBJTYPE "cib_object_type" #define F_CIB_EXISTING "cib_existing_object" #define F_CIB_SEENCOUNT "cib_seen" #define F_CIB_TIMEOUT "cib_timeout" #define F_CIB_UPDATE "cib_update" #define F_CIB_CALLBACK_TOKEN "cib_callback_token" #define F_CIB_GLOBAL_UPDATE "cib_update" #define F_CIB_UPDATE_RESULT "cib_update_result" +#define F_CIB_CLIENTNAME "cib_clientname" #define T_CIB "cib" #define T_CIB_NOTIFY "cib_notify" /* notify sub-types */ #define T_CIB_PRE_NOTIFY "cib_pre_notify" #define T_CIB_POST_NOTIFY "cib_post_notify" #define T_CIB_UPDATE_CONFIRM "cib_update_confirmation" #define cib_channel_ro "cib_ro" #define cib_channel_rw "cib_rw" #define cib_channel_callback "cib_callback" typedef struct cib_s cib_t; typedef struct cib_api_operations_s { int (*variant_op)( cib_t *cib, const char *op, const char *host, const char *section, crm_data_t *data, crm_data_t **output_data, int call_options); - int (*signon) (cib_t *cib, enum cib_conn_type type); + int (*signon) ( + cib_t *cib, const char *name, enum cib_conn_type type); int (*signoff)(cib_t *cib); int (*free) (cib_t *cib); int (*set_op_callback)( cib_t *cib, void (*callback)( const HA_Message *msg, int callid , int rc, crm_data_t *output)); int (*add_notify_callback)( cib_t *cib, const char *event, void (*callback)( const char *event, HA_Message *msg)); int (*del_notify_callback)( cib_t *cib, const char *event, void (*callback)( const char *event, HA_Message *msg)); int (*set_connection_dnotify)( cib_t *cib, void (*dnotify)(gpointer user_data)); IPC_Channel *(*channel)(cib_t* cib); int (*inputfd)(cib_t* cib); int (*noop)(cib_t *cib, int call_options); int (*ping)( cib_t *cib, crm_data_t **output_data, int call_options); int (*query)(cib_t *cib, const char *section, crm_data_t **output_data, int call_options); int (*query_from)( cib_t *cib, const char *host, const char *section, crm_data_t **output_data, int call_options); int (*is_master) (cib_t *cib); int (*set_master)(cib_t *cib, int call_options); int (*set_slave) (cib_t *cib, int call_options); int (*set_slave_all)(cib_t *cib, int call_options); int (*sync)(cib_t *cib, const char *section, int call_options); int (*sync_from)( cib_t *cib, const char *host, const char *section, int call_options); int (*bump_epoch)(cib_t *cib, int call_options); int (*create)(cib_t *cib, const char *section, crm_data_t *data, crm_data_t **output_data, int call_options) ; int (*modify)(cib_t *cib, const char *section, crm_data_t *data, crm_data_t **output_data, int call_options) ; int (*replace)(cib_t *cib, const char *section, crm_data_t *data, crm_data_t **output_data, int call_options) ; int (*delete)(cib_t *cib, const char *section, crm_data_t *data, crm_data_t **output_data, int call_options) ; int (*erase)( cib_t *cib, crm_data_t **output_data, int call_options); int (*quit)(cib_t *cib, int call_options); gboolean (*msgready)(cib_t* cib); int (*rcvmsg)(cib_t* cib, int blocking); gboolean (*dispatch)(IPC_Channel *channel, gpointer user_data); } cib_api_operations_t; struct cib_s { enum cib_state state; enum cib_conn_type type; int call_id; void *variant_opaque; GList *notify_list; void (*op_callback)(const HA_Message *msg, int call_id, int rc, crm_data_t *output); cib_api_operations_t *cmds; }; typedef struct cib_notify_client_s { const char *event; const char *obj_id; /* implement one day */ const char *obj_type; /* implement one day */ void (*callback)( const char *event, HA_Message *msg); } cib_notify_client_t; /* Core functions */ extern cib_t *cib_new(void); extern gboolean startCib(const char *filename); extern crm_data_t *get_cib_copy(cib_t *cib); extern crm_data_t *cib_get_generation(cib_t *cib); extern int cib_compare_generation(crm_data_t *left, crm_data_t *right); /* Utility functions */ extern crm_data_t *get_object_root(const char *object_type,crm_data_t *the_root); extern crm_data_t *create_cib_fragment_adv( crm_data_t *update, const char *section, const char *source); extern char *cib_pluralSection(const char *a_section); /* Error Interpretation*/ extern const char *cib_error2string(enum cib_errors); extern const char *cib_op2string(enum cib_op); extern crm_data_t *createEmptyCib(void); extern gboolean verifyCibXml(crm_data_t *cib); extern int cib_section2enum(const char *a_section); #define create_cib_fragment(update,section) create_cib_fragment_adv(update, section, __FUNCTION__) #endif diff --git a/lib/crm/cib/cib_native.c b/lib/crm/cib/cib_native.c index 7b5f02db17..65cbaab102 100755 --- a/lib/crm/cib/cib_native.c +++ b/lib/crm/cib/cib_native.c @@ -1,586 +1,587 @@ /* * Copyright (c) 2004 International Business Machines * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * */ #include #include #include #include #include #include #include #include #include #include #include #include #include typedef struct cib_native_opaque_s { IPC_Channel *command_channel; IPC_Channel *callback_channel; GCHSource *callback_source; } cib_native_opaque_t; int cib_native_perform_op( cib_t *cib, const char *op, const char *host, const char *section, crm_data_t *data, crm_data_t **output_data, int call_options); -int cib_native_signon(cib_t* cib, enum cib_conn_type type); +int cib_native_signon(cib_t* cib, const char *name, enum cib_conn_type type); int cib_native_signoff(cib_t* cib); int cib_native_free(cib_t* cib); IPC_Channel *cib_native_channel(cib_t* cib); int cib_native_inputfd(cib_t* cib); gboolean cib_native_msgready(cib_t* cib); int cib_native_rcvmsg(cib_t* cib, int blocking); gboolean cib_native_dispatch(IPC_Channel *channel, gpointer user_data); cib_t *cib_native_new (cib_t *cib); int cib_native_set_connection_dnotify( cib_t *cib, void (*dnotify)(gpointer user_data)); void cib_native_notify(gpointer data, gpointer user_data); void cib_native_callback(cib_t *cib, struct ha_msg *msg); cib_t* cib_native_new (cib_t *cib) { cib_native_opaque_t *native = NULL; crm_malloc(cib->variant_opaque, sizeof(cib_native_opaque_t)); native = cib->variant_opaque; native->command_channel = NULL; native->callback_channel = NULL; /* assign variant specific ops*/ cib->cmds->variant_op = cib_native_perform_op; cib->cmds->signon = cib_native_signon; cib->cmds->signoff = cib_native_signoff; cib->cmds->free = cib_native_free; cib->cmds->channel = cib_native_channel; cib->cmds->inputfd = cib_native_inputfd; cib->cmds->msgready = cib_native_msgready; cib->cmds->rcvmsg = cib_native_rcvmsg; cib->cmds->dispatch = cib_native_dispatch; cib->cmds->set_connection_dnotify = cib_native_set_connection_dnotify; return cib; } int -cib_native_signon(cib_t* cib, enum cib_conn_type type) +cib_native_signon(cib_t* cib, const char *name, enum cib_conn_type type) { int rc = cib_ok; char *uuid_ticket = NULL; struct ha_msg *reg_msg = NULL; cib_native_opaque_t *native = cib->variant_opaque; crm_trace("Connecting command channel"); if(type == cib_command) { cib->state = cib_connected_command; native->command_channel = init_client_ipc_comms_nodispatch( cib_channel_rw); } else { cib->state = cib_connected_query; native->command_channel = init_client_ipc_comms_nodispatch( cib_channel_ro); } if(native->command_channel == NULL) { crm_debug("Connection to command channel failed"); rc = cib_connection; } else if(native->command_channel->ch_status != IPC_CONNECT) { crm_err("Connection may have succeeded," " but authentication to command channel failed"); rc = cib_authentication; } if(rc == cib_ok) { crm_trace("Connecting callback channel"); native->callback_source = init_client_ipc_comms( cib_channel_callback, cib_native_dispatch, cib, &(native->callback_channel)); if(native->callback_channel == NULL) { crm_debug("Connection to callback channel failed"); rc = cib_connection; } else if(native->callback_source == NULL) { crm_err("Callback source not recorded"); rc = cib_connection; } } else if(rc == cib_ok && native->callback_channel->ch_status != IPC_CONNECT) { crm_err("Connection may have succeeded," " but authentication to callback channel failed"); rc = cib_authentication; } if(rc == cib_ok) { const char *msg_type = NULL; crm_trace("Waiting for msg on command channel"); reg_msg = msgfromIPC_noauth(native->command_channel); msg_type = cl_get_string(reg_msg, F_CIB_OPERATION); if(safe_str_neq(msg_type, CRM_OP_REGISTER) ) { crm_err("Invalid registration message: %s", msg_type); rc = cib_registration_msg; } else { const char *tmp_ticket = NULL; crm_trace("Retrieving callback channel ticket"); tmp_ticket = cl_get_string( reg_msg, F_CIB_CALLBACK_TOKEN); if(tmp_ticket == NULL) { rc = cib_callback_token; } else { uuid_ticket = crm_strdup(tmp_ticket); } } crm_msg_del(reg_msg); reg_msg = NULL; } if(rc == cib_ok) { crm_trace("Registering callback channel with ticket %s", crm_str(uuid_ticket)); reg_msg = ha_msg_new(2); ha_msg_add(reg_msg, F_CIB_OPERATION, CRM_OP_REGISTER); ha_msg_add(reg_msg, F_CIB_CALLBACK_TOKEN, uuid_ticket); + ha_msg_add(reg_msg, F_CIB_CLIENTNAME, name); CRM_DEV_ASSERT(native->command_channel->should_send_blocking); if(msg2ipcchan(reg_msg, native->callback_channel) != HA_OK) { rc = cib_callback_register; } crm_free(uuid_ticket); crm_msg_del(reg_msg); } if(rc == cib_ok) { crm_trace("wait for the callback channel setup to complete"); reg_msg = msgfromIPC_noauth(native->callback_channel); if(reg_msg == NULL) { crm_err("Connection to callback channel not maintined"); rc = cib_connection; } crm_msg_del(reg_msg); } if(rc == cib_ok) { crm_info("Connection to CIB successful"); return cib_ok; } crm_warn("Connection to CIB failed: %s", cib_error2string(rc)); cib_native_signoff(cib); return rc; } int cib_native_signoff(cib_t* cib) { cib_native_opaque_t *native = cib->variant_opaque; crm_info("Signing out of the CIB Service"); /* close channels */ if (native->command_channel != NULL) { native->command_channel->ops->destroy( native->command_channel); native->command_channel = NULL; } if (native->callback_channel != NULL) { G_main_del_IPC_Channel(native->callback_source); #ifdef BUG native->callback_channel->ops->destroy( native->callback_channel); #endif native->callback_channel = NULL; } cib->state = cib_disconnected; cib->type = cib_none; return cib_ok; } int cib_native_free (cib_t* cib) { int rc = cib_ok; crm_warn("Freeing CIB"); if(cib->state != cib_disconnected) { rc = cib_native_signoff(cib); if(rc == cib_ok) { crm_free(cib); } } return rc; } IPC_Channel * cib_native_channel(cib_t* cib) { cib_native_opaque_t *native = NULL; if(cib == NULL) { crm_err("Missing cib object"); return NULL; } native = cib->variant_opaque; if(native != NULL) { return native->callback_channel; } crm_err("couldnt find variant specific data in %p", cib); return NULL; } int cib_native_inputfd(cib_t* cib) { IPC_Channel *ch = cib_native_channel(cib); return ch->ops->get_recv_select_fd(ch); } int cib_native_perform_op( cib_t *cib, const char *op, const char *host, const char *section, crm_data_t *data, crm_data_t **output_data, int call_options) { int rc = HA_OK; struct ha_msg *op_msg = NULL; struct ha_msg *op_reply = NULL; cib_native_opaque_t *native = cib->variant_opaque; if(cib->state == cib_disconnected) { return cib_not_connected; } if(output_data != NULL) { *output_data = NULL; } if(op == NULL) { crm_err("No operation specified"); rc = cib_operation; } op_msg = ha_msg_new(7); if (op_msg == NULL) { crm_err("No memory to create HA_Message"); return cib_create_msg; } if(rc == HA_OK) { rc = ha_msg_add(op_msg, F_TYPE, "cib"); } if(rc == HA_OK) { rc = ha_msg_add(op_msg, F_CIB_OPERATION, op); } if(rc == HA_OK && host != NULL) { CRM_DEV_ASSERT(cl_is_allocated(host) == 1); rc = ha_msg_add(op_msg, F_CIB_HOST, host); } if(rc == HA_OK && section != NULL) { rc = ha_msg_add(op_msg, F_CIB_SECTION, section); } if(rc == HA_OK) { rc = ha_msg_add_int(op_msg, F_CIB_CALLID, cib->call_id); } if(rc == HA_OK) { crm_trace("Sending call options: %.8lx, %d", (long)call_options, call_options); rc = ha_msg_add_int(op_msg, F_CIB_CALLOPTS, call_options); } if(rc == HA_OK && data != NULL) { add_message_xml(op_msg, F_CIB_CALLDATA, data); } if (rc != HA_OK) { crm_err("Failed to create CIB operation message"); crm_log_message(LOG_ERR, op_msg); crm_msg_del(op_msg); return cib_create_msg; } cib->call_id++; crm_debug("Sending %s message to CIB service", op); crm_log_message(LOG_MSG, op_msg); rc = msg2ipcchan(op_msg, native->command_channel); if (rc != HA_OK) { crm_err("Sending message to CIB service FAILED: %d", rc); CRM_DEV_ASSERT(native->command_channel->should_send_blocking); crm_log_message(LOG_ERR, op_msg); crm_msg_del(op_msg); return cib_send_failed; } else { crm_devel("Message sent"); } crm_msg_del(op_msg); op_msg = NULL; if((call_options & cib_discard_reply)) { crm_devel("Discarding reply"); return cib_ok; } else if(!(call_options & cib_sync_call)) { crm_devel("Async call, returning"); return cib->call_id - 1; } crm_devel("Waiting for a syncronous reply"); op_reply = msgfromIPC_noauth(native->command_channel); if (op_reply == NULL) { crm_err("No reply message"); return cib_reply_failed; } crm_devel("Syncronous reply recieved"); crm_log_message(LOG_MSG, op_reply); rc = cib_ok; /* Start processing the reply... */ if(ha_msg_value_int(op_reply, F_CIB_RC, &rc) != HA_OK) { rc = cib_return_code; } if(output_data == NULL) { /* do nothing more */ } else if(!(call_options & cib_discard_reply)) { *output_data = get_message_xml(op_reply, F_CIB_CALLDATA); if(*output_data == NULL) { crm_debug("No output in reply to \"%s\" command %d", op, cib->call_id - 1); } } crm_msg_del(op_reply); return rc; } gboolean cib_native_msgready(cib_t* cib) { IPC_Channel *ch = NULL; cib_native_opaque_t *native = NULL; if (cib == NULL) { crm_err("No CIB!"); return FALSE; } native = cib->variant_opaque; ch = cib_native_channel(cib); if (ch == NULL) { crm_err("No channel"); return FALSE; } if(native->command_channel->ops->is_message_pending( native->command_channel)) { crm_verbose("Message pending on command channel"); } if(native->callback_channel->ops->is_message_pending( native->callback_channel)) { crm_trace("Message pending on callback channel"); return TRUE; } crm_verbose("No message pending"); return FALSE; } int cib_native_rcvmsg(cib_t* cib, int blocking) { const char *type = NULL; struct ha_msg* msg = NULL; IPC_Channel *ch = cib_native_channel(cib); /* if it is not blocking mode and no message in the channel, return */ if (blocking == 0 && cib_native_msgready(cib) == FALSE) { crm_devel("No message ready and non-blocking..."); return 0; } else if (cib_native_msgready(cib) == FALSE) { crm_devel("Waiting for message from CIB service..."); ch->ops->waitin(ch); } /* get the message */ msg = msgfromIPC_noauth(ch); if (msg == NULL) { crm_warn("Received a NULL msg from CIB service."); return 0; } /* do callbacks */ type = cl_get_string(msg, F_TYPE); crm_trace("Activating %s callbacks...", type); if(safe_str_eq(type, T_CIB)) { cib_native_callback(cib, msg); } else if(safe_str_eq(type, T_CIB_NOTIFY)) { g_list_foreach(cib->notify_list, cib_native_notify, msg); } else { crm_err("Unknown message type: %s", type); } crm_msg_del(msg); return 1; } void cib_native_callback(cib_t *cib, struct ha_msg *msg) { int rc = 0; int call_id = 0; crm_data_t *output = NULL; if(cib->op_callback == NULL) { crm_devel("No OP callback set, ignoring reply"); return; } ha_msg_value_int(msg, F_CIB_CALLID, &call_id); ha_msg_value_int(msg, F_CIB_RC, &rc); output = get_message_xml(msg, F_CIB_CALLDATA); cib->op_callback(msg, call_id, rc, output); crm_trace("OP callback activated."); } void cib_native_notify(gpointer data, gpointer user_data) { struct ha_msg *msg = user_data; cib_notify_client_t *entry = data; const char *event = NULL; if(msg == NULL) { crm_warn("Skipping callback - NULL message"); return; } event = cl_get_string(msg, F_SUBTYPE); if(entry == NULL) { crm_warn("Skipping callback - NULL callback client"); return; } else if(entry->callback == NULL) { crm_warn("Skipping callback - NULL callback"); return; } else if(safe_str_neq(entry->event, event)) { crm_trace("Skipping callback - event mismatch %p/%s vs. %s", entry, entry->event, event); return; } crm_trace("Invoking callback for %p/%s event...", entry, event); entry->callback(event, msg); crm_trace("Callback invoked..."); } gboolean cib_native_dispatch(IPC_Channel *channel, gpointer user_data) { int lpc = 0; cib_t *cib = user_data; crm_devel("Received callback"); if(user_data == NULL){ crm_err("user_data field must contain the CIB struct"); return FALSE; } while(cib_native_msgready(cib)) { lpc++; /* invoke the callbacks but dont block */ if(cib_native_rcvmsg(cib, 0) < 1) { break; } } crm_devel("%d CIB messages dispatched", lpc); if (channel && (channel->ch_status == IPC_DISCONNECT)) { crm_crit("Lost connection to the CIB service."); return FALSE; } return TRUE; } int cib_native_set_connection_dnotify( cib_t *cib, void (*dnotify)(gpointer user_data)) { cib_native_opaque_t *native = NULL; if (cib == NULL) { crm_err("No CIB!"); return FALSE; } native = cib->variant_opaque; if(dnotify == NULL) { crm_warn("Setting dnotify back to default value"); set_IPC_Channel_dnotify(native->callback_source, default_ipc_connection_destroy); } else { crm_devel("Setting dnotify"); set_IPC_Channel_dnotify(native->callback_source, dnotify); } return cib_ok; }