diff --git a/src/config.c b/src/config.c index cff563d..46ec2b6 100644 --- a/src/config.c +++ b/src/config.c @@ -1,970 +1,971 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include #include #include #include #include #include #include #include #include #include #include "b_config.h" #include "booth.h" #include "config.h" #include "raft.h" #include "ticket.h" #include "log.h" static int ticket_size = 0; static int ticket_realloc(void) { const int added = 5; int had, want; void *p; had = booth_conf->ticket_allocated; want = had + added; p = realloc(booth_conf->ticket, sizeof(struct ticket_config) * want); if (!p) { log_error("can't alloc more tickets"); return -ENOMEM; } booth_conf->ticket = p; memset(booth_conf->ticket + had, 0, sizeof(struct ticket_config) * added); booth_conf->ticket_allocated = want; return 0; } int add_site(char *address, int type); int add_site(char *addr_string, int type) { int rv; struct booth_site *site; uLong nid; uint32_t mask; int i; rv = 1; if (booth_conf->site_count == MAX_NODES) { log_error("too many nodes"); goto out; } - if (strlen(addr_string)+1 >= sizeof(booth_conf->site[0].addr_string)) { + if (strnlen(addr_string, sizeof(booth_conf->site[0].addr_string)) + >= sizeof(booth_conf->site[0].addr_string)) { log_error("site address \"%s\" too long", addr_string); goto out; } site = booth_conf->site + booth_conf->site_count; site->family = AF_INET; site->type = type; /* Make site_id start at a non-zero point. * Perhaps use hash over string or address? */ - strcpy(site->addr_string, addr_string); + strncpy(site->addr_string, addr_string, sizeof(site->addr_string)); site->index = booth_conf->site_count; site->bitmask = 1 << booth_conf->site_count; /* Catch site overflow */ assert(site->bitmask); booth_conf->all_bits |= site->bitmask; if (type == SITE) booth_conf->sites_bits |= site->bitmask; site->tcp_fd = -1; booth_conf->site_count++; rv = 0; memset(&site->sa6, 0, sizeof(site->sa6)); nid = crc32(0L, NULL, 0); /* Using the ASCII representation in site->addr_string (both sizeof() * and strlen()) gives quite a lot of collisions; a brute-force run * from 0.0.0.0 to 24.0.0.0 gives ~4% collisions, and this tends to * increase even more. * Whether there'll be a collision in real-life, with 3 or 5 nodes, is * another question ... but for now get the ID from the binary * representation - that had *no* collisions up to 32.0.0.0. */ if (inet_pton(AF_INET, site->addr_string, &site->sa4.sin_addr) > 0) { site->family = AF_INET; site->sa4.sin_family = site->family; site->sa4.sin_port = htons(booth_conf->port); site->saddrlen = sizeof(site->sa4); site->addrlen = sizeof(site->sa4.sin_addr); site->site_id = crc32(nid, (void*)&site->sa4.sin_addr, site->addrlen); } else if (inet_pton(AF_INET6, site->addr_string, &site->sa6.sin6_addr) > 0) { site->family = AF_INET6; site->sa6.sin6_family = site->family; site->sa6.sin6_flowinfo = 0; site->sa6.sin6_port = htons(booth_conf->port); site->saddrlen = sizeof(site->sa6); site->addrlen = sizeof(site->sa6.sin6_addr); site->site_id = crc32(nid, (void*)&site->sa6.sin6_addr, site->addrlen); } else { log_error("Address string \"%s\" is bad", site->addr_string); rv = EINVAL; } /* Make sure we will never collide with NO_ONE, * or be negative (to get "get_local_id() < 0" working). */ mask = 1 << (sizeof(site->site_id)*8 -1); assert(NO_ONE & mask); site->site_id &= ~mask; /* Test for collisions with other sites */ for(i=0; iindex; i++) if (booth_conf->site[i].site_id == site->site_id) { log_error("Got a site-ID collision. Please file a bug on https://github.com/ClusterLabs/booth/issues/new, attaching the configuration file."); exit(1); } out: return rv; } inline static char *skip_while_in(const char *cp, int (*fn)(int), const char *allowed) { /* strchr() returns a pointer to the terminator if *cp == 0. */ while (*cp && (fn(*cp) || strchr(allowed, *cp))) cp++; /* discard "const" qualifier */ return (char*)cp; } inline static char *skip_while(char *cp, int (*fn)(int)) { while (fn(*cp)) cp++; return cp; } inline static char *skip_until(char *cp, char expected) { while (*cp && *cp != expected) cp++; return cp; } static inline int is_end_of_line(char *cp) { char c = *cp; return c == '\n' || c == 0 || c == '#'; } static int add_ticket(const char *name, struct ticket_config **tkp, const struct ticket_config *def) { int rv; struct ticket_config *tk; if (booth_conf->ticket_count == booth_conf->ticket_allocated) { rv = ticket_realloc(); if (rv < 0) return rv; } tk = booth_conf->ticket + booth_conf->ticket_count; booth_conf->ticket_count++; if (!check_max_len_valid(name, sizeof(tk->name))) { log_error("ticket name \"%s\" too long.", name); return -EINVAL; } if (find_ticket_by_name(name, NULL)) { log_error("ticket name \"%s\" used again.", name); return -EINVAL; } if (* skip_while_in(name, isalnum, "-/")) { log_error("ticket name \"%s\" invalid; only alphanumeric names.", name); return -EINVAL; } strcpy(tk->name, name); tk->timeout = def->timeout; tk->term_duration = def->term_duration; tk->retries = def->retries; memcpy(tk->weight, def->weight, sizeof(tk->weight)); if (tkp) *tkp = tk; return 0; } static int postproc_ticket(struct ticket_config *tk) { if (!tk) return 1; if (!tk->renewal_freq) { tk->renewal_freq = tk->term_duration/2; } if (tk->timeout*(tk->retries+1) >= tk->renewal_freq) { log_error("%s: total amount of time to " "retry sending packets cannot exceed " "renewal frequency " "(%d*(%d+1) >= %d)", tk->name, tk->timeout, tk->retries, tk->renewal_freq); return 0; } return 1; } /* returns number of weights, or -1 on bad input. */ static int parse_weights(const char *input, int weights[MAX_NODES]) { int i, v; char *cp; for(i=0; i= MAX_ARGS) { log_error("too many arguments for the acquire-handler"); free(tk_test.path); return -1; } tk_test.argv[i++] = p; } while (p); return 0; } struct toktab grant_type[] = { { "auto", GRANT_AUTO}, { "manual", GRANT_MANUAL}, { NULL, 0}, }; struct toktab attr_op[] = { {"eq", ATTR_OP_EQ}, {"ne", ATTR_OP_NE}, {NULL, 0}, }; static int lookup_tokval(char *key, struct toktab *tab) { struct toktab *tp; for (tp = tab; tp->str; tp++) { if (!strcmp(tp->str, key)) return tp->val; } return 0; } /* attribute prerequisite */ static int parse_attr_prereq(char *val, struct ticket_config *tk) { struct attr_prereq *ap = NULL; char *p; ap = (struct attr_prereq *)calloc(1, sizeof(struct attr_prereq)); if (!ap) { log_error("out of memory"); return -1; } p = strtok(val, " \t"); if (!p) { log_error("not enough arguments to attr-prereq"); goto err_out; } ap->grant_type = lookup_tokval(p, grant_type); if (!ap->grant_type) { log_error("%s is not a grant type", p); goto err_out; } p = strtok(NULL, " \t"); if (!p) { log_error("not enough arguments to attr-prereq"); goto err_out; } if (!(ap->attr_name = strdup(p))) { log_error("out of memory"); goto err_out; } p = strtok(NULL, " \t"); if (!p) { log_error("not enough arguments to attr-prereq"); goto err_out; } ap->op = lookup_tokval(p, attr_op); if (!ap->op) { log_error("%s is not an attribute operation", p); goto err_out; } p = strtok(NULL, " \t"); if (!p) { log_error("not enough arguments to attr-prereq"); goto err_out; } if (!(ap->attr_val = strdup(p))) { log_error("out of memory"); goto err_out; } tk->attr_prereqs = g_list_append(tk->attr_prereqs, ap); if (!tk->attr_prereqs) { log_error("out of memory"); goto err_out; } return 0; err_out: if (ap) { if (ap->attr_val) free(ap->attr_val); if (ap->attr_name) free(ap->attr_name); free(ap); } return -1; } extern int poll_timeout; int read_config(const char *path, int type) { char line[1024]; FILE *fp; char *s, *key, *val, *end_of_key; const char *error; char *cp, *cp2; int i; int lineno = 0; int got_transport = 0; int min_timeout = 0; struct ticket_config defaults = { { 0 } }; struct ticket_config *current_tk = NULL; fp = fopen(path, "r"); if (!fp) { log_error("failed to open %s: %s", path, strerror(errno)); return -1; } booth_conf = malloc(sizeof(struct booth_config) + TICKET_ALLOC * sizeof(struct ticket_config)); if (!booth_conf) { log_error("failed to alloc memory for booth config"); return -ENOMEM; } memset(booth_conf, 0, sizeof(struct booth_config) + TICKET_ALLOC * sizeof(struct ticket_config)); ticket_size = TICKET_ALLOC; booth_conf->proto = UDP; booth_conf->port = BOOTH_DEFAULT_PORT; booth_conf->maxtimeskew = BOOTH_DEFAULT_MAX_TIME_SKEW; booth_conf->authkey[0] = '\0'; /* Provide safe defaults. -1 is reserved, though. */ booth_conf->uid = -2; booth_conf->gid = -2; strcpy(booth_conf->site_user, "hacluster"); strcpy(booth_conf->site_group, "haclient"); strcpy(booth_conf->arb_user, "nobody"); strcpy(booth_conf->arb_group, "nobody"); parse_weights("", defaults.weight); defaults.clu_test.path = NULL; defaults.clu_test.pid = 0; defaults.clu_test.status = 0; defaults.clu_test.progstate = EXTPROG_IDLE; defaults.term_duration = DEFAULT_TICKET_EXPIRY; defaults.timeout = DEFAULT_TICKET_TIMEOUT; defaults.retries = DEFAULT_RETRIES; defaults.acquire_after = 0; error = ""; log_debug("reading config file %s", path); while (fgets(line, sizeof(line), fp)) { lineno++; s = skip_while(line, isspace); if (is_end_of_line(s) || *s == '#') continue; key = s; /* Key */ end_of_key = skip_while_in(key, isalnum, "-_"); if (end_of_key == key) { error = "No key"; goto err; } if (!*end_of_key) goto exp_equal; /* whitespace, and something else but nothing more? */ s = skip_while(end_of_key, isspace); if (*s != '=') { exp_equal: error = "Expected '=' after key"; goto err; } s++; /* It's my buffer, and I terminate if I want to. */ /* But not earlier than that, because we had to check for = */ *end_of_key = 0; /* Value tokenizing */ s = skip_while(s, isspace); switch (*s) { case '"': case '\'': val = s+1; s = skip_until(val, *s); /* Terminate value */ if (!*s) { error = "Unterminated quoted string"; goto err; } /* Remove and skip quote */ *s = 0; s++; if (*(s = skip_while(s, isspace)) && *s != '#') { error = "Surplus data after value"; goto err; } *s = 0; break; case 0: no_value: error = "No value"; goto err; break; default: val = s; /* Rest of line. */ i = strlen(s); /* i > 0 because of "case 0" above. */ while (i > 0 && isspace(s[i-1])) i--; s += i; *s = 0; } if (val == s) goto no_value; if (strlen(key) > BOOTH_NAME_LEN || strlen(val) > BOOTH_NAME_LEN) { error = "key/value too long"; goto err; } if (strcmp(key, "transport") == 0) { if (got_transport) { error = "config file has multiple transport lines"; goto err; } if (strcasecmp(val, "UDP") == 0) booth_conf->proto = UDP; else if (strcasecmp(val, "SCTP") == 0) booth_conf->proto = SCTP; else { error = "invalid transport protocol"; goto err; } got_transport = 1; continue; } if (strcmp(key, "port") == 0) { booth_conf->port = atoi(val); continue; } if (strcmp(key, "name") == 0) { safe_copy(booth_conf->name, val, BOOTH_NAME_LEN, "name"); continue; } #if HAVE_LIBGCRYPT || HAVE_LIBMHASH if (strcmp(key, "authfile") == 0) { safe_copy(booth_conf->authfile, val, BOOTH_PATH_LEN, "authfile"); continue; } if (strcmp(key, "maxtimeskew") == 0) { booth_conf->maxtimeskew = atoi(val); continue; } #endif if (strcmp(key, "site") == 0) { if (add_site(val, SITE)) goto out; continue; } if (strcmp(key, "arbitrator") == 0) { if (add_site(val, ARBITRATOR)) goto out; continue; } if (strcmp(key, "site-user") == 0) { safe_copy(booth_conf->site_user, optarg, BOOTH_NAME_LEN, "site-user"); continue; } if (strcmp(key, "site-group") == 0) { safe_copy(booth_conf->site_group, optarg, BOOTH_NAME_LEN, "site-group"); continue; } if (strcmp(key, "arbitrator-user") == 0) { safe_copy(booth_conf->arb_user, optarg, BOOTH_NAME_LEN, "arbitrator-user"); continue; } if (strcmp(key, "arbitrator-group") == 0) { safe_copy(booth_conf->arb_group, optarg, BOOTH_NAME_LEN, "arbitrator-group"); continue; } if (strcmp(key, "debug") == 0) { if (type != CLIENT && type != GEOSTORE) debug_level = max(debug_level, atoi(val)); continue; } if (strcmp(key, "ticket") == 0) { if (current_tk && strcmp(current_tk->name, "__defaults__")) { if (!postproc_ticket(current_tk)) { goto out; } } if (!strcmp(val, "__defaults__")) { current_tk = &defaults; } else if (add_ticket(val, ¤t_tk, &defaults)) { goto out; } continue; } /* current_tk must be allocated at this point, otherwise * we don't know to which ticket the key refers */ if (!current_tk) { error = "Unexpected keyword"; goto err; } if (strcmp(key, "expire") == 0) { current_tk->term_duration = read_time(val); if (current_tk->term_duration <= 0) { error = "Expected time >0 for expire"; goto err; } continue; } if (strcmp(key, "timeout") == 0) { current_tk->timeout = read_time(val); if (current_tk->timeout <= 0) { error = "Expected time >0 for timeout"; goto err; } if (!min_timeout) { min_timeout = current_tk->timeout; } else { min_timeout = min(min_timeout, current_tk->timeout); } continue; } if (strcmp(key, "retries") == 0) { current_tk->retries = strtol(val, &s, 0); if (*s || s == val || current_tk->retries<3 || current_tk->retries > 100) { error = "Expected plain integer value in the range [3, 100] for retries"; goto err; } continue; } if (strcmp(key, "renewal-freq") == 0) { current_tk->renewal_freq = read_time(val); if (current_tk->renewal_freq <= 0) { error = "Expected time >0 for renewal-freq"; goto err; } continue; } if (strcmp(key, "acquire-after") == 0) { current_tk->acquire_after = read_time(val); if (current_tk->acquire_after < 0) { error = "Expected time >=0 for acquire-after"; goto err; } continue; } if (strcmp(key, "before-acquire-handler") == 0) { if (parse_extprog(val, current_tk)) { goto err; } continue; } if (strcmp(key, "attr-prereq") == 0) { if (parse_attr_prereq(val, current_tk)) { goto err; } continue; } if (strcmp(key, "weights") == 0) { if (parse_weights(val, current_tk->weight) < 0) goto out; continue; } error = "Unknown keyword"; goto err; } if ((booth_conf->site_count % 2) == 0) { log_warn("Odd number of nodes is strongly recommended!"); } /* Default: make config name match config filename. */ if (!booth_conf->name[0]) { cp = strrchr(path, '/'); cp = cp ? cp+1 : (char *)path; cp2 = strrchr(cp, '.'); if (!cp2) cp2 = cp + strlen(cp); if (cp2-cp >= BOOTH_NAME_LEN) { log_error("booth config file name too long"); goto err; } strncpy(booth_conf->name, cp, cp2-cp); *(booth_conf->name+(cp2-cp)) = '\0'; } if (!postproc_ticket(current_tk)) { goto out; } poll_timeout = min(POLL_TIMEOUT, min_timeout/10); if (!poll_timeout) poll_timeout = POLL_TIMEOUT; return 0; err: out: log_error("%s in config file line %d", error, lineno); free(booth_conf); booth_conf = NULL; return -1; } int check_config(int type) { struct passwd *pw; struct group *gr; char *cp, *input; if (!booth_conf) return -1; input = (type == ARBITRATOR) ? booth_conf->arb_user : booth_conf->site_user; if (!*input) goto u_inval; if (isdigit(input[0])) { booth_conf->uid = strtol(input, &cp, 0); if (*cp != 0) { u_inval: log_error("User \"%s\" cannot be resolved into a UID.", input); return ENOENT; } } else { pw = getpwnam(input); if (!pw) goto u_inval; booth_conf->uid = pw->pw_uid; } input = (type == ARBITRATOR) ? booth_conf->arb_group : booth_conf->site_group; if (!*input) goto g_inval; if (isdigit(input[0])) { booth_conf->gid = strtol(input, &cp, 0); if (*cp != 0) { g_inval: log_error("Group \"%s\" cannot be resolved into a UID.", input); return ENOENT; } } else { gr = getgrnam(input); if (!gr) goto g_inval; booth_conf->gid = gr->gr_gid; } return 0; } static int get_other_site(struct booth_site **node) { struct booth_site *n; int i; *node = NULL; if (!booth_conf) return 0; for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if (n != local && n->type == SITE) { if (!*node) { *node = n; } else { return 0; } } } return !*node ? 0 : 1; } int find_site_by_name(char *site, struct booth_site **node, int any_type) { struct booth_site *n; int i; if (!booth_conf) return 0; if (!strcmp(site, OTHER_SITE)) return get_other_site(node); for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if ((n->type == SITE || any_type) && - strcmp(n->addr_string, site) == 0) { + strncmp(n->addr_string, site, sizeof(n->addr_string)) == 0) { *node = n; return 1; } } return 0; } int find_site_by_id(uint32_t site_id, struct booth_site **node) { struct booth_site *n; int i; if (site_id == NO_ONE) { *node = no_leader; return 1; } if (!booth_conf) return 0; for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if (n->site_id == site_id) { *node = n; return 1; } } return 0; } const char *type_to_string(int type) { switch (type) { case ARBITRATOR: return "arbitrator"; case SITE: return "site"; case CLIENT: return "client"; case GEOSTORE: return "attr"; } return "??invalid-type??"; } diff --git a/src/main.c b/src/main.c index 9e22929..3ca0db2 100644 --- a/src/main.c +++ b/src/main.c @@ -1,1588 +1,1590 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "b_config.h" #include "log.h" #include "booth.h" #include "config.h" #include "transport.h" #include "inline-fn.h" #include "pacemaker.h" #include "ticket.h" #include "request.h" #include "attr.h" #include "handler.h" #define RELEASE_VERSION "1.0" #define RELEASE_STR RELEASE_VERSION " (build " BOOTH_BUILD_VERSION ")" #define CLIENT_NALLOC 32 int daemonize = 0; int enable_stderr = 0; timetype start_time; /** Structure for "clients". * Filehandles with incoming data get registered here (and in pollfds), * along with their callbacks. * Because these can be reallocated with every new fd, addressing * happens _only_ by their numeric index. */ struct client *clients = NULL; struct pollfd *pollfds = NULL; static int client_maxi; static int client_size = 0; static const struct booth_site _no_leader = { .addr_string = "none", .site_id = NO_ONE, }; struct booth_site *no_leader = (struct booth_site*)& _no_leader; typedef enum { BOOTHD_STARTED=0, BOOTHD_STARTING } BOOTH_DAEMON_STATE; int poll_timeout; struct booth_config *booth_conf; struct command_line cl; static void client_alloc(void) { int i; if (!clients) { clients = malloc(CLIENT_NALLOC * sizeof(struct client)); pollfds = malloc(CLIENT_NALLOC * sizeof(struct pollfd)); } else { clients = realloc(clients, (client_size + CLIENT_NALLOC) * sizeof(struct client)); pollfds = realloc(pollfds, (client_size + CLIENT_NALLOC) * sizeof(struct pollfd)); } if (!clients || !pollfds) { log_error("can't alloc for client array"); exit(1); } for (i = client_size; i < client_size + CLIENT_NALLOC; i++) { clients[i].workfn = NULL; clients[i].deadfn = NULL; clients[i].fd = -1; pollfds[i].fd = -1; pollfds[i].revents = 0; } client_size += CLIENT_NALLOC; } static void client_dead(int ci) { struct client *c = clients + ci; if (c->fd != -1) { log_debug("removing client %d", c->fd); close(c->fd); } c->fd = -1; c->workfn = NULL; if (c->msg) { free(c->msg); c->msg = NULL; c->offset = 0; } pollfds[ci].fd = -1; } int client_add(int fd, const struct booth_transport *tpt, void (*workfn)(int ci), void (*deadfn)(int ci)) { int i; struct client *c; if (client_size - 1 <= client_maxi ) { client_alloc(); } for (i = 0; i < client_size; i++) { c = clients + i; if (c->fd != -1) continue; c->workfn = workfn; if (deadfn) c->deadfn = deadfn; else c->deadfn = client_dead; c->transport = tpt; c->fd = fd; c->msg = NULL; c->offset = 0; pollfds[i].fd = fd; pollfds[i].events = POLLIN; if (i > client_maxi) client_maxi = i; return i; } assert(!"no client"); } int find_client_by_fd(int fd) { int i; if (fd < 0) return -1; for (i = 0; i <= client_maxi; i++) { if (clients[i].fd == fd) return i; } return -1; } static int format_peers(char **pdata, unsigned int *len) { struct booth_site *s; char *data, *cp; char time_str[64]; int i, alloc; *pdata = NULL; *len = 0; alloc = booth_conf->site_count * (BOOTH_NAME_LEN + 256); data = malloc(alloc); if (!data) return -ENOMEM; cp = data; foreach_node(i, s) { if (s == local) continue; strftime(time_str, sizeof(time_str), "%F %T", localtime(&s->last_recv)); cp += snprintf(cp, alloc - (cp - data), "%-12s %s, last recv: %s\n", type_to_string(s->type), s->addr_string, time_str); cp += snprintf(cp, alloc - (cp - data), "\tSent pkts:%u error:%u resends:%u\n", s->sent_cnt, s->sent_err_cnt, s->resend_cnt); cp += snprintf(cp, alloc - (cp - data), "\tRecv pkts:%u error:%u authfail:%u invalid:%u\n\n", s->recv_cnt, s->recv_err_cnt, s->sec_cnt, s->invalid_cnt); if (alloc - (cp - data) <= 0) return -ENOMEM; } *pdata = data; *len = cp - data; return 0; } void list_peers(int fd) { char *data; int olen; struct boothc_hdr_msg hdr; if (format_peers(&data, &olen) < 0) goto out; init_header(&hdr.header, CL_LIST, 0, 0, RLT_SUCCESS, 0, sizeof(hdr) + olen); (void)send_header_plus(fd, &hdr, data, olen); out: if (data) free(data); } /* trim trailing spaces if the key is ascii */ static void trim_key() { char *p; int i; for (i=0, p=booth_conf->authkey; i < booth_conf->authkey_len; i++, p++) if (!isascii(*p)) return; p = booth_conf->authkey; while (booth_conf->authkey_len > 0 && isspace(*p)) { p++; booth_conf->authkey_len--; } memmove(booth_conf->authkey, p, booth_conf->authkey_len); p = booth_conf->authkey + booth_conf->authkey_len - 1; while (booth_conf->authkey_len > 0 && isspace(*p)) { booth_conf->authkey_len--; p--; } } static int read_authkey() { int fd; booth_conf->authkey[0] = '\0'; if (stat(booth_conf->authfile, &booth_conf->authstat) < 0) { log_error("cannot stat authentication file %s: %s", booth_conf->authfile, strerror(errno)); return -1; } if (booth_conf->authstat.st_mode & (S_IRGRP | S_IROTH)) { log_error("%s: file can be readable only for the owner", booth_conf->authfile); return -1; } fd = open(booth_conf->authfile, O_RDONLY); if (fd < 0) { log_error("cannot open %s: %s", booth_conf->authfile, strerror(errno)); return -1; } booth_conf->authkey_len = read(fd, booth_conf->authkey, BOOTH_MAX_KEY_LEN); close(fd); trim_key(); log_debug("read key of size %d in authfile %s", booth_conf->authkey_len, booth_conf->authfile); /* make sure that the key is of minimum length */ return (booth_conf->authkey_len >= BOOTH_MIN_KEY_LEN) ? 0 : -1; } int update_authkey() { struct stat statbuf; if (stat(booth_conf->authfile, &statbuf) < 0) { log_error("cannot stat authentication file %s: %s", booth_conf->authfile, strerror(errno)); return -1; } if (statbuf.st_mtime > booth_conf->authstat.st_mtime) { return read_authkey(); } return 0; } static int setup_config(int type) { int rv; rv = read_config(cl.configfile, type); if (rv < 0) goto out; if (booth_conf->authfile[0] != '\0') { rv = read_authkey(); if (rv < 0) goto out; } /* Set "local" pointer, ignoring errors. */ if (cl.type == DAEMON && cl.site[0]) { if (!find_site_by_name(cl.site, &local, 1)) { log_error("Cannot find \"%s\" in the configuration.", cl.site); return -EINVAL; } local->local = 1; } else find_myself(NULL, type == CLIENT || type == GEOSTORE); rv = check_config(type); if (rv < 0) goto out; /* Per default the PID file name is derived from the * configuration name. */ if (!cl.lockfile[0]) { snprintf(cl.lockfile, sizeof(cl.lockfile)-1, "%s/%s.pid", BOOTH_RUN_DIR, booth_conf->name); } out: return rv; } static int setup_transport(void) { int rv; rv = transport()->init(message_recv); if (rv < 0) { log_error("failed to init booth_transport %s", transport()->name); goto out; } rv = booth_transport[TCP].init(NULL); if (rv < 0) { log_error("failed to init booth_transport[TCP]"); goto out; } out: return rv; } static int write_daemon_state(int fd, int state) { char buffer[1024]; int rv, size; size = sizeof(buffer) - 1; rv = snprintf(buffer, size, "booth_pid=%d " "booth_state=%s " "booth_type=%s " "booth_cfg_name='%s' " "booth_id=%d " "booth_addr_string='%s' " "booth_port=%d\n", getpid(), ( state == BOOTHD_STARTED ? "started" : state == BOOTHD_STARTING ? "starting" : "invalid"), type_to_string(local->type), booth_conf->name, local->site_id, local->addr_string, booth_conf->port); if (rv < 0 || rv == size) { log_error("Buffer filled up in write_daemon_state()."); return -1; } size = rv; rv = ftruncate(fd, 0); if (rv < 0) { log_error("lockfile %s truncate error %d: %s", cl.lockfile, errno, strerror(errno)); return rv; } rv = lseek(fd, 0, SEEK_SET); if (rv < 0) { log_error("lseek set fd(%d) offset to 0 error, return(%d), message(%s)", fd, rv, strerror(errno)); rv = -1; return rv; } rv = write(fd, buffer, size); if (rv != size) { log_error("write to fd(%d, %d) returned %d, errno %d, message(%s)", fd, size, rv, errno, strerror(errno)); return -1; } return 0; } static int loop(int fd) { void (*workfn) (int ci); void (*deadfn) (int ci); int rv, i; rv = setup_transport(); if (rv < 0) goto fail; rv = setup_ticket(); if (rv < 0) goto fail; rv = write_daemon_state(fd, BOOTHD_STARTED); if (rv != 0) { log_error("write daemon state %d to lockfile error %s: %s", BOOTHD_STARTED, cl.lockfile, strerror(errno)); goto fail; } log_info("BOOTH %s daemon started, node id is 0x%08X (%d).", type_to_string(local->type), local->site_id, local->site_id); while (1) { rv = poll(pollfds, client_maxi + 1, poll_timeout); if (rv == -1 && errno == EINTR) continue; if (rv < 0) { log_error("poll failed: %s (%d)", strerror(errno), errno); goto fail; } for (i = 0; i <= client_maxi; i++) { if (clients[i].fd < 0) continue; if (pollfds[i].revents & POLLIN) { workfn = clients[i].workfn; if (workfn) workfn(i); } if (pollfds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) { deadfn = clients[i].deadfn; if (deadfn) deadfn(i); } } process_tickets(); } return 0; fail: return -1; } static int test_reply(cmd_result_t reply_code, cmd_request_t cmd) { int rv = 0; const char *op_str = ""; if (cmd == CMD_GRANT) op_str = "grant"; else if (cmd == CMD_REVOKE) op_str = "revoke"; else if (cmd == CMD_LIST) op_str = "list"; else if (cmd == CMD_PEERS) op_str = "peers"; else { log_error("internal error reading reply result!"); return -1; } switch (reply_code) { case RLT_OVERGRANT: log_info("You're granting a granted ticket. " "If you wanted to migrate a ticket, " "use revoke first, then use grant."); rv = -1; break; case RLT_TICKET_IDLE: log_info("ticket is not owned"); rv = 0; break; case RLT_ASYNC: log_info("%s command sent, result will be returned " "asynchronously. Please use \"booth list\" to " "see the outcome.", op_str); rv = 0; break; case RLT_CIB_PENDING: log_info("%s succeeded (CIB commit pending)", op_str); /* wait for the CIB commit? */ rv = (cl.options & OPT_WAIT_COMMIT) ? 3 : 0; break; case RLT_MORE: rv = 2; break; case RLT_SYNC_SUCC: case RLT_SUCCESS: if (cmd != CMD_LIST && cmd != CMD_PEERS) log_info("%s succeeded!", op_str); rv = 0; break; case RLT_SYNC_FAIL: log_info("%s failed!", op_str); rv = -1; break; case RLT_INVALID_ARG: log_error("ticket \"%s\" does not exist", cl.msg.ticket.id); rv = -1; break; case RLT_AUTH: log_error("authentication error"); rv = -1; break; case RLT_EXT_FAILED: log_error("before-acquire-handler for ticket \"%s\" failed, grant denied", cl.msg.ticket.id); rv = -1; break; case RLT_ATTR_PREREQ: log_error("attr-prereq for ticket \"%s\" failed, grant denied", cl.msg.ticket.id); rv = -1; break; case RLT_REDIRECT: /* talk to another site */ rv = 1; break; default: log_error("got an error code: %x", rv); rv = -1; } return rv; } static int query_get_string_answer(cmd_request_t cmd) { struct booth_site *site; struct boothc_hdr_msg reply; struct boothc_header *header; char *data; int data_len; int rv; struct booth_transport const *tpt; int (*test_reply_f) (cmd_result_t reply_code, cmd_request_t cmd); size_t msg_size; void *request; if (cl.type == GEOSTORE) { test_reply_f = test_attr_reply; msg_size = sizeof(cl.attr_msg); request = &cl.attr_msg; } else { test_reply_f = test_reply; msg_size = sizeof(cl.msg); request = &cl.msg; } header = (struct boothc_header *)request; data = NULL; init_header(header, cmd, 0, cl.options, 0, 0, msg_size); if (!*cl.site) site = local; else if (!find_site_by_name(cl.site, &site, 1)) { log_error("cannot find site \"%s\"", cl.site); rv = ENOENT; goto out; } tpt = booth_transport + TCP; rv = tpt->open(site); if (rv < 0) goto out_close; rv = tpt->send(site, request, msg_size); if (rv < 0) goto out_close; rv = tpt->recv_auth(site, &reply, sizeof(reply)); if (rv < 0) goto out_close; data_len = ntohl(reply.header.length) - rv; /* no attribute, or no ticket found */ if (!data_len) { goto out_test_reply; } data = malloc(data_len+1); if (!data) { rv = -ENOMEM; goto out_close; } rv = tpt->recv(site, data, data_len); if (rv < 0) goto out_close; *(data+data_len) = '\0'; *(data + data_len) = '\0'; (void)fputs(data, stdout); fflush(stdout); rv = 0; out_test_reply: rv = test_reply_f(ntohl(reply.header.result), cmd); out_close: tpt->close(site); out: if (data) free(data); return rv; } static int do_command(cmd_request_t cmd) { struct booth_site *site; struct boothc_ticket_msg reply; struct booth_transport const *tpt; uint32_t leader_id; int rv; int reply_cnt = 0, msg_logged = 0; const char *op_str = ""; if (cmd == CMD_GRANT) op_str = "grant"; else if (cmd == CMD_REVOKE) op_str = "revoke"; rv = 0; site = NULL; /* Always use TCP for client - at least for now. */ tpt = booth_transport + TCP; if (!*cl.site) site = local; else { if (!find_site_by_name(cl.site, &site, 1)) { log_error("Site \"%s\" not configured.", cl.site); goto out_close; } } if (site->type == ARBITRATOR) { if (site == local) { log_error("We're just an arbitrator, cannot grant/revoke tickets here."); } else { log_error("%s is just an arbitrator, cannot grant/revoke tickets there.", cl.site); } goto out_close; } assert(site->type == SITE); /* We don't check for existence of ticket, so that asking can be * done without local configuration, too. * Although, that means that the UDP port has to be specified, too. */ if (!cl.msg.ticket.id[0]) { /* If the loaded configuration has only a single ticket defined, use that. */ if (booth_conf->ticket_count == 1) { - strcpy(cl.msg.ticket.id, booth_conf->ticket[0].name); + strncpy(cl.msg.ticket.id, booth_conf->ticket[0].name, + sizeof(cl.msg.ticket.id)); } else { log_error("No ticket given."); goto out_close; } } redirect: init_header(&cl.msg.header, cmd, 0, cl.options, 0, 0, sizeof(cl.msg)); rv = tpt->open(site); if (rv < 0) goto out_close; rv = tpt->send(site, &cl.msg, sendmsglen(&cl.msg)); if (rv < 0) goto out_close; read_more: rv = tpt->recv_auth(site, &reply, sizeof(reply)); if (rv < 0) { /* print any errors depending on the code sent by the * server */ (void)test_reply(ntohl(reply.header.result), cmd); goto out_close; } rv = test_reply(ntohl(reply.header.result), cmd); if (rv == 1) { tpt->close(site); leader_id = ntohl(reply.ticket.leader); if (!find_site_by_id(leader_id, &site)) { log_error("Message with unknown redirect site %x received", leader_id); rv = -1; goto out_close; } goto redirect; } else if (rv == 2 || rv == 3) { /* the server has more to say */ /* don't wait too long */ if (reply_cnt > 1 && !(cl.options & OPT_WAIT)) { rv = 0; log_info("Giving up on waiting for the definite result. " "Please use \"booth list\" later to " "see the outcome."); goto out_close; } if (reply_cnt == 0) { log_info("%s request sent, " "waiting for the result ...", op_str); msg_logged++; } else if (rv == 3 && msg_logged < 2) { log_info("waiting for the CIB commit ..."); msg_logged++; } reply_cnt++; goto read_more; } out_close: if (site) tpt->close(site); return rv; } static int _lockfile(int mode, int *fdp, pid_t *locked_by) { struct flock lock; int fd, rv; /* After reboot the directory may not yet exist. * Try to create it, but ignore errors. */ if (strncmp(cl.lockfile, BOOTH_RUN_DIR, strlen(BOOTH_RUN_DIR)) == 0) mkdir(BOOTH_RUN_DIR, 0775); if (locked_by) *locked_by = 0; *fdp = -1; fd = open(cl.lockfile, mode, 0664); if (fd < 0) return errno; *fdp = fd; lock.l_type = F_WRLCK; lock.l_start = 0; lock.l_whence = SEEK_SET; lock.l_len = 0; lock.l_pid = 0; if (fcntl(fd, F_SETLK, &lock) == 0) return 0; rv = errno; if (locked_by) if (fcntl(fd, F_GETLK, &lock) == 0) *locked_by = lock.l_pid; return rv; } static inline int is_root(void) { return geteuid() == 0; } static int create_lockfile(void) { int rv, fd; fd = -1; rv = _lockfile(O_CREAT | O_WRONLY, &fd, NULL); if (fd == -1) { log_error("lockfile %s open error %d: %s", cl.lockfile, rv, strerror(rv)); return -1; } if (rv < 0) { log_error("lockfile %s setlk error %d: %s", cl.lockfile, rv, strerror(rv)); goto fail; } rv = write_daemon_state(fd, BOOTHD_STARTING); if (rv != 0) { log_error("write daemon state %d to lockfile error %s: %s", BOOTHD_STARTING, cl.lockfile, strerror(errno)); goto fail; } if (is_root()) { if (fchown(fd, booth_conf->uid, booth_conf->gid) < 0) log_error("fchown() on lockfile said %d: %s", errno, strerror(errno)); } return fd; fail: close(fd); return -1; } static void unlink_lockfile(int fd) { unlink(cl.lockfile); close(fd); } static void print_usage(void) { printf( "Usage:\n" " booth list [options]\n" " booth {grant|revoke} [options] \n" " booth status [options]\n" "\n" " list: List all tickets\n" " grant: Grant ticket to site\n" " revoke: Revoke ticket\n" "\n" "Options:\n" " -c FILE Specify config file [default " BOOTH_DEFAULT_CONF "]\n" " Can be a path or just a name without \".conf\" suffix\n" " -s Connect/grant to a different site\n" " -F Try to grant the ticket immediately\n" " even if not all sites are reachable\n" " -w Wait forever for the outcome of the request\n" " -C Wait until the ticket is committed to the CIB (grant only)\n" " -h Print this help\n" "\n" "Examples:\n" "\n" " # booth list (list tickets)\n" " # booth grant ticket-A (grant ticket here)\n" " # booth grant -s 10.121.8.183 ticket-A (grant ticket to site 10.121.8.183)\n" " # booth revoke ticket-A (revoke ticket)\n" "\n" "See the booth(8) man page for more details.\n" ); } #define OPTION_STRING "c:Dl:t:s:FhSwC" #define ATTR_OPTION_STRING "c:Dt:s:h" void safe_copy(char *dest, char *value, size_t buflen, const char *description) { int content_len = buflen - 1; if (strlen(value) >= content_len) { fprintf(stderr, "'%s' exceeds maximum %s length of %d\n", value, description, content_len); exit(EXIT_FAILURE); } strncpy(dest, value, content_len); dest[content_len] = 0; } static int host_convert(char *hostname, char *ip_str, size_t ip_size) { struct addrinfo *result = NULL, hints = {0}; int re = -1; memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_INET; hints.ai_socktype = SOCK_DGRAM; re = getaddrinfo(hostname, NULL, &hints, &result); if (re == 0) { struct in_addr addr = ((struct sockaddr_in *)result->ai_addr)->sin_addr; const char *re_ntop = inet_ntop(AF_INET, &addr, ip_str, ip_size); if (re_ntop == NULL) { re = -1; } } freeaddrinfo(result); return re; } #define cparg(dest, descr) do { \ if (optind >= argc) \ goto missingarg; \ safe_copy(dest, argv[optind], sizeof(dest), descr); \ optind++; \ } while(0) static int read_arguments(int argc, char **argv) { int optchar; char *arg1 = argv[1]; char *op = NULL; char *cp; const char *opt_string = OPTION_STRING; char site_arg[INET_ADDRSTRLEN] = {0}; int left; cl.type = 0; if ((cp = strstr(argv[0], ATTR_PROG)) && !strcmp(cp, ATTR_PROG)) { cl.type = GEOSTORE; op = argv[1]; optind = 2; opt_string = ATTR_OPTION_STRING; } else if (argc > 1 && (strcmp(arg1, "arbitrator") == 0 || strcmp(arg1, "site") == 0 || strcmp(arg1, "start") == 0 || strcmp(arg1, "daemon") == 0)) { cl.type = DAEMON; optind = 2; } else if (argc > 1 && (strcmp(arg1, "status") == 0)) { cl.type = STATUS; optind = 2; } else if (argc > 1 && (strcmp(arg1, "client") == 0)) { cl.type = CLIENT; if (argc < 3) { print_usage(); exit(EXIT_FAILURE); } op = argv[2]; optind = 3; } if (!cl.type) { cl.type = CLIENT; op = argv[1]; optind = 2; } if (argc < 2 || !strcmp(arg1, "help") || !strcmp(arg1, "--help") || !strcmp(arg1, "-h")) { if (cl.type == GEOSTORE) print_geostore_usage(); else print_usage(); exit(EXIT_SUCCESS); } if (!strcmp(arg1, "version") || !strcmp(arg1, "--version") || !strcmp(arg1, "-V")) { printf("%s %s\n", argv[0], RELEASE_STR); exit(EXIT_SUCCESS); } if (cl.type == CLIENT) { if (!strcmp(op, "list")) cl.op = CMD_LIST; else if (!strcmp(op, "grant")) cl.op = CMD_GRANT; else if (!strcmp(op, "revoke")) cl.op = CMD_REVOKE; else if (!strcmp(op, "peers")) cl.op = CMD_PEERS; else { fprintf(stderr, "client operation \"%s\" is unknown\n", op); exit(EXIT_FAILURE); } } else if (cl.type == GEOSTORE) { if (!strcmp(op, "list")) cl.op = ATTR_LIST; else if (!strcmp(op, "set")) cl.op = ATTR_SET; else if (!strcmp(op, "get")) cl.op = ATTR_GET; else if (!strcmp(op, "delete")) cl.op = ATTR_DEL; else { fprintf(stderr, "attribute operation \"%s\" is unknown\n", op); exit(EXIT_FAILURE); } } while (optind < argc) { optchar = getopt(argc, argv, opt_string); switch (optchar) { case 'c': if (strchr(optarg, '/')) { safe_copy(cl.configfile, optarg, sizeof(cl.configfile), "config file"); } else { /* If no "/" in there, use with default directory. */ strcpy(cl.configfile, BOOTH_DEFAULT_CONF_DIR); cp = cl.configfile + strlen(BOOTH_DEFAULT_CONF_DIR); assert(cp > cl.configfile); assert(*(cp-1) == '/'); /* Write at the \0, ie. after the "/" */ safe_copy(cp, optarg, (sizeof(cl.configfile) - (cp - cl.configfile) - strlen(BOOTH_DEFAULT_CONF_EXT)), "config name"); /* If no extension, append ".conf". * Space is available, see -strlen() above. */ if (!strchr(cp, '.')) strcat(cp, BOOTH_DEFAULT_CONF_EXT); } break; case 'D': debug_level++; enable_stderr = 1; /* Fall through */ case 'S': daemonize = 1; break; case 'l': safe_copy(cl.lockfile, optarg, sizeof(cl.lockfile), "lock file"); break; case 't': if (cl.op == CMD_GRANT || cl.op == CMD_REVOKE) { safe_copy(cl.msg.ticket.id, optarg, sizeof(cl.msg.ticket.id), "ticket name"); } else if (cl.type == GEOSTORE) { safe_copy(cl.attr_msg.attr.tkt_id, optarg, sizeof(cl.attr_msg.attr.tkt_id), "ticket name"); } else { print_usage(); exit(EXIT_FAILURE); } break; case 's': /* For testing and debugging: allow "-s site" also for * daemon start, so that the address that should be used * can be set manually. * This makes it easier to start multiple processes * on one machine. */ if (cl.type == CLIENT || cl.type == GEOSTORE || (cl.type == DAEMON && debug_level)) { if (strcmp(optarg, OTHER_SITE) && host_convert(optarg, site_arg, INET_ADDRSTRLEN) == 0) { safe_copy(cl.site, site_arg, sizeof(cl.site), "site name"); } else { safe_copy(cl.site, optarg, sizeof(cl.site), "site name"); } } else { log_error("\"-s\" not allowed in daemon mode."); exit(EXIT_FAILURE); } break; case 'F': if (cl.type != CLIENT || cl.op != CMD_GRANT) { log_error("use \"-F\" only for client grant"); exit(EXIT_FAILURE); } cl.options |= OPT_IMMEDIATE; break; case 'w': if (cl.type != CLIENT || (cl.op != CMD_GRANT && cl.op != CMD_REVOKE)) { log_error("use \"-w\" only for grant and revoke"); exit(EXIT_FAILURE); } cl.options |= OPT_WAIT; break; case 'C': if (cl.type != CLIENT || cl.op != CMD_GRANT) { log_error("use \"-C\" only for grant"); exit(EXIT_FAILURE); } cl.options |= OPT_WAIT | OPT_WAIT_COMMIT; break; case 'h': if (cl.type == GEOSTORE) print_geostore_usage(); else print_usage(); exit(EXIT_SUCCESS); break; case ':': case '?': fprintf(stderr, "Please use '-h' for usage.\n"); exit(EXIT_FAILURE); break; case -1: /* No more parameters on cmdline, only arguments. */ goto extra_args; default: goto unknown; }; } return 0; extra_args: if (cl.type == CLIENT && !cl.msg.ticket.id[0]) { cparg(cl.msg.ticket.id, "ticket name"); } else if (cl.type == GEOSTORE) { if (cl.op != ATTR_LIST) { cparg(cl.attr_msg.attr.name, "attribute name"); } if (cl.op == ATTR_SET) { cparg(cl.attr_msg.attr.val, "attribute value"); } } if (optind == argc) return 0; left = argc - optind; fprintf(stderr, "Superfluous argument%s: %s%s\n", left == 1 ? "" : "s", argv[optind], left == 1 ? "" : "..."); exit(EXIT_FAILURE); unknown: fprintf(stderr, "unknown option: %s\n", argv[optind]); exit(EXIT_FAILURE); missingarg: fprintf(stderr, "not enough arguments\n"); exit(EXIT_FAILURE); } static void set_scheduler(void) { struct sched_param sched_param; struct rlimit rlimit; int rv; rlimit.rlim_cur = RLIM_INFINITY; rlimit.rlim_max = RLIM_INFINITY; setrlimit(RLIMIT_MEMLOCK, &rlimit); rv = mlockall(MCL_CURRENT | MCL_FUTURE); if (rv < 0) { log_error("mlockall failed"); } rv = sched_get_priority_max(SCHED_RR); if (rv != -1) { sched_param.sched_priority = rv; rv = sched_setscheduler(0, SCHED_RR, &sched_param); if (rv == -1) log_error("could not set SCHED_RR priority %d: %s (%d)", sched_param.sched_priority, strerror(errno), errno); } else { log_error("could not get maximum scheduler priority err %d", errno); } } static int set_procfs_val(const char *path, const char *val) { int rc = -1; FILE *fp = fopen(path, "w"); if (fp) { if (fprintf(fp, "%s", val) > 0) rc = 0; fclose(fp); } return rc; } static int do_status(int type) { pid_t pid; int rv, lock_fd, ret; const char *reason = NULL; char lockfile_data[1024], *cp; ret = PCMK_OCF_NOT_RUNNING; rv = setup_config(type); if (rv) { reason = "Error reading configuration."; ret = PCMK_OCF_UNKNOWN_ERROR; goto quit; } if (!local) { reason = "No Service IP active here."; goto quit; } rv = _lockfile(O_RDWR, &lock_fd, &pid); if (rv == 0) { reason = "PID file not locked."; goto quit; } if (lock_fd == -1) { reason = "No PID file."; goto quit; } if (pid) { fprintf(stdout, "booth_lockpid=%d ", pid); fflush(stdout); } rv = read(lock_fd, lockfile_data, sizeof(lockfile_data) - 1); if (rv < 4) { reason = "Cannot read lockfile data."; ret = PCMK_LSB_UNKNOWN_ERROR; goto quit; } lockfile_data[rv] = 0; if (lock_fd != -1) close(lock_fd); /* Make sure it's only a single line */ cp = strchr(lockfile_data, '\r'); if (cp) *cp = 0; cp = strchr(lockfile_data, '\n'); if (cp) *cp = 0; rv = setup_tcp_listener(1); if (rv == 0) { reason = "TCP port not in use."; goto quit; } fprintf(stdout, "booth_lockfile='%s' %s\n", cl.lockfile, lockfile_data); if (daemonize) fprintf(stderr, "Booth at %s port %d seems to be running.\n", local->addr_string, booth_conf->port); return 0; quit: log_debug("not running: %s", reason); /* Ie. "DEBUG" */ if (daemonize) fprintf(stderr, "not running: %s\n", reason); return ret; } static int limit_this_process(void) { int rv; if (!is_root()) return 0; if (setregid(booth_conf->gid, booth_conf->gid) < 0) { rv = errno; log_error("setregid() didn't work: %s", strerror(rv)); return rv; } if (setreuid(booth_conf->uid, booth_conf->uid) < 0) { rv = errno; log_error("setreuid() didn't work: %s", strerror(rv)); return rv; } return 0; } static int lock_fd = -1; static void server_exit(void) { int rv; if (lock_fd >= 0) { /* We might not be able to delete it, but at least * make it empty. */ rv = ftruncate(lock_fd, 0); (void)rv; unlink_lockfile(lock_fd); } log_info("exiting"); } static void sig_exit_handler(int sig) { log_info("caught signal %d", sig); exit(0); } static int do_server(int type) { int rv = -1; static char log_ent[128] = DAEMON_NAME "-"; rv = setup_config(type); if (rv < 0) return rv; if (!local) { log_error("Cannot find myself in the configuration."); exit(EXIT_FAILURE); } if (!daemonize) { if (daemon(0, 0) < 0) { perror("daemon error"); exit(EXIT_FAILURE); } } /* The lockfile must be written to _after_ the call to daemon(), so * that the lockfile contains the pid of the daemon, not the parent. */ lock_fd = create_lockfile(); if (lock_fd < 0) return lock_fd; atexit(server_exit); strcat(log_ent, type_to_string(local->type)); cl_log_set_entity(log_ent); cl_log_enable_stderr(enable_stderr ? TRUE : FALSE); cl_log_set_facility(HA_LOG_FACILITY); cl_inherit_logging_environment(0); log_info("BOOTH %s %s daemon is starting", type_to_string(local->type), RELEASE_STR); signal(SIGUSR1, (__sighandler_t)tickets_log_info); signal(SIGTERM, (__sighandler_t)sig_exit_handler); signal(SIGINT, (__sighandler_t)sig_exit_handler); /* we'll handle errors there and then */ signal(SIGPIPE, SIG_IGN); set_scheduler(); /* we don't want to be killed by the OOM-killer */ if (set_procfs_val("/proc/self/oom_score_adj", "-999")) (void)set_procfs_val("/proc/self/oom_adj", "-16"); set_proc_title("%s %s %s for [%s]:%d", DAEMON_NAME, cl.configfile, type_to_string(local->type), local->addr_string, booth_conf->port); rv = limit_this_process(); if (rv) return rv; if (cl_enable_coredumps(TRUE) < 0){ log_error("enabling core dump failed"); } cl_cdtocoredir(); prctl(PR_SET_DUMPABLE, (unsigned long)TRUE, 0UL, 0UL, 0UL); signal(SIGCHLD, (__sighandler_t)wait_child); rv = loop(lock_fd); return rv; } static int do_client(void) { int rv; rv = setup_config(CLIENT); if (rv < 0) { log_error("cannot read config"); goto out; } switch (cl.op) { case CMD_LIST: case CMD_PEERS: rv = query_get_string_answer(cl.op); break; case CMD_GRANT: case CMD_REVOKE: rv = do_command(cl.op); break; } out: return rv; } static int do_attr(void) { int rv = -1; rv = setup_config(GEOSTORE); if (rv < 0) { log_error("cannot read config"); goto out; } /* We don't check for existence of ticket, so that asking can be * done without local configuration, too. * Although, that means that the UDP port has to be specified, too. */ if (!cl.attr_msg.attr.tkt_id[0]) { /* If the loaded configuration has only a single ticket defined, use that. */ if (booth_conf->ticket_count == 1) { - strcpy(cl.attr_msg.attr.tkt_id, booth_conf->ticket[0].name); + strncpy(cl.attr_msg.attr.tkt_id, booth_conf->ticket[0].name, + sizeof(cl.attr_msg.attr.tkt_id)); } else { rv = 1; log_error("No ticket given."); goto out; } } switch (cl.op) { case ATTR_LIST: case ATTR_GET: rv = query_get_string_answer(cl.op); break; case ATTR_SET: case ATTR_DEL: rv = do_attr_command(cl.op); break; } out: return rv; } int main(int argc, char *argv[], char *envp[]) { int rv; char *cp; init_set_proc_title(argc, argv, envp); get_time(&start_time); memset(&cl, 0, sizeof(cl)); strncpy(cl.configfile, BOOTH_DEFAULT_CONF, BOOTH_PATH_LEN - 1); cl.lockfile[0] = 0; debug_level = 0; cl_log_set_entity( (cp = strstr(argv[0], ATTR_PROG)) && !strcmp(cp, ATTR_PROG) ? ATTR_PROG : "booth" ); cl_log_enable_stderr(TRUE); cl_log_set_facility(0); rv = read_arguments(argc, argv); if (rv < 0) goto out; switch (cl.type) { case STATUS: rv = do_status(cl.type); break; case ARBITRATOR: case DAEMON: case SITE: rv = do_server(cl.type); break; case CLIENT: rv = do_client(); break; case GEOSTORE: rv = do_attr(); break; } out: /* Normalize values. 0x100 would be seen as "OK" by waitpid(). */ return (rv >= 0 && rv < 0x70) ? rv : 1; } diff --git a/src/ticket.c b/src/ticket.c index 9b62b2c..475b4ac 100644 --- a/src/ticket.c +++ b/src/ticket.c @@ -1,1286 +1,1287 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include #include #include #include #include #include #include #include #include #include "b_config.h" #include "ticket.h" #include "config.h" #include "pacemaker.h" #include "inline-fn.h" #include "log.h" #include "booth.h" #include "raft.h" #include "handler.h" #include "request.h" #define TK_LINE 256 extern int TIME_RES; /* Untrusted input, must fit (incl. \0) in a buffer of max chars. */ int check_max_len_valid(const char *s, int max) { int i; for(i=0; iticket_count; i++) { - if (!strcmp(booth_conf->ticket[i].name, ticket)) { + if (!strncmp(booth_conf->ticket[i].name, ticket, + sizeof(booth_conf->ticket[i].name))) { if (found) *found = booth_conf->ticket + i; return 1; } } return 0; } int check_ticket(char *ticket, struct ticket_config **found) { if (found) *found = NULL; if (!booth_conf) return 0; if (!check_max_len_valid(ticket, sizeof(booth_conf->ticket[0].name))) return 0; return find_ticket_by_name(ticket, found); } int check_site(char *site, int *is_local) { struct booth_site *node; if (!check_max_len_valid(site, sizeof(node->addr_string))) return 0; if (find_site_by_name(site, &node, 0)) { *is_local = node->local; return 1; } return 0; } /* is it safe to commit the grant? * if we didn't hear from all sites on the initial grant, we may * need to delay the commit * * TODO: investigate possibility to devise from history whether a * missing site could be holding a ticket or not */ static int ticket_dangerous(struct ticket_config *tk) { int tdiff; /* we may be invoked often, don't spam the log unnecessarily */ static int no_log_delay_msg; if (!is_time_set(&tk->delay_commit)) return 0; if (is_past(&tk->delay_commit) || all_sites_replied(tk)) { if (tk->leader == local) { tk_log_info("%s, committing to CIB", is_past(&tk->delay_commit) ? "ticket delay expired" : "all sites replied"); } time_reset(&tk->delay_commit); no_log_delay_msg = 0; return 0; } tdiff = time_left(&tk->delay_commit); tk_log_debug("delay ticket commit for another " intfmt(tdiff)); if (!no_log_delay_msg) { tk_log_info("delaying ticket commit to CIB for " intfmt(tdiff)); tk_log_info("(or all sites are reached)"); no_log_delay_msg = 1; } return 1; } int ticket_write(struct ticket_config *tk) { if (local->type != SITE) return -EINVAL; if (ticket_dangerous(tk)) return 1; if (tk->leader == local) { if (tk->state != ST_LEADER) { tk_log_info("ticket state not yet consistent, " "delaying ticket grant to CIB"); return 1; } pcmk_handler.grant_ticket(tk); } else { pcmk_handler.revoke_ticket(tk); } tk->update_cib = 0; return 0; } void save_committed_tkt(struct ticket_config *tk) { if (!tk->last_valid_tk) { tk->last_valid_tk = malloc(sizeof(struct ticket_config)); if (!tk->last_valid_tk) { log_error("out of memory"); return; } } memcpy(tk->last_valid_tk, tk, sizeof(struct ticket_config)); } static void ext_prog_failed(struct ticket_config *tk, int start_election) { /* Give it to somebody else. * Just send a VOTE_FOR message, so the * others can start elections. */ if (leader_and_valid(tk)) { save_committed_tkt(tk); reset_ticket(tk); ticket_write(tk); if (start_election) { ticket_broadcast(tk, OP_VOTE_FOR, OP_REQ_VOTE, RLT_SUCCESS, OR_LOCAL_FAIL); } } } #define attr_found(geo_ap, ap) \ ((geo_ap) && !strcmp((geo_ap)->val, (ap)->attr_val)) int check_attr_prereq(struct ticket_config *tk, grant_type_e grant_type) { GList *el; struct attr_prereq *ap; struct geo_attr *geo_ap; for (el = g_list_first(tk->attr_prereqs); el; el = g_list_next(el)) { ap = (struct attr_prereq *)el->data; if (ap->grant_type != grant_type) continue; geo_ap = (struct geo_attr *)g_hash_table_lookup(tk->attr, ap->attr_name); switch(ap->op) { case ATTR_OP_EQ: if (!attr_found(geo_ap, ap)) goto fail; break; case ATTR_OP_NE: if (attr_found(geo_ap, ap)) goto fail; break; default: break; } } return 0; fail: tk_log_warn("'%s' attr-prereq failed", ap->attr_name); return 1; } /* do we need to run the external program? * or we already done that and waiting for the outcome * or program exited and we can collect the status * return codes * 0: no program defined * RUNCMD_MORE: program forked, results later * != 0: executing program failed (or some other failure) */ static int do_ext_prog(struct ticket_config *tk, int start_election) { int rv = 0; if (!tk_test.path) return 0; switch(tk_test.progstate) { case EXTPROG_IDLE: rv = run_handler(tk); if (rv == RUNCMD_ERR) { tk_log_warn("couldn't run external test, not allowed to acquire ticket"); ext_prog_failed(tk, start_election); } break; case EXTPROG_RUNNING: /* should never get here, but just in case */ rv = RUNCMD_MORE; break; case EXTPROG_EXITED: rv = tk_test_exit_status(tk); if (rv) { ext_prog_failed(tk, start_election); } break; case EXTPROG_IGNORE: /* nothing to do here */ break; } return rv; } /* Try to acquire a ticket * Could be manual grant or after start (if the ticket is granted * and still valid in the CIB) * If the external program needs to run, this is run twice, once * to start the program, and then to get the result and start * elections. */ int acquire_ticket(struct ticket_config *tk, cmd_reason_t reason) { int rv; if (reason == OR_ADMIN && check_attr_prereq(tk, GRANT_MANUAL)) return RLT_ATTR_PREREQ; switch(do_ext_prog(tk, 0)) { case 0: /* everything fine */ break; case RUNCMD_MORE: /* need to wait for the outcome before starting elections */ return 0; default: return RLT_EXT_FAILED; } rv = new_election(tk, local, 1, reason); return rv ? RLT_SYNC_FAIL : 0; } /** Try to get the ticket for the local site. * */ int do_grant_ticket(struct ticket_config *tk, int options) { int rv; tk_log_info("granting ticket"); if (tk->leader == local) return RLT_SUCCESS; if (is_owned(tk)) return RLT_OVERGRANT; set_future_time(&tk->delay_commit, tk->term_duration + tk->acquire_after); if (options & OPT_IMMEDIATE) { tk_log_warn("granting ticket immediately! If there are " "unreachable sites, _hope_ you are sure that they don't " "have the ticket!"); time_reset(&tk->delay_commit); } rv = acquire_ticket(tk, OR_ADMIN); if (rv) { time_reset(&tk->delay_commit); return rv; } else { return RLT_MORE; } } static void start_revoke_ticket(struct ticket_config *tk) { tk_log_info("revoking ticket"); save_committed_tkt(tk); reset_ticket(tk); set_leader(tk, no_leader); ticket_write(tk); ticket_broadcast(tk, OP_REVOKE, OP_ACK, RLT_SUCCESS, OR_ADMIN); } /** Ticket revoke. * Only to be started from the leader. */ int do_revoke_ticket(struct ticket_config *tk) { if (tk->acks_expected) { tk_log_info("delay ticket revoke until the current operation finishes"); set_next_state(tk, ST_INIT); return RLT_MORE; } else { start_revoke_ticket(tk); return RLT_SUCCESS; } } int list_ticket(char **pdata, unsigned int *len) { struct ticket_config *tk; char timeout_str[64]; char pending_str[64]; char *data, *cp; int i, alloc; time_t ts; *pdata = NULL; *len = 0; alloc = 256 + booth_conf->ticket_count * (BOOTH_NAME_LEN * 2 + 128); data = malloc(alloc); if (!data) return -ENOMEM; cp = data; foreach_ticket(i, tk) { if (is_time_set(&tk->term_expires)) { ts = wall_ts(&tk->term_expires); strftime(timeout_str, sizeof(timeout_str), "%F %T", localtime(&ts)); } else strcpy(timeout_str, "INF"); if (tk->leader == local && is_time_set(&tk->delay_commit) && !is_past(&tk->delay_commit)) { ts = wall_ts(&tk->delay_commit); strcpy(pending_str, " (commit pending until "); strftime(pending_str + strlen(" (commit pending until "), sizeof(pending_str) - strlen(" (commit pending until ") - 1, "%F %T", localtime(&ts)); strcat(pending_str, ")"); } else *pending_str = '\0'; cp += snprintf(cp, alloc - (cp - data), "ticket: %s, leader: %s", tk->name, ticket_leader_string(tk)); if (is_owned(tk)) { cp += snprintf(cp, alloc - (cp - data), ", expires: %s%s\n", timeout_str, pending_str); } else { cp += snprintf(cp, alloc - (cp - data), "\n"); } if (alloc - (cp - data) <= 0) return -ENOMEM; } *pdata = data; *len = cp - data; return 0; } void disown_ticket(struct ticket_config *tk) { set_leader(tk, NULL); tk->is_granted = 0; get_time(&tk->term_expires); } int disown_if_expired(struct ticket_config *tk) { if (is_past(&tk->term_expires) || !tk->leader) { disown_ticket(tk); return 1; } return 0; } void reset_ticket(struct ticket_config *tk) { ignore_ext_test(tk); disown_ticket(tk); no_resends(tk); set_state(tk, ST_INIT); tk->voted_for = NULL; } static void log_reacquire_reason(struct ticket_config *tk) { int valid; const char *where_granted = "\0"; char buff[64]; valid = is_time_set(&tk->term_expires) && !is_past(&tk->term_expires); if (tk->leader == local) { where_granted = "granted here"; } else { snprintf(buff, sizeof(buff), "granted to %s", site_string(tk->leader)); where_granted = buff; } if (!valid) { tk_log_warn("%s, but not valid " "anymore (will try to reacquire)", where_granted); } if (tk->is_granted && tk->leader != local) { if (tk->leader && tk->leader != no_leader) { tk_log_error("granted here, but also %s, " "that's really too bad (will try to reacquire)", where_granted); } else { tk_log_warn("granted here, but we're " "not recorded as the grantee (will try to reacquire)"); } } } void update_ticket_state(struct ticket_config *tk, struct booth_site *sender) { if (tk->state == ST_CANDIDATE) { tk_log_info("learned from %s about " "newer ticket, stopping elections", site_string(sender)); /* there could be rejects coming from others; don't log * warnings unnecessarily */ tk->expect_more_rejects = 1; } if (tk->leader == local || tk->is_granted) { /* message from a live leader with valid ticket? */ if (sender == tk->leader && term_time_left(tk)) { if (tk->is_granted) { tk_log_warn("ticket was granted here, " "but it's live at %s (revoking here)", site_string(sender)); } else { tk_log_info("ticket live at %s", site_string(sender)); } disown_ticket(tk); ticket_write(tk); set_state(tk, ST_FOLLOWER); set_next_state(tk, ST_FOLLOWER); } else { if (tk->state == ST_CANDIDATE) { set_state(tk, ST_FOLLOWER); } set_next_state(tk, ST_LEADER); } } else { if (!tk->leader || tk->leader == no_leader) { if (sender) tk_log_info("ticket is not granted"); else tk_log_info("ticket is not granted (from CIB)"); set_state(tk, ST_INIT); } else { if (sender) tk_log_info("ticket granted to %s (says %s)", site_string(tk->leader), tk->leader == sender ? "they" : site_string(sender)); else tk_log_info("ticket granted to %s (from CIB)", site_string(tk->leader)); set_state(tk, ST_FOLLOWER); /* just make sure that we check the ticket soon */ set_next_state(tk, ST_FOLLOWER); } } } int setup_ticket(void) { struct ticket_config *tk; int i; foreach_ticket(i, tk) { reset_ticket(tk); if (local->type == SITE) { if (!pcmk_handler.load_ticket(tk)) { update_ticket_state(tk, NULL); } tk->update_cib = 1; } tk_log_info("broadcasting state query"); /* wait until all send their status (or the first * timeout) */ tk->start_postpone = 1; ticket_broadcast(tk, OP_STATUS, OP_MY_INDEX, RLT_SUCCESS, 0); } return 0; } int ticket_answer_list(int fd) { char *data; int olen, rv; struct boothc_hdr_msg hdr; rv = list_ticket(&data, &olen); if (rv < 0) goto out; init_header(&hdr.header, CL_LIST, 0, 0, RLT_SUCCESS, 0, sizeof(hdr) + olen); rv = send_header_plus(fd, &hdr, data, olen); out: if (data) free(data); return rv; } int process_client_request(struct client *req_client, void *buf) { int rv, rc = 1; struct ticket_config *tk; int cmd; struct boothc_ticket_msg omsg; struct boothc_ticket_msg *msg; msg = (struct boothc_ticket_msg *)buf; cmd = ntohl(msg->header.cmd); if (!check_ticket(msg->ticket.id, &tk)) { log_warn("client referenced unknown ticket %s", msg->ticket.id); rv = RLT_INVALID_ARG; goto reply_now; } if ((cmd == CMD_GRANT) && is_owned(tk)) { log_warn("client wants to grant an (already granted!) ticket %s", msg->ticket.id); rv = RLT_OVERGRANT; goto reply_now; } if ((cmd == CMD_REVOKE) && !is_owned(tk)) { log_info("client wants to revoke a free ticket %s", msg->ticket.id); rv = RLT_TICKET_IDLE; goto reply_now; } if ((cmd == CMD_REVOKE) && tk->leader != local) { tk_log_info("not granted here, redirect to %s", ticket_leader_string(tk)); rv = RLT_REDIRECT; goto reply_now; } if (cmd == CMD_REVOKE) rv = do_revoke_ticket(tk); else rv = do_grant_ticket(tk, ntohl(msg->header.options)); if (rv == RLT_MORE) { /* client may receive further notifications, save the * request for further processing */ add_req(tk, req_client, msg); tk_log_debug("queue request %s for client %d", state_to_string(cmd), req_client->fd); rc = 0; /* we're not yet done with the message */ } reply_now: init_ticket_msg(&omsg, CL_RESULT, 0, rv, 0, tk); send_client_msg(req_client->fd, &omsg); return rc; } int notify_client(struct ticket_config *tk, int client_fd, struct boothc_ticket_msg *msg) { struct boothc_ticket_msg omsg; void (*deadfn) (int ci); int rv, rc, ci; int cmd, options; struct client *req_client; cmd = ntohl(msg->header.cmd); options = ntohl(msg->header.options); rv = tk->outcome; ci = find_client_by_fd(client_fd); if (ci < 0) { tk_log_info("client %d (request %s) left before being notified", client_fd, state_to_string(cmd)); return 0; } tk_log_debug("notifying client %d (request %s)", client_fd, state_to_string(cmd)); init_ticket_msg(&omsg, CL_RESULT, 0, rv, 0, tk); rc = send_client_msg(client_fd, &omsg); if (rc == 0 && ((rv == RLT_MORE) || (rv == RLT_CIB_PENDING && (options & OPT_WAIT_COMMIT)))) { /* more to do here, keep the request */ return 1; } else { /* we sent a definite answer or there was a write error, drop * the client */ if (rc) { tk_log_debug("failed to notify client %d (request %s)", client_fd, state_to_string(cmd)); } else { tk_log_debug("client %d (request %s) got final notification", client_fd, state_to_string(cmd)); } req_client = clients + ci; deadfn = req_client->deadfn; if(deadfn) { deadfn(ci); } return 0; /* we're done with this request */ } } int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd, cmd_request_t expected_reply, cmd_result_t res, cmd_reason_t reason) { struct boothc_ticket_msg msg; init_ticket_msg(&msg, cmd, 0, res, reason, tk); tk_log_debug("broadcasting '%s' (term=%d, valid=%d)", state_to_string(cmd), ntohl(msg.ticket.term), msg_term_time(&msg)); tk->last_request = cmd; if (expected_reply) { expect_replies(tk, expected_reply); } ticket_activate_timeout(tk); return transport()->broadcast_auth(&msg, sendmsglen(&msg)); } /* update the ticket on the leader, write it to the CIB, and send out the update message to others with the new expiry time */ int leader_update_ticket(struct ticket_config *tk) { int rv = 0, rv2; timetype now; if (tk->ticket_updated >= 2) return 0; if (tk->ticket_updated < 1) { tk->ticket_updated = 1; get_time(&now); copy_time(&now, &tk->last_renewal); set_future_time(&tk->term_expires, tk->term_duration); rv = ticket_broadcast(tk, OP_UPDATE, OP_ACK, RLT_SUCCESS, 0); } if (tk->ticket_updated < 2) { rv2 = ticket_write(tk); switch(rv2) { case 0: tk->ticket_updated = 2; tk->outcome = RLT_SUCCESS; foreach_tkt_req(tk, notify_client); break; case 1: if (tk->outcome != RLT_CIB_PENDING) { tk->outcome = RLT_CIB_PENDING; foreach_tkt_req(tk, notify_client); } break; default: break; } } return rv; } static void log_lost_servers(struct ticket_config *tk) { struct booth_site *n; int i; if (tk->retry_number > 1) /* log those that we couldn't reach, but do * that only on the first retry */ return; for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if (!(tk->acks_received & n->bitmask)) { tk_log_warn("%s %s didn't acknowledge our %s, " "will retry %d times", (n->type == ARBITRATOR ? "arbitrator" : "site"), site_string(n), state_to_string(tk->last_request), tk->retries); } } } static void resend_msg(struct ticket_config *tk) { struct booth_site *n; int i; if (!(tk->acks_received ^ local->bitmask)) { ticket_broadcast(tk, tk->last_request, 0, RLT_SUCCESS, 0); } else { for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if (!(tk->acks_received & n->bitmask)) { n->resend_cnt++; tk_log_debug("resending %s to %s", state_to_string(tk->last_request), site_string(n) ); send_msg(tk->last_request, tk, n, NULL); } } ticket_activate_timeout(tk); } } static void handle_resends(struct ticket_config *tk) { int ack_cnt; if (++tk->retry_number > tk->retries) { tk_log_info("giving up on sending retries"); no_resends(tk); set_ticket_wakeup(tk); return; } /* try to reach some sites again if we just stepped down */ if (tk->last_request == OP_VOTE_FOR) { tk_log_warn("no answers to our VtFr request to step down (try #%d), " "we are alone", tk->retry_number); goto just_resend; } if (!majority_of_bits(tk, tk->acks_received)) { ack_cnt = count_bits(tk->acks_received) - 1; if (!ack_cnt) { tk_log_warn("no answers to our request (try #%d), " "we are alone", tk->retry_number); } else { tk_log_warn("not enough answers to our request (try #%d): " "only got %d answers", tk->retry_number, ack_cnt); } } else { log_lost_servers(tk); } just_resend: resend_msg(tk); } int postpone_ticket_processing(struct ticket_config *tk) { extern timetype start_time; return tk->start_postpone && (-time_left(&start_time) < tk->timeout); } #define has_extprog_exited(tk) ((tk)->clu_test.progstate == EXTPROG_EXITED) static void process_next_state(struct ticket_config *tk) { int rv; switch(tk->next_state) { case ST_LEADER: if (has_extprog_exited(tk)) { if (tk->state != ST_LEADER) { rv = acquire_ticket(tk, OR_ADMIN); if (rv != 0) { /* external program failed */ tk->outcome = rv; foreach_tkt_req(tk, notify_client); } } } else { log_reacquire_reason(tk); acquire_ticket(tk, OR_REACQUIRE); } break; case ST_INIT: no_resends(tk); start_revoke_ticket(tk); tk->outcome = RLT_SUCCESS; foreach_tkt_req(tk, notify_client); break; /* wanting to be follower is not much of an ambition; no * processing, just return; don't reset start_postpone until * we got some replies to status */ case ST_FOLLOWER: return; default: break; } tk->start_postpone = 0; } static void ticket_lost(struct ticket_config *tk) { int reason = OR_TKT_LOST; if (tk->leader != local) { tk_log_warn("lost at %s", site_string(tk->leader)); } else { if (is_ext_prog_running(tk)) { ext_prog_timeout(tk); reason = OR_LOCAL_FAIL; } else { tk_log_warn("lost majority (revoking locally)"); reason = tk->election_reason ? tk->election_reason : OR_REACQUIRE; } } tk->lost_leader = tk->leader; save_committed_tkt(tk); reset_ticket(tk); set_state(tk, ST_FOLLOWER); if (local->type == SITE) { ticket_write(tk); schedule_election(tk, reason); } } static void next_action(struct ticket_config *tk) { int rv; switch(tk->state) { case ST_INIT: /* init state, handle resends for ticket revoke */ /* and rebroadcast if stepping down */ /* try to acquire ticket on grant */ if (has_extprog_exited(tk)) { rv = acquire_ticket(tk, OR_ADMIN); if (rv != 0) { /* external program failed */ tk->outcome = rv; foreach_tkt_req(tk, notify_client); } } else { if (tk->acks_expected) { handle_resends(tk); } } break; case ST_FOLLOWER: /* leader/ticket lost? and we didn't vote yet */ tk_log_debug("leader: %s, voted_for: %s", site_string(tk->leader), site_string(tk->voted_for)); if (!tk->leader) { if (!tk->voted_for || !tk->in_election) { disown_ticket(tk); if (!new_election(tk, NULL, 1, OR_AGAIN)) { ticket_activate_timeout(tk); } } else { /* we should restart elections in case nothing * happens in the meantime */ tk->in_election = 0; ticket_activate_timeout(tk); } } break; case ST_CANDIDATE: /* elections timed out? */ elections_end(tk); break; case ST_LEADER: /* timeout or ticket renewal? */ if (tk->acks_expected) { handle_resends(tk); if (majority_of_bits(tk, tk->acks_received)) { leader_update_ticket(tk); } } else { /* this is ticket renewal, run local test */ if (!do_ext_prog(tk, 1)) { ticket_broadcast(tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0); tk->ticket_updated = 0; } } break; default: break; } } static void ticket_cron(struct ticket_config *tk) { /* don't process the tickets too early after start */ if (postpone_ticket_processing(tk)) { tk_log_debug("ticket processing postponed (start_postpone=%d)", tk->start_postpone); /* but run again soon */ ticket_activate_timeout(tk); return; } /* no need for status resends, we hope we got at least one * my_index back */ if (tk->acks_expected == OP_MY_INDEX) { no_resends(tk); } /* after startup, we need to decide what to do based on the * current ticket state; tk->next_state has a hint * also used for revokes which had to be delayed */ if (tk->next_state) { process_next_state(tk); goto out; } /* Has an owner, has an expiry date, and expiry date in the past? * Losing the ticket must happen in _every_ state. */ if (is_owned(tk) && is_time_set(&tk->term_expires) && is_past(&tk->term_expires)) { ticket_lost(tk); goto out; } next_action(tk); out: tk->next_state = 0; if (!tk->in_election && tk->update_cib) ticket_write(tk); } void process_tickets(void) { struct ticket_config *tk; int i; timetype last_cron; foreach_ticket(i, tk) { if (!has_extprog_exited(tk) && is_time_set(&tk->next_cron) && !is_past(&tk->next_cron)) continue; tk_log_debug("ticket cron"); copy_time(&tk->next_cron, &last_cron); ticket_cron(tk); if (time_cmp(&last_cron, &tk->next_cron, ==)) { tk_log_debug("nobody set ticket wakeup"); set_ticket_wakeup(tk); } } } void tickets_log_info(void) { struct ticket_config *tk; int i; time_t ts; foreach_ticket(i, tk) { ts = wall_ts(&tk->term_expires); tk_log_info("state '%s' " "term %d " "leader %s " "expires %-24.24s", state_to_string(tk->state), tk->current_term, ticket_leader_string(tk), ctime(&ts)); } } static void update_acks( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t cmd; uint32_t req; cmd = ntohl(msg->header.cmd); req = ntohl(msg->header.request); if (req != tk->last_request || (tk->acks_expected != cmd && tk->acks_expected != OP_REJECTED)) return; /* got an ack! */ tk->acks_received |= sender->bitmask; if (all_replied(tk) || /* we just stepped down, need only one site to start * elections */ (cmd == OP_REQ_VOTE && tk->last_request == OP_VOTE_FOR)) { no_resends(tk); tk->start_postpone = 0; set_ticket_wakeup(tk); } } /* read ticket message */ int ticket_recv(void *buf, struct booth_site *source) { struct boothc_ticket_msg *msg; struct ticket_config *tk; struct booth_site *leader; uint32_t leader_u; msg = (struct boothc_ticket_msg *)buf; if (!check_ticket(msg->ticket.id, &tk)) { log_warn("got invalid ticket name %s from %s", msg->ticket.id, site_string(source)); source->invalid_cnt++; return -EINVAL; } leader_u = ntohl(msg->ticket.leader); if (!find_site_by_id(leader_u, &leader)) { tk_log_error("message with unknown leader %u received", leader_u); source->invalid_cnt++; return -EINVAL; } update_acks(tk, source, leader, msg); return raft_answer(tk, source, leader, msg); } static void log_next_wakeup(struct ticket_config *tk) { int left; left = time_left(&tk->next_cron); tk_log_debug("set ticket wakeup in " intfmt(left)); } /* New vote round; ยง5.2 */ /* delay the next election start for some random time * (up to 1 second) */ void add_random_delay(struct ticket_config *tk) { timetype tv; interval_add(&tk->next_cron, rand_time(min(1000, tk->timeout)), &tv); ticket_next_cron_at(tk, &tv); if (ANYDEBUG) { log_next_wakeup(tk); } } void set_ticket_wakeup(struct ticket_config *tk) { timetype near_future, tv, next_vote; /* At least every hour, perhaps sooner (default) */ ticket_next_cron_in(tk, 3600*TIME_RES); set_future_time(&near_future, 10); switch (tk->state) { case ST_LEADER: assert(tk->leader == local); get_next_election_time(tk, &next_vote); /* If timestamp is in the past, wakeup in * near future */ if (!is_time_set(&next_vote)) { tk_log_debug("next ts unset, wakeup soon"); ticket_next_cron_at(tk, &near_future); } else if (is_past(&next_vote)) { int tdiff = time_left(&next_vote); tk_log_debug("next ts in the past " intfmt(tdiff)); ticket_next_cron_at(tk, &near_future); } else { ticket_next_cron_at(tk, &next_vote); } break; case ST_CANDIDATE: assert(is_time_set(&tk->election_end)); ticket_next_cron_at(tk, &tk->election_end); break; case ST_INIT: case ST_FOLLOWER: /* If there is (or should be) some owner, check on it later on. * If no one is interested - don't care. */ if (is_owned(tk)) { interval_add(&tk->term_expires, tk->acquire_after, &tv); ticket_next_cron_at(tk, &tv); } break; default: tk_log_error("unknown ticket state: %d", tk->state); } if (tk->next_state) { /* we need to do something soon here */ if (!tk->acks_expected) { ticket_next_cron_at(tk, &near_future); } else { ticket_activate_timeout(tk); } } if (ANYDEBUG) { log_next_wakeup(tk); } } void schedule_election(struct ticket_config *tk, cmd_reason_t reason) { if (local->type != SITE) return; tk->election_reason = reason; get_time(&tk->next_cron); /* introduce a short delay before starting election */ add_random_delay(tk); } /* Given a state (in host byte order), return a human-readable (char*). * An array is used so that multiple states can be printed in a single printf(). */ char *state_to_string(uint32_t state_ho) { union mu { cmd_request_t s; char c[5]; }; static union mu cache[6] = { { 0 } }, *cur; static int current = 0; current ++; if (current >= sizeof(cache)/sizeof(cache[0])) current = 0; cur = cache + current; cur->s = htonl(state_ho); /* Shouldn't be necessary, union array is initialized with zeroes, and * these bytes never get written. */ cur->c[4] = 0; return cur->c; } int send_reject(struct booth_site *dest, struct ticket_config *tk, cmd_result_t code, struct boothc_ticket_msg *in_msg) { int req = ntohl(in_msg->header.cmd); struct boothc_ticket_msg msg; tk_log_debug("sending reject to %s", site_string(dest)); init_ticket_msg(&msg, OP_REJECTED, req, code, 0, tk); return booth_udp_send_auth(dest, &msg, sendmsglen(&msg)); } int send_msg ( int cmd, struct ticket_config *tk, struct booth_site *dest, struct boothc_ticket_msg *in_msg ) { int req = 0; struct ticket_config *valid_tk = tk; struct boothc_ticket_msg msg; /* if we want to send the last valid ticket, then if we're in * the ST_CANDIDATE state, the last valid ticket is in * tk->last_valid_tk */ if (cmd == OP_MY_INDEX) { if (tk->state == ST_CANDIDATE && tk->last_valid_tk) { valid_tk = tk->last_valid_tk; } tk_log_info("sending status to %s", site_string(dest)); } if (in_msg) req = ntohl(in_msg->header.cmd); init_ticket_msg(&msg, cmd, req, RLT_SUCCESS, 0, valid_tk); return booth_udp_send_auth(dest, &msg, sendmsglen(&msg)); }