diff --git a/attrd/commands.c b/attrd/commands.c index ae01dcca80..8e460dcbd4 100644 --- a/attrd/commands.c +++ b/attrd/commands.c @@ -1,506 +1,508 @@ /* * Copyright (C) 2013 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include cib_t *the_cib = NULL; GHashTable *attributes = NULL; typedef struct attribute_s { char *uuid; /* TODO: Remove if at all possible */ char *id; char *set; GHashTable *values; int timeout; bool changed; bool updating; mainloop_timer_t *timer; char *user; } attribute_t; typedef struct attribute_value_s { char *current; char *requested; char *stored; } attribute_value_t; void write_attribute(attribute_t *a); void write_attributes(bool all); void attrd_peer_update(crm_node_t *peer, xmlNode *xml); void attrd_peer_sync(crm_node_t *peer, xmlNode *xml); bool build_update_element(xmlNode *parent, attribute_t *a, const char *node_uuid, const char *attr_value); static gboolean attribute_timer_cb(gpointer user_data) { if(election_state(writer) == election_won) { write_attribute(user_data); } return FALSE; } static void free_attribute_value(gpointer data) { attribute_value_t *v = data; free(v->current); free(v->requested); free(v->stored); free(v); } void free_attribute(gpointer data) { attribute_t *a = data; if(a) { free(a->id); free(a->set); free(a->uuid); free(a->user); mainloop_timer_del(a->timer); g_hash_table_destroy(a->values); free(a); } } xmlNode * build_attribute_xml( xmlNode *parent, const char *name, const char *set, const char *uuid, unsigned int timeout, const char *user, const char *peer, const char *value) { xmlNode *xml = create_xml_node(parent, __FUNCTION__); crm_xml_add(xml, F_ATTRD_ATTRIBUTE, name); crm_xml_add(xml, F_ATTRD_SET, set); crm_xml_add(xml, F_ATTRD_KEY, uuid); crm_xml_add(xml, F_ATTRD_USER, user); crm_xml_add(xml, F_ATTRD_HOST, peer); crm_xml_add(xml, F_ATTRD_VALUE, value); crm_xml_add_int(xml, F_ATTRD_DAMPEN, timeout); return xml; } static attribute_t * create_attribute(xmlNode *xml) { attribute_t *a = calloc(1, sizeof(attribute_t)); a->id = crm_element_value_copy(xml, F_ATTRD_ATTRIBUTE); a->set = crm_element_value_copy(xml, F_ATTRD_SET); a->uuid = crm_element_value_copy(xml, F_ATTRD_KEY); a->values = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, free_attribute_value); #if ENABLE_ACL crm_trace("Performing all %s operations as user '%s'", a->id, a->user); a->user = crm_element_value_copy(xml, F_ATTRD_USER); #endif crm_element_value_int(xml, F_ATTRD_DAMPEN, &a->timeout); if(a->timeout > 0) { a->timer = mainloop_timer_add(strdup(a->id), a->timeout, FALSE, attribute_timer_cb, a); } g_hash_table_replace(attributes, a->id, a); return a; } void attrd_client_message(crm_client_t *client, xmlNode *xml) { static int plus_plus_len = 5; const char *op = crm_element_value(xml, F_ATTRD_TASK); if(safe_str_eq(op, "update")) { attribute_t *a = NULL; attribute_value_t *v = NULL; char *key = crm_element_value_copy(xml, F_ATTRD_KEY); char *set = crm_element_value_copy(xml, F_ATTRD_SET); char *host = crm_element_value_copy(xml, F_ATTRD_HOST); const char *attr = crm_element_value(xml, F_ATTRD_ATTRIBUTE); const char *value = crm_element_value(xml, F_ATTRD_VALUE); a = g_hash_table_lookup(attributes, attr); if(host == NULL) { host = cluster->uname; crm_xml_add(xml, F_ATTRD_HOST, host); } if (set == NULL) { if(a == NULL) { set = g_strdup_printf("%s-%s", XML_CIB_TAG_STATUS, host); } else if(set == NULL) { set = a->set; } crm_xml_add(xml, F_ATTRD_SET, a->set); } if (key == NULL) { if(a == NULL) { int lpc = 0; key = g_strdup_printf("%s-%s", set, attr); /* Minimal attempt at sanitizing automatic IDs */ for (lpc = 0; key[lpc] != 0; lpc++) { switch (key[lpc]) { case ':': key[lpc] = '.'; } } } else if(key == NULL) { key = a->uuid; } crm_xml_add(xml, F_ATTRD_KEY, key); } if (value) { int offset = 1; int int_value = 0; int value_len = strlen(value); if (value_len < (plus_plus_len + 2) || value[plus_plus_len] != '+' || (value[plus_plus_len + 1] != '+' && value[plus_plus_len + 1] != '=')) { goto send; } if(a) { v = g_hash_table_lookup(a->values, host); } if(v) { int_value = char2score(v->current); } if (value[plus_plus_len + 1] != '+') { const char *offset_s = value + (plus_plus_len + 2); offset = char2score(offset_s); } int_value += offset; if (int_value > INFINITY) { int_value = INFINITY; } crm_info("Expanded %s=%s to %d", attr, value, int_value); crm_xml_add_int(xml, F_ATTRD_VALUE, int_value); } send: crm_info("Broadcasting %s[%s] = %s%s", host, attr, value, election_state(writer) == election_won?" (writer)":""); crm_xml_add_int(xml, F_ATTRD_WRITER, election_state(writer)); send_cluster_message(NULL, crm_msg_attrd, xml, TRUE); free(key); free(set); free(host); } } void attrd_peer_message(crm_node_t *peer, xmlNode *xml) { const char *op = crm_element_value(xml, F_ATTRD_TASK); const char *election_op = crm_element_value(xml, F_CRM_TASK); if(election_op) { election_count_vote(writer, xml, TRUE); return; } if(election_state(writer) == election_won && safe_str_neq(peer->uname, cluster->uname)) { int peer_state = 0; crm_element_value_int(xml, F_ATTRD_WRITER, &peer_state); if(peer_state == election_won) { crm_notice("Detected another attribute writer: %s", peer->uname); election_vote(writer); } } if(safe_str_eq(op, "update")) { attrd_peer_update(peer, xml); } else if(safe_str_eq(op, "sync")) { attrd_peer_sync(peer, xml); } else if(safe_str_eq(op, "sync-response")) { xmlNode *child = NULL; crm_debug("Processing %s from %s", op, peer->uname); for (child = __xml_first_child(xml); child != NULL; child = __xml_next(child)) { attrd_peer_update(peer, child); } } } void attrd_peer_sync(crm_node_t *peer, xmlNode *xml) { GHashTableIter aIter; GHashTableIter vIter; const char *host = NULL; attribute_t *a = NULL; attribute_value_t *v = NULL; xmlNode *sync = create_xml_node(NULL, __FUNCTION__); crm_xml_add(sync, F_ATTRD_TASK, "sync-response"); g_hash_table_iter_init(&aIter, attributes); while (g_hash_table_iter_next(&aIter, NULL, (gpointer *) & a)) { g_hash_table_iter_init(&vIter, a->values); while (g_hash_table_iter_next(&vIter, (gpointer *) & host, (gpointer *) & v)) { crm_debug("Syncing %s[%s] = %s to %s", a->id, host, v->current, peer->uname); build_attribute_xml(sync, a->id, a->set, a->uuid, a->timeout, a->user, host, v->current); } } crm_debug("Syncing values to %s", peer?peer->uname:"everyone"); crm_xml_add_int(sync, F_ATTRD_WRITER, election_state(writer)); send_cluster_message(peer, crm_msg_attrd, sync, TRUE); free_xml(sync); } void attrd_peer_update(crm_node_t *peer, xmlNode *xml) { - bool changed = FALSE; attribute_value_t *v = NULL; const char *host = crm_element_value(xml, F_ATTRD_HOST); const char *attr = crm_element_value(xml, F_ATTRD_ATTRIBUTE); const char *value = crm_element_value(xml, F_ATTRD_VALUE); attribute_t *a = g_hash_table_lookup(attributes, attr); if(a == NULL) { a = create_attribute(xml); } v = g_hash_table_lookup(a->values, host); if(v == NULL) { crm_trace("Setting %s[%s] to %s from %s", host, attr, value, peer->uname); v = calloc(1, sizeof(attribute_value_t)); v->current = strdup(value); - changed = TRUE; + a->changed = TRUE; } else { crm_trace("Setting %s[%s]: %s -> %s from %s", host, attr, v->current, value, peer->uname); } if(safe_str_neq(v->current, value)) { free(v->current); v->current = strdup(value); - changed = TRUE; + a->changed = TRUE; } - if(changed && a->timer) { + if(a->changed && a->timer) { mainloop_timer_start(a->timer); - } else if(changed && election_state(writer) == election_won) { + } else if(a->changed && election_state(writer) == election_won) { write_attribute(a); } } gboolean attrd_election_cb(gpointer user_data) { attrd_peer_sync(NULL, NULL); return FALSE; } void attrd_peer_change_cb(enum crm_status_type kind, crm_node_t *peer, const void *data) { if(election_state(writer) == election_won && kind == crm_status_nstate && safe_str_eq(peer->state, CRM_NODE_MEMBER)) { attrd_peer_sync(peer, NULL); } } static void attrd_cib_callback(xmlNode * msg, int call_id, int rc, xmlNode * output, void *user_data) { int level = LOG_ERR; GHashTableIter iter; const char *key = NULL; attribute_value_t *v = NULL; char *name = user_data; attribute_t *a = g_hash_table_lookup(attributes, name); if(a == NULL) { crm_info("Attribute %s no longer exists", name); goto done; } if (rc == pcmk_ok && call_id < 0) { rc = call_id; } a->updating = FALSE; switch (rc) { case pcmk_ok: level = LOG_INFO; break; case -pcmk_err_diff_failed: /* When an attr changes while the CIB is syncing */ case -ETIME: /* When an attr changes while there is a DC election */ case -ENXIO: /* When an attr changes while the CIB is syncing a * newer config from a node that just came up */ level = LOG_WARNING; break; } do_crm_log(level, "Update %d for %s: %s (%d)", call_id, name, pcmk_strerror(rc), rc); g_hash_table_iter_init(&iter, a->values); while (g_hash_table_iter_next(&iter, (gpointer *) & key, (gpointer *) & v)) { crm_info("Update %d for %s[%s]=%s: %s (%d)", call_id, a->id, v->requested, pcmk_strerror(rc), rc); if(rc == pcmk_ok) { free(v->stored); v->stored = v->requested; v->requested = NULL; } else { free(v->requested); v->requested = NULL; + a->changed = TRUE; /* Attempt write out again */ } } done: free(name); if(a && a->changed && election_state(writer) == election_won) { write_attribute(a); } } void write_attributes(bool all) { GHashTableIter iter; attribute_t *a = NULL; g_hash_table_iter_init(&iter, attributes); while (g_hash_table_iter_next(&iter, NULL, (gpointer *) & a)) { if(all || a->changed) { write_attribute(a); + } else { + crm_debug("Skipping unchanged attribute %s", a->id); } } } void write_attribute(attribute_t *a) { int rc = pcmk_ok; xmlNode *xml_top = NULL; const char *peer = NULL; attribute_value_t *v = NULL; GHashTableIter iter; if (a == NULL) { return; } else if (the_cib == NULL) { crm_info("Delaying storing %s: cib not connected", a->id); return; } else if(a->updating) { crm_info("Delaying storing %s: update in progress", a->id); return; } a->changed = FALSE; a->updating = TRUE; xml_top = create_xml_node(NULL, XML_CIB_TAG_STATUS); g_hash_table_iter_init(&iter, a->values); while (g_hash_table_iter_next(&iter, (gpointer *) & peer, (gpointer *) & v)) { crm_info("Update for %s[%s]=%s: %s (%d)", a->id, peer, v->requested, pcmk_strerror(rc), rc); if(v->current) { free(v->requested); v->requested = strdup(v->current); } else { free(v->requested); v->requested = NULL; } build_update_element(xml_top, a, peer, v->requested); } crm_log_xml_trace(xml_top, "update_attr"); rc = cib_internal_op(the_cib, CIB_OP_MODIFY, NULL, XML_CIB_TAG_STATUS, xml_top, NULL, cib_quorum_override, a->user); crm_debug("Sent update %d for %s, id=%s, set=%s", rc, a->id, a->uuid ? a->uuid : "", a->set); g_hash_table_iter_init(&iter, a->values); while (g_hash_table_iter_next(&iter, (gpointer *) & peer, (gpointer *) & v)) { crm_debug("Update %d for %s[%s]=%s", rc, a->id, peer, v->requested); } the_cib->cmds->register_callback(the_cib, rc, 120, FALSE, strdup(a->id), "attrd_cib_callback", attrd_cib_callback); } bool build_update_element(xmlNode *parent, attribute_t *a, const char *node_uuid, const char *value) { xmlNode *xml_obj = NULL; xml_obj = create_xml_node(parent, XML_CIB_TAG_STATE); crm_xml_add(xml_obj, XML_ATTR_ID, node_uuid); xml_obj = create_xml_node(xml_obj, XML_TAG_TRANSIENT_NODEATTRS); crm_xml_add(xml_obj, XML_ATTR_ID, node_uuid); xml_obj = create_xml_node(xml_obj, XML_TAG_ATTR_SETS); crm_xml_add(xml_obj, XML_ATTR_ID, a->set); xml_obj = create_xml_node(xml_obj, XML_CIB_TAG_NVPAIR); crm_xml_add(xml_obj, XML_ATTR_ID, a->uuid); crm_xml_add(xml_obj, XML_NVPAIR_ATTR_NAME, a->id); crm_xml_add(xml_obj, XML_NVPAIR_ATTR_VALUE, value); return TRUE; } diff --git a/attrd/internal.h b/attrd/internal.h index c63e22deb4..0e1aaa5ca0 100644 --- a/attrd/internal.h +++ b/attrd/internal.h @@ -1,34 +1,36 @@ /* * Copyright (C) 2013 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ +cib_t *the_cib; GMainLoop *mloop; bool shutting_down; crm_cluster_t *cluster; GHashTable *attributes; election_t *writer; +void write_attributes(bool all); void attrd_peer_message(crm_node_t *client, xmlNode *msg); void attrd_client_message(crm_client_t *client, xmlNode *msg); void free_attribute(gpointer data); gboolean attrd_election_cb(gpointer user_data); void attrd_peer_change_cb(enum crm_status_type type, crm_node_t *peer, const void *data); xmlNode *build_attribute_xml( xmlNode *parent, const char *name, const char *set, const char *uuid, unsigned int timeout, const char *user, const char *peer, const char *value); diff --git a/attrd/main.c b/attrd/main.c index b591954651..8750806820 100644 --- a/attrd/main.c +++ b/attrd/main.c @@ -1,263 +1,353 @@ -/* +/* * Copyright (C) 2013 Andrew Beekhof - * + * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. - * + * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. - * + * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include +cib_t *the_cib = NULL; GMainLoop *mloop = NULL; bool shutting_down = FALSE; crm_cluster_t *cluster = NULL; election_t *writer = NULL; static void attrd_shutdown(int nsig) { shutting_down = TRUE; crm_info("Shutting down"); if (mloop != NULL && g_main_is_running(mloop)) { g_main_quit(mloop); } else { crm_exit(pcmk_ok); } } static void attrd_cpg_dispatch(cpg_handle_t handle, const struct cpg_name *groupName, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { uint32_t kind = 0; xmlNode *xml = NULL; const char *from = NULL; char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from); if(data == NULL) { return; } if (kind == crm_class_cluster) { xml = string2xml(data); } if (xml == NULL) { crm_err("Bad message of class %d received from %s[%u]: '%.120s'", kind, from, nodeid, data); } else { crm_node_t *peer = crm_get_peer(nodeid, from); attrd_peer_message(peer, xml); } free_xml(xml); free(data); } static void attrd_cpg_destroy(gpointer unused) { if (shutting_down) { crm_info("Corosync disconnection complete"); } else { crm_crit("Lost connection to Corosync service!"); attrd_shutdown(0); } } +static void +attrd_cib_replaced_cb(const char *event, xmlNode * msg) +{ + crm_notice("Updating all attributes after %s event", event); + if(election_state(writer) == election_won) { + write_attributes(TRUE); + } +} + +static void +attrd_cib_destroy_cb(gpointer user_data) +{ + cib_t *conn = user_data; + + conn->cmds->signoff(conn); /* Ensure IPC is cleaned up */ + + if (shutting_down) { + crm_info("Connection disconnection complete"); + + } else { + /* eventually this should trigger a reconnect, not a shutdown */ + crm_err("Lost connection to CIB service!"); + attrd_shutdown(0); + } + + return; +} + +static cib_t * +attrd_cib_connect(int max_retry) +{ + int rc = -ENOTCONN; + static int attempts = 0; + cib_t *connection = cib_new(); + + do { + if(attempts > 0) { + sleep(attempts); + } + + attempts++; + crm_debug("CIB signon attempt %d", attempts); + rc = connection->cmds->signon(connection, T_ATTRD, cib_command); + + } while(rc != pcmk_ok && attempts < max_retry); + + if (rc != pcmk_ok) { + crm_err("Signon to CIB failed: %s (%d)", pcmk_strerror(rc), rc); + goto cleanup; + } + + crm_info("Connected to the CIB after %d attempts", attempts); + + rc = connection->cmds->set_connection_dnotify(connection, attrd_cib_destroy_cb); + if (rc != pcmk_ok) { + crm_err("Could not set disconnection callback"); + goto cleanup; + } + + rc = connection->cmds->add_notify_callback(connection, T_CIB_REPLACE_NOTIFY, attrd_cib_replaced_cb); + if(rc != pcmk_ok) { + crm_err("Could not set CIB notification callback"); + goto cleanup; + } + + return connection; + + cleanup: + if(connection) { + connection->cmds->signoff(connection); + cib_delete(connection); + } + return NULL; +} + static int32_t attrd_ipc_accept(qb_ipcs_connection_t * c, uid_t uid, gid_t gid) { crm_trace("Connection %p", c); if (shutting_down) { crm_info("Ignoring new client [%d] during shutdown", crm_ipcs_client_pid(c)); return -EPERM; } if (crm_client_new(c, uid, gid) == NULL) { return -EIO; } return 0; } static void attrd_ipc_created(qb_ipcs_connection_t * c) { crm_trace("Connection %p", c); } static int32_t attrd_ipc_dispatch(qb_ipcs_connection_t * c, void *data, size_t size) { uint32_t id = 0; uint32_t flags = 0; crm_client_t *client = crm_client_get(c); xmlNode *xml = crm_ipcs_recv(client, data, size, &id, &flags); if (flags & crm_ipc_client_response) { crm_trace("Ack'ing msg from %d (%p)", crm_ipcs_client_pid(c), c); crm_ipcs_send_ack(client, id, "ack", __FUNCTION__, __LINE__); } if (xml == NULL) { crm_debug("No msg from %d (%p)", crm_ipcs_client_pid(c), c); return 0; } #if ENABLE_ACL determine_request_user(client->user, xml, F_ATTRD_USER); #endif crm_trace("Processing msg from %d (%p)", crm_ipcs_client_pid(c), c); crm_log_xml_trace(xml, __PRETTY_FUNCTION__); attrd_client_message(client, xml); free_xml(xml); return 0; } /* Error code means? */ static int32_t attrd_ipc_closed(qb_ipcs_connection_t * c) { crm_client_t *client = crm_client_get(c); crm_trace("Connection %p", c); crm_client_destroy(client); return 0; } static void attrd_ipc_destroy(qb_ipcs_connection_t * c) { crm_trace("Connection %p", c); } struct qb_ipcs_service_handlers ipc_callbacks = { .connection_accept = attrd_ipc_accept, .connection_created = attrd_ipc_created, .msg_process = attrd_ipc_dispatch, .connection_closed = attrd_ipc_closed, .connection_destroyed = attrd_ipc_destroy }; /* *INDENT-OFF* */ static struct crm_option long_options[] = { /* Top-level Options */ {"help", 0, 0, '?', "\tThis text"}, {"verbose", 0, 0, 'V', "\tIncrease debug output"}, {0, 0, 0, 0} }; /* *INDENT-ON* */ int main(int argc, char **argv) { + int rc = pcmk_ok; int flag = 0; int index = 0; int argerr = 0; qb_ipcs_service_t *ipcs = NULL; mloop = g_main_new(FALSE); crm_log_init(T_ATTRD, LOG_NOTICE, TRUE, FALSE, argc, argv, FALSE); crm_set_options(NULL, "[options]", long_options, "Daemon for aggregating and atomically storing node attribute updates into the CIB"); mainloop_add_signal(SIGTERM, attrd_shutdown); while (1) { flag = crm_get_option(argc, argv, &index); if (flag == -1) break; switch (flag) { case 'V': crm_bump_log_level(argc, argv); break; case 'h': /* Help message */ crm_help(flag, EX_OK); break; default: ++argerr; break; } } if (optind > argc) { ++argerr; } if (argerr) { crm_help('?', EX_USAGE); } crm_info("Starting up"); attributes = g_hash_table_new_full(crm_str_hash, g_str_equal, NULL, free_attribute); cluster = malloc(sizeof(crm_cluster_t)); cluster->destroy = attrd_cpg_destroy; cluster->cpg.cpg_deliver_fn = attrd_cpg_dispatch; cluster->cpg.cpg_confchg_fn = pcmk_cpg_membership; crm_set_status_callback(attrd_peer_change_cb); if (crm_cluster_connect(cluster) == FALSE) { crm_err("Cluster connection failed"); + rc = DAEMON_RESPAWN_STOP; goto done; } - crm_info("Cluster connection active"); + writer = election_init(T_ATTRD, cluster->uname, 120, attrd_election_cb); attrd_ipc_server_init(&ipcs, &ipc_callbacks); - crm_info("Accepting attribute updates"); + + the_cib = attrd_cib_connect(10); + if (the_cib == NULL) { + rc = DAEMON_RESPAWN_STOP; + goto done; + } + + crm_info("CIB connection active"); g_main_run(mloop); done: crm_notice("Cleaning up before exit"); election_fini(writer); crm_client_disconnect_all(ipcs); qb_ipcs_destroy(ipcs); g_hash_table_destroy(attributes); - return crm_exit(pcmk_ok); + if (the_cib) { + the_cib->cmds->signoff(the_cib); + cib_delete(the_cib); + } + + return crm_exit(rc); }