diff --git a/src/booth.h b/src/booth.h index 0bb3f7c..26f6339 100644 --- a/src/booth.h +++ b/src/booth.h @@ -1,236 +1,254 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _BOOTH_H #define _BOOTH_H #include #include #include #include #include #define BOOTH_RUN_DIR "/var/run/booth/" #define BOOTH_LOG_DIR "/var/log" #define BOOTH_LOGFILE_NAME "booth.log" #define BOOTH_DEFAULT_CONF_DIR "/etc/booth/" #define BOOTH_DEFAULT_CONF_NAME "booth" #define BOOTH_DEFAULT_CONF_EXT ".conf" #define BOOTH_DEFAULT_CONF \ BOOTH_DEFAULT_CONF_DIR BOOTH_DEFAULT_CONF_NAME BOOTH_DEFAULT_CONF_EXT #define DAEMON_NAME "boothd" #define BOOTH_PATH_LEN 127 #define BOOTH_DEFAULT_PORT 9929 /* TODO: remove */ #define BOOTH_PROTO_FAMILY AF_INET #define BOOTHC_MAGIC 0x5F1BA08C #define BOOTHC_VERSION 0x00010002 /** Timeout value for poll(). * Determines frequency of periodic jobs, eg. when send-retries are done. * See process_tickets(). */ #define POLL_TIMEOUT 1000 /** @{ */ /** The on-network data structures and constants. */ #define BOOTH_NAME_LEN 64 /* NONE wouldn't be specific enough. 
/**
 * Pack four characters into one 32-bit constant, first character in the
 * most significant byte.  Used to build human-readable protocol codes.
 *
 * Every argument and the whole expansion are parenthesised so that
 * arbitrary expressions (e.g. a conditional) can be passed safely.
 */
#define CHAR2CONST(a,b,c,d) \
	((((a)) << 24) | (((b)) << 16) | (((c)) << 8) | ((d)))

/* NOTE(review): this macro is not used by any code visible here, and it
 * cannot work as written - the "return" inside the statement expression
 * would return from the *calling* function, with a pointer to a dead
 * local.  Left untouched until its intended semantics are confirmed. */
#define STG2CONST(X) ({ const char _ggg[4] = X; return (uint32_t*)_ggg; })


/** Command / protocol-state codes carried in boothc_header.cmd. */
typedef enum {
	/* 0x43 = "C"ommands */
	CMD_LIST    = CHAR2CONST('C', 'L', 's', 't'),
	CMD_GRANT   = CHAR2CONST('C', 'G', 'n', 't'),
	CMD_REVOKE  = CHAR2CONST('C', 'R', 'v', 'k'),

	/* Replies */
	CMR_GENERAL = CHAR2CONST('G', 'n', 'l', 'R'), // Increase distance to CMR_GRANT
	CMR_LIST    = CHAR2CONST('R', 'L', 's', 't'),
	CMR_GRANT   = CHAR2CONST('R', 'G', 'n', 't'),
	CMR_REVOKE  = CHAR2CONST('R', 'R', 'v', 'k'),

	/* Raft */
	OP_REQ_VOTE = CHAR2CONST('R', 'V', 'o', 't'),
	OP_VOTE_FOR = CHAR2CONST('V', 't', 'F', 'r'),
	OP_HEARTBEAT= CHAR2CONST('H', 'r', 't', 'B'), /* AppendEntry in Raft */
	OP_MY_INDEX = CHAR2CONST('M', 'I', 'd', 'x'), /* Answer to Heartbeat */
	OP_REJECTED = CHAR2CONST('R', 'J', 'C', '!'),
} cmd_request_t;


/* TODO: make readable constants */
typedef enum { /* for compatibility with other functions */ RLT_SUCCESS = 0, RLT_ASYNC = CHAR2CONST('A', 's', 'y', 'n'), RLT_SYNC_SUCC = CHAR2CONST('S', 'c', 'c', 's'), RLT_SYNC_FAIL = CHAR2CONST('F', 'a', 'i', 'l'), RLT_INVALID_ARG = CHAR2CONST('I', 'A', 'r', 'g'), RLT_OVERGRANT = CHAR2CONST('O', 'v', 'e', 'r'), RLT_PROBABLY_SUCCESS = CHAR2CONST('S', 'u', 'c', '?'), RLT_BUSY = CHAR2CONST('B', 'u', 's', 'y'), - RLT_TERM_OUTDATED = CHAR2CONST('t', 'O', 'd', 'a'), + RLT_TERM_OUTDATED = CHAR2CONST('T', 'O', 'd', 't'), + RLT_TERM_STILL_VALID = CHAR2CONST('T', 'V', 'l', 'd'), } cmd_result_t; /** @} */ /** @{ */ struct booth_site { /** Calculated ID. See add_site(). */ int site_id; int type; int local; /** Roles, like ACCEPTOR, PROPOSER, or LEARNER. Not really used ATM. */ int role; char addr_string[BOOTH_NAME_LEN]; int tcp_fd; int udp_fd; /* 0-based, used for indexing into per-ticket weights */ int index; uint64_t bitmask; unsigned short family; union { struct sockaddr_in sa4; struct sockaddr_in6 sa6; }; int saddrlen; int addrlen; } __attribute__((packed)); extern struct booth_site *local; /** @} */ struct booth_transport; struct client { int fd; const struct booth_transport *transport; void (*workfn)(int); void (*deadfn)(int); }; extern struct client *clients; extern struct pollfd *pollfds; int client_add(int fd, const struct booth_transport *tpt, void (*workfn)(int ci), void (*deadfn)(int ci)); int do_read(int fd, void *buf, size_t count); int do_write(int fd, void *buf, size_t count); void process_connection(int ci); void safe_copy(char *dest, char *value, size_t buflen, const char *description); struct command_line { int type; /* ACT_ */ int op; /* OP_ */ char configfile[BOOTH_PATH_LEN]; char lockfile[BOOTH_PATH_LEN]; char site[BOOTH_NAME_LEN]; struct boothc_ticket_msg msg; }; extern struct command_line cl; + + + +/* http://gcc.gnu.org/onlinedocs/gcc/Typeof.html */ +#define min(a__,b__) \ + ({ typeof (a__) _a = (a__); \ + typeof (b__) _b = (b__); \ + _a < _b ? 
_a : _b; }) +#define max(a__,b__) \ + ({ typeof (a__) _a = (a__); \ + typeof (b__) _b = (b__); \ + _a > _b ? _a : _b; }) + + + + + #endif /* _BOOTH_H */ diff --git a/src/config.c b/src/config.c index a88df22..1210ce0 100644 --- a/src/config.c +++ b/src/config.c @@ -1,711 +1,710 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include "booth.h" #include "config.h" #include "raft.h" #include "ticket.h" #include "log.h" static int ticket_size = 0; static int ticket_realloc(void) { const int added = 5; int had, want; void *p; had = booth_conf->ticket_allocated; want = had + added; p = realloc(booth_conf->ticket, sizeof(struct ticket_config) * want); if (!booth_conf) { log_error("can't alloc more tickets"); return -ENOMEM; } booth_conf->ticket = p; memset(booth_conf->ticket + had, 0, sizeof(struct ticket_config) * added); booth_conf->ticket_allocated = want; return 0; } int add_site(char *address, int type); int add_site(char *addr_string, int type) { int rv; struct booth_site *site; uLong nid; uint32_t mask; rv = 1; if (booth_conf->site_count == MAX_NODES) { log_error("too many nodes"); goto out; } if (strlen(addr_string)+1 >= 
sizeof(booth_conf->site[0].addr_string)) { log_error("site address \"%s\" too long", addr_string); goto out; } site = booth_conf->site + booth_conf->site_count; site->family = BOOTH_PROTO_FAMILY; site->type = type; /* Make site_id start at a non-zero point. * Perhaps use hash over string or address? */ strcpy(site->addr_string, addr_string); nid = crc32(0L, NULL, 0); /* booth_config() uses memset(), so sizeof() is guaranteed to give * the same result everywhere - no uninitialized bytes. */ site->site_id = crc32(nid, site->addr_string, sizeof(site->addr_string)); /* Make sure we will never collide with NO_ONE, * or be negative (to get "get_local_id() < 0" working). */ mask = 1 << (sizeof(site->site_id)*8 -1); assert(NO_ONE & mask); site->site_id &= ~mask; site->index = booth_conf->site_count; site->bitmask = 1 << booth_conf->site_count; /* Catch site overflow */ assert(site->bitmask); booth_conf->site_bits |= site->bitmask; site->tcp_fd = -1; booth_conf->site_count++; rv = 0; memset(&site->sa6, 0, sizeof(site->sa6)); if (inet_pton(AF_INET, site->addr_string, &site->sa4.sin_addr) > 0) { site->family = AF_INET; site->sa4.sin_family = site->family; site->sa4.sin_port = htons(booth_conf->port); site->saddrlen = sizeof(site->sa4); site->addrlen = sizeof(site->sa4.sin_addr); } else if (inet_pton(AF_INET6, site->addr_string, &site->sa6.sin6_addr) > 0) { site->family = AF_INET6; site->sa6.sin6_family = site->family; site->sa6.sin6_flowinfo = 0; site->sa6.sin6_port = htons(booth_conf->port); site->saddrlen = sizeof(site->sa6); site->addrlen = sizeof(site->sa6.sin6_addr); } else { log_error("Address string \"%s\" is bad", site->addr_string); rv = EINVAL; } out: return rv; } inline static char *skip_while_in(const char *cp, int (*fn)(int), const char *allowed) { /* strchr() returns a pointer to the terminator if *cp == 0. 
/**
 * Advance over characters that either satisfy the ctype-style predicate
 * fn or are listed in allowed.
 *
 * @return pointer to the first character that matches neither, or to the
 *         terminating NUL.  The "const" of the input is dropped.
 */
inline static char *skip_while_in(const char *cp, int (*fn)(int), const char *allowed)
{
	/* strchr() returns a pointer to the terminator if *cp == 0,
	 * hence the explicit end-of-string test.
	 * ctype predicates need an unsigned char value; a negative plain
	 * char is undefined behaviour. */
	while (*cp &&
			(fn((unsigned char)*cp) ||
			 strchr(allowed, *cp)))
		cp++;
	/* discard "const" qualifier */
	return (char*)cp;
}


/**
 * Advance over characters that satisfy the ctype-style predicate fn.
 * The predicate is expected to be false for NUL, which ends the scan.
 */
inline static char *skip_while(char *cp, int (*fn)(int))
{
	while (fn((unsigned char)*cp))
		cp++;
	return cp;
}


/** Advance to the first occurrence of expected, or to the terminating NUL. */
inline static char *skip_until(char *cp, char expected)
{
	while (*cp && *cp != expected)
		cp++;
	return cp;
}


/** True if cp points at a newline, the end of the string, or a '#' comment. */
static inline int is_end_of_line(char *cp)
{
	char c = *cp;
	return c == '\n' || c == 0 || c == '#';
}
*/ booth_conf->uid = -2; booth_conf->gid = -2; strcpy(booth_conf->site_user, "hacluster"); strcpy(booth_conf->site_group, "haclient"); strcpy(booth_conf->arb_user, "nobody"); strcpy(booth_conf->arb_group, "nobody"); parse_weights("", defaults.weight); defaults.ext_verifier = NULL; defaults.term_duration = DEFAULT_TICKET_EXPIRY; defaults.timeout = DEFAULT_TICKET_TIMEOUT; defaults.retries = DEFAULT_RETRIES; defaults.acquire_after = 0; error = ""; log_debug("reading config file %s", path); while (fgets(line, sizeof(line), fp)) { lineno++; s = skip_while(line, isspace); if (is_end_of_line(s)) continue; key = s; /* Key */ end_of_key = skip_while_in(key, isalnum, "-_"); if (end_of_key == key) { error = "No key"; goto err; } if (!*end_of_key) goto exp_equal; /* whitespace, and something else but nothing more? */ s = skip_while(end_of_key, isspace); if (*s != '=') { exp_equal: error = "Expected '=' after key"; goto err; } s++; /* It's my buffer, and I terminate if I want to. */ /* But not earlier than that, because we had to check for = */ *end_of_key = 0; /* Value tokenizing */ s = skip_while(s, isspace); switch (*s) { case '"': case '\'': val = s+1; s = skip_until(val, *s); /* Terminate value */ if (!*s) { error = "Unterminated quoted string"; goto err; } /* Remove and skip quote */ *s = 0; s++; if (* skip_while(s, isspace)) { error = "Surplus data after value"; goto err; } *s = 0; break; case 0: no_value: error = "No value"; goto err; break; default: val = s; /* Rest of line. */ i = strlen(s); /* i > 0 because of "case 0" above. 
*/ while (i > 0 && isspace(s[i-1])) i--; s += i; *s = 0; } if (val == s) goto no_value; if (strlen(key) > BOOTH_NAME_LEN || strlen(val) > BOOTH_NAME_LEN) { error = "key/value too long"; goto err; } if (strcmp(key, "transport") == 0) { if (got_transport) { error = "config file has multiple transport lines"; goto err; } if (strcasecmp(val, "UDP") == 0) booth_conf->proto = UDP; else if (strcasecmp(val, "SCTP") == 0) booth_conf->proto = SCTP; else { error = "invalid transport protocol"; goto err; } got_transport = 1; continue; } if (strcmp(key, "port") == 0) { booth_conf->port = atoi(val); continue; } if (strcmp(key, "name") == 0) { safe_copy(booth_conf->name, val, BOOTH_NAME_LEN, "name"); continue; } if (strcmp(key, "site") == 0) { if (add_site(val, SITE)) goto out; continue; } if (strcmp(key, "arbitrator") == 0) { if (add_site(val, ARBITRATOR)) goto out; continue; } if (strcmp(key, "ticket") == 0) { if (add_ticket(val, &last_ticket, &defaults)) goto out; /* last_ticket is valid until another one is needed - * and then it already has the new address and * is valid again. 
*/ continue; } if (strcmp(key, "expire") == 0) { defaults.term_duration = strtol(val, &s, 0); if (*s || s == val || defaults.term_duration<10) { error = "Expected plain integer value >=10 for expire"; goto err; } if (last_ticket) last_ticket->term_duration = defaults.term_duration; continue; } if (strcmp(key, "site-user") == 0) { safe_copy(booth_conf->site_user, optarg, BOOTH_NAME_LEN, "site-user"); continue; } if (strcmp(key, "site-group") == 0) { safe_copy(booth_conf->site_group, optarg, BOOTH_NAME_LEN, "site-group"); continue; } if (strcmp(key, "arbitrator-user") == 0) { safe_copy(booth_conf->arb_user, optarg, BOOTH_NAME_LEN, "arbitrator-user"); continue; } if (strcmp(key, "arbitrator-group") == 0) { safe_copy(booth_conf->arb_group, optarg, BOOTH_NAME_LEN, "arbitrator-group"); continue; } if (strcmp(key, "timeout") == 0) { defaults.timeout = strtol(val, &s, 0); if (*s || s == val || defaults.timeout<1) { error = "Expected plain integer value >=1 for timeout"; goto err; } if (last_ticket) last_ticket->timeout = defaults.timeout; continue; } if (strcmp(key, "retries") == 0) { defaults.retries = strtol(val, &s, 0); if (*s || s == val || defaults.retries<3 || defaults.retries > 100) { error = "Expected plain integer value in the range [3, 100] for retries"; goto err; } if (last_ticket) last_ticket->retries = defaults.retries; continue; } if (strcmp(key, "acquire-after") == 0) { defaults.acquire_after = strtol(val, &s, 0); if (*s || s == val || defaults.acquire_after<0) { error = "Expected plain integer value >=1 for acquire-after"; goto err; } if (last_ticket) last_ticket->acquire_after = defaults.acquire_after; continue; } if (strcmp(key, "before-acquire-handler") == 0) { defaults.ext_verifier = strdup(val); if (*s || s == val || defaults.timeout<1) { error = "Expected plain integer value >=1 for timeout"; goto err; } if (last_ticket) last_ticket->ext_verifier = defaults.ext_verifier; continue; } if (strcmp(key, "weights") == 0) { if (parse_weights(val, 
defaults.weight) < 0) goto out; if (last_ticket) memcpy(last_ticket->weight, defaults.weight, sizeof(last_ticket->weight)); continue; } error = "Unknown item"; goto out; } if ((booth_conf->site_count % 2) == 0) { log_warn("An odd number of nodes is strongly recommended!"); } /* Default: make config name match config filename. */ if (!booth_conf->name[0]) { cp = strrchr(path, '/'); if (!cp) cp = path; /* TODO: locale? */ /* NUL-termination by memset. */ for(i=0; iname[i] = *(cp++); /* Last resort. */ if (!booth_conf->name[0]) strcpy(booth_conf->name, "booth"); } return 0; err: out: log_error("%s in config file line %d", error, lineno); free(booth_conf); booth_conf = NULL; return -1; } int check_config(int type) { struct passwd *pw; struct group *gr; char *cp, *input; if (!booth_conf) return -1; input = (type == ARBITRATOR) ? booth_conf->arb_user : booth_conf->site_user; if (!*input) goto u_inval; if (isdigit(input[0])) { booth_conf->uid = strtol(input, &cp, 0); if (*cp != 0) { u_inval: log_error("User \"%s\" cannot be resolved into a UID.", input); return ENOENT; } } else { pw = getpwnam(input); if (!pw) goto u_inval; booth_conf->uid = pw->pw_uid; } input = (type == ARBITRATOR) ? booth_conf->arb_group : booth_conf->site_group; if (!*input) goto g_inval; if (isdigit(input[0])) { booth_conf->gid = strtol(input, &cp, 0); if (*cp != 0) { g_inval: log_error("Group \"%s\" cannot be resolved into a UID.", input); return ENOENT; } } else { gr = getgrnam(input); if (!gr) goto g_inval; booth_conf->gid = gr->gr_gid; } /* TODO: check whether uid or gid is 0 again? * The admin may shoot himself in the foot, though. 
*/ return 0; } int find_site_by_name(unsigned char *site, struct booth_site **node, int any_type) { struct booth_site *n; int i; if (!booth_conf) return 0; for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if ((n->type == SITE || any_type) && strcmp(n->addr_string, site) == 0) { *node = n; return 1; } } return 0; } int find_site_by_id(uint32_t site_id, struct booth_site **node) { struct booth_site *n; int i; if (site_id == NO_ONE) { *node = NULL; return 1; } if (!booth_conf) return 0; for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if (n->site_id == site_id) { *node = n; return 1; } } return 0; } const char *type_to_string(int type) { switch (type) { case ARBITRATOR: return "arbitrator"; case SITE: return "site"; case CLIENT: return "client"; } return "??invalid-type??"; } diff --git a/src/config.h b/src/config.h index 5806442..a66a988 100644 --- a/src/config.h +++ b/src/config.h @@ -1,174 +1,176 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _CONFIG_H #define _CONFIG_H #include #include "booth.h" #include "raft.h" #include "transport.h" /** @{ */ /** Definitions for in-RAM data. */ #define MAX_NODES 16 #define TICKET_ALLOC 16 struct ticket_config { /** \name Configuration items. 
* @{ */ /** Name of ticket. */ boothc_ticket name; /** How many seconds a term lasts (if not refreshed). */ int term_duration; /** Network related timeouts. */ int timeout; /** Retries before giving up. */ int retries; /** If >0, time to wait for a site to get fenced. * The ticket may be acquired after that timespan by * another site. */ int acquire_after; /* TODO: needed? */ -#if 0 -#endif /* Program to ask whether it makes sense to * acquire the ticket */ char *ext_verifier; /** Node weights. */ int weight[MAX_NODES]; /** @} */ /** \name Runtime values. * @{ */ /** Current state. */ server_state_e state; /** When something has to be done */ struct timeval next_cron; /** Current leader. This is effectively the log[] in Raft. */ struct booth_site *leader; /** Timestamp of leadership expiration */ time_t term_expires; /** End of election period */ time_t election_end; struct booth_site *voted_for; /** Who the various sites vote for. * NO_OWNER = no vote yet. */ struct booth_site *votes_for[MAX_NODES]; /* bitmap */ uint64_t votes_received; /** Last voting round that was seen. */ uint32_t current_term; /** @} */ /** */ uint32_t commit_index; /** */ uint32_t last_applied; uint32_t next_index[MAX_NODES]; uint32_t match_index[MAX_NODES]; /** \name Needed while proposals are being done. * @{ */ /** Whom to vote for the next time. * Needed to push a ticket to someone else. */ #if 0 /** Bitmap of sites that acknowledge that state. */ uint64_t proposal_acknowledges; /** When an incompletely acknowledged proposal gets done. * If all peers agree, that happens sooner. * See switch_state_to(). */ struct timeval proposal_switch; /** Timestamp of proposal expiration. */ time_t proposal_expires; #endif /** Number of send retries left. * Used on the new owner. * Starts at 0, counts up. */ int retry_number; /** @} */ }; struct booth_config { char name[BOOTH_NAME_LEN]; transport_layer_t proto; uint16_t port; /** Stores the OR of the individual host bitmasks. 
*/ uint64_t site_bits; char site_user[BOOTH_NAME_LEN]; char site_group[BOOTH_NAME_LEN]; char arb_user[BOOTH_NAME_LEN]; char arb_group[BOOTH_NAME_LEN]; uid_t uid; gid_t gid; int site_count; struct booth_site site[MAX_NODES]; int ticket_count; int ticket_allocated; struct ticket_config *ticket; }; extern struct booth_config *booth_conf; int read_config(const char *path); int check_config(int type); int find_site_by_name(unsigned char *site, struct booth_site **node, int any_type); int find_site_by_id(uint32_t site_id, struct booth_site **node); const char *type_to_string(int type); +#include +#define R(tk_) printf("## %12s:%3d state %s, term %d, index %d, leader %s\n", __FILE__, __LINE__, state_to_string(tk_->state), tk_->current_term, tk_->commit_index, site_string(tk_->leader)) + + #endif /* _CONFIG_H */ diff --git a/src/raft.c b/src/raft.c index 3d29e1a..a0ac592 100644 --- a/src/raft.c +++ b/src/raft.c @@ -1,290 +1,389 @@ /* * Copyright (C) 2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. 
* * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include "booth.h" #include "transport.h" #include "inline-fn.h" #include "config.h" #include "raft.h" #include "ticket.h" #include "log.h" inline static void clear_election(struct ticket_config *tk) { int i; struct booth_site *site; log_info("clear election"); tk->votes_received = 0; foreach_node(i, site) tk->votes_for[site->index] = NULL; } inline static void site_voted_for(struct ticket_config *tk, struct booth_site *who, struct booth_site *vote) { log_info("site \"%s\" votes for \"%s\"", who->addr_string, vote->addr_string); if (!tk->votes_for[who->index]) { tk->votes_for[who->index] = vote; tk->votes_received |= who->bitmask; } else { if (tk->votes_for[who->index] != vote) log_error("voted previously (but in same term!) for \"%s\"...", tk->votes_for[who->index]->addr_string); } } +static void become_follower(struct ticket_config *tk, + struct boothc_ticket_msg *msg) +{ + uint32_t i; + int duration; + + tk->state = ST_FOLLOWER; + + + duration = tk->term_duration; + if (msg) + duration = min(duration, ntohl(msg->ticket.term_valid_for)); + tk->term_expires = time(NULL) + duration; + + + if (msg) { + i = ntohl(msg->ticket.term); + tk->current_term = max(i, tk->current_term); + + /* § 5.3 */ + i = ntohl(msg->ticket.leader_commit); + tk->commit_index = max(i, tk->commit_index); + } + + + ticket_write(tk); +} + + static struct booth_site *majority_votes(struct ticket_config *tk) { int i, n; struct booth_site *v; int count[MAX_NODES] = { 0, }; for(i=0; isite_count; i++) { v = tk->votes_for[i]; if (!v) continue; n = v->index; count[n]++; log_info("Majority: %d \"%s\" wants %d \"%s\" => %d", i, booth_conf->site[i].addr_string, n, v->addr_string, count[n]); if (count[n]*2 <= booth_conf->site_count) continue; 
log_info("Majority reached: %d of %d for \"%s\"", count[n], booth_conf->site_count, v->addr_string); return v; } return NULL; } static int answer_HEARTBEAT ( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; - uint32_t index; term = ntohl(msg->ticket.term); log_debug("leader: %s, have %s; term %d vs %d", site_string(leader), ticket_leader_string(tk), term, tk->current_term); if (term < tk->current_term) return 0; //send_reject(sender, tk, RLT_TERM_OUTDATED); - /* § 5.3 */ - index = ntohl(msg->ticket.leader_commit); - if (index > tk->commit_index) - tk->commit_index = index; - - assert(tk->leader == leader); + become_follower(tk, msg); + assert(sender == leader); + tk->leader = leader; return 0; } static int process_VOTE_FOR( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; struct booth_site *new_leader; term = ntohl(msg->ticket.term); if (term < tk->current_term) return send_reject(sender, tk, RLT_TERM_OUTDATED); + + if (term == tk->current_term && + tk->election_end < time(NULL)) { + /* Election already ended - either by time or majority. + * Ignore. */ + return 0; + } + + if (term > tk->current_term) clear_election(tk); site_voted_for(tk, sender, leader); /* §5.2 */ new_leader = majority_votes(tk); if (new_leader) { tk->leader = new_leader; + tk->term_expires = time(NULL) + tk->term_duration; + tk->election_end = 0; + tk->voted_for = NULL; + if ( new_leader == local) { - tk->current_term++; + tk->commit_index++; // ?? 
tk->state = ST_LEADER; send_heartbeat(tk); - tk->voted_for = NULL; + ticket_write(tk); } else - tk->state = ST_FOLLOWER; - + become_follower(tk, NULL); } set_ticket_wakeup(tk); return 0; } static int process_REJECTED( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { + uint32_t rv; + + rv = ntohl(msg->header.result); + + if (tk->state == ST_CANDIDATE && + rv == RLT_TERM_OUTDATED) { + log_info("Am out of date, become follower."); + tk->leader = leader; + become_follower(tk, msg); + return 0; + } + + + if (tk->state == ST_CANDIDATE && + rv == RLT_TERM_STILL_VALID) { + log_error("There's a leader that I don't see: \"%s\"", + site_string(leader)); + tk->leader = leader; + become_follower(tk, msg); + return 0; + } + + log_error("unhandled reject: in state %s, got %s.", + state_to_string(tk->state), + state_to_string(rv)); + tk->leader = leader; + become_follower(tk, msg); return 0; } /* §5.2 */ static int answer_REQ_VOTE( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; + int valid; struct boothc_ticket_msg omsg; term = ntohl(msg->ticket.term); /* §5.1 */ if (term < tk->current_term) + { + log_info("sending REJECT, term too low."); return send_reject(sender, tk, RLT_TERM_OUTDATED); + } + + + /* This if() should trigger more or less always, as + * OP_REQ_VOTE *starts* an election. 
+ * if (tk->election_end < time(NULL)) + */ + valid = term_valid_for(tk); + if (valid) { + log_debug("no election allowed, term valid for %d??", valid); + return send_reject(sender, tk, RLT_TERM_STILL_VALID); + } /* §5.2, §5.4 */ if (!tk->voted_for && ntohl(msg->ticket.last_log_index) >= tk->last_applied) { tk->voted_for = sender; site_voted_for(tk, sender, leader); goto yes_you_can; } yes_you_can: init_ticket_msg(&omsg, OP_VOTE_FOR, RLT_SUCCESS, tk); omsg.ticket.leader = htonl(get_node_id(tk->voted_for)); return transport()->broadcast(&omsg, sizeof(omsg)); } int new_election(struct ticket_config *tk, struct booth_site *preference) { struct booth_site *new_leader; time_t now; time(&now); log_debug("start new election?, now=%" PRIi64 ", end %" PRIi64, now, tk->election_end); if (now <= tk->election_end) return 0; /* §5.2 */ tk->current_term++; + tk->term_expires = 0; tk->election_end = now + tk->term_duration; log_debug("start new election! term=%d, until %" PRIi64, tk->current_term, tk->election_end); clear_election(tk); if(preference) new_leader = preference; else new_leader = (local->type == SITE) ? local : NULL; site_voted_for(tk, local, new_leader); tk->voted_for = new_leader; tk->state = ST_CANDIDATE; ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS); return 0; } int raft_answer( struct ticket_config *tk, struct booth_site *from, struct booth_site *leader, struct boothc_ticket_msg *msg ) { int cmd; uint32_t term; + int rv; cmd = ntohl(msg->header.cmd); term = ntohl(msg->ticket.term); + R(tk); log_debug("got message %s from \"%s\", term %d vs. %d", state_to_string(cmd), from->addr_string, term, tk->current_term); + + if (cmd == OP_REJECTED) { + R(tk); + rv = process_REJECTED(tk, from, leader, msg); + R(tk); + return (rv); + } + + /* §5.1 */ if (term > tk->current_term) { tk->state = ST_FOLLOWER; tk->current_term = term; tk->leader = leader; log_info("higher term %d vs. 
%d, following \"%s\"", term, tk->current_term, ticket_leader_string(tk)); } + R(tk); switch (cmd) { case OP_REQ_VOTE: - return answer_REQ_VOTE (tk, from, leader, msg); + rv = answer_REQ_VOTE (tk, from, leader, msg); + break; case OP_VOTE_FOR: - return process_VOTE_FOR(tk, from, leader, msg); + rv = process_VOTE_FOR(tk, from, leader, msg); + break; case OP_HEARTBEAT: - return answer_HEARTBEAT(tk, from, leader, msg); + rv = answer_HEARTBEAT(tk, from, leader, msg); + break; case OP_REJECTED: - return process_REJECTED(tk, from, leader, msg); + assert(!"here"); + break; + default: + log_error("unprocessed message, cmd %x", cmd); + rv = -EINVAL; } - log_error("unprocessed message, cmd %x", cmd); - return -EINVAL; + R(tk); + return rv; } diff --git a/src/ticket.c b/src/ticket.c index 16aaf72..e596183 100644 --- a/src/ticket.c +++ b/src/ticket.c @@ -1,818 +1,668 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include "ticket.h" #include "config.h" #include "pacemaker.h" #include "inline-fn.h" #include "log.h" #include "booth.h" #include "raft.h" #include "handler.h" #define TK_LINE 256 /* Untrusted input, must fit (incl. \0) in a buffer of max chars. 
*/ int check_max_len_valid(const char *s, int max) { int i; for(i=0; iticket_count; i++) { if (!strcmp(booth_conf->ticket[i].name, ticket)) { if (found) *found = booth_conf->ticket + i; return 1; } } return 0; } int check_ticket(char *ticket, struct ticket_config **found) { if (found) *found = NULL; if (!booth_conf) return 0; if (!check_max_len_valid(ticket, sizeof(booth_conf->ticket[0].name))) return 0; return find_ticket_by_name(ticket, found); } int check_site(char *site, int *is_local) { struct booth_site *node; if (!check_max_len_valid(site, sizeof(node->addr_string))) return 0; if (find_site_by_name(site, &node, 0)) { *is_local = node->local; return 1; } return 0; } #if 0 /** Find out what others think about this ticket. * * If we're a SITE, we can ask (and have to tell) Pacemaker. * An ARBITRATOR can only ask others. */ static int ticket_send_catchup(struct ticket_config *tk) { int i, rv = 0; struct booth_site *site; struct boothc_ticket_msg msg; foreach_node(i, site) { if (!site->local) { init_ticket_msg(&msg, CMD_CATCHUP, RLT_SUCCESS, tk); log_debug("attempting catchup from %s", site->addr_string); rv = booth_udp_send(site, &msg, sizeof(msg)); } } ticket_activate_timeout(tk); return rv; } #endif int ticket_write(struct ticket_config *tk) { if (local->type != SITE) return -EINVAL; disown_if_expired(tk); if (tk->leader == local) { pcmk_handler.grant_ticket(tk); } else { pcmk_handler.revoke_ticket(tk); } return 0; } /* Ask an external program whether getting the ticket * makes sense. * Eg. if the services have a failcount of INFINITY, * we can't serve here anyway. */ int get_ticket_locally_if_allowed(struct ticket_config *tk) { int rv; if (!tk->ext_verifier) goto get_it; rv = run_handler(tk, tk->ext_verifier, 1); if (rv) { log_error("May not acquire ticket."); /* Give it to somebody else. * Just send a commit message, as the * others couldn't help anyway. 
*/ if (leader_and_valid(tk)) { disown_ticket(tk); #if 0 tk->proposed_owner = NULL; /* Just go one further - others may easily override. */ tk->new_ballot++; ticket_broadcast_proposed_state(tk, OP_COMMITTED); tk->state = ST_STABLE; #endif ticket_broadcast(tk, OP_VOTE_FOR, RLT_SUCCESS); } return rv; } else { log_info("May keep ticket."); } get_it: if (leader_and_valid(tk)) { return send_heartbeat(tk); } else { new_election(tk, local); return ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS); } #if 0 return paxos_start_round(tk, local); #endif } /** Try to get the ticket for the local site. * */ int do_grant_ticket(struct ticket_config *tk) { int rv; if (tk->leader == local) return RLT_SUCCESS; if (tk->leader) return RLT_OVERGRANT; rv = get_ticket_locally_if_allowed(tk); return rv; } /** Start a PAXOS round for revoking. * That can be started from any site. */ int do_revoke_ticket(struct ticket_config *tk) { int rv; if (!tk->leader) return RLT_SUCCESS; disown_ticket(tk); ticket_write(tk); return ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS); #if 0 rv = paxos_start_round(tk, NULL); #endif return rv; } int list_ticket(char **pdata, unsigned int *len) { struct ticket_config *tk; char timeout_str[64]; char *data, *cp; int i, alloc; *pdata = NULL; *len = 0; alloc = 256 + booth_conf->ticket_count * (BOOTH_NAME_LEN * 2 + 128); data = malloc(alloc); if (!data) return -ENOMEM; cp = data; foreach_ticket(i, tk) { if (tk->term_expires != 0) strftime(timeout_str, sizeof(timeout_str), "%F %T", localtime(&tk->term_expires)); else strcpy(timeout_str, "INF"); cp += sprintf(cp, "ticket: %s, leader: %s, expires: %s, commit: %d\n", tk->name, ticket_leader_string(tk), timeout_str, tk->commit_index); *len = cp - data; assert(*len < alloc); } *pdata = data; return 0; } int setup_ticket(void) { struct ticket_config *tk; int i; /* TODO */ foreach_ticket(i, tk) { tk->leader = NULL; tk->term_expires = 0; -// abort_proposal(tk); + // abort_proposal(tk); if (local->type == SITE) { 
pcmk_handler.load_ticket(tk); } + + /* There might be a leader; wait for its notification. */ + tk->term_expires = time(NULL) + tk->term_duration; + tk->state = ST_FOLLOWER; } return 0; } int ticket_answer_list(int fd, struct boothc_ticket_msg *msg) { char *data; int olen, rv; struct boothc_header hdr; rv = list_ticket(&data, &olen); if (rv < 0) return rv; init_header(&hdr, CMR_LIST, RLT_SUCCESS, sizeof(hdr) + olen); return send_header_plus(fd, &hdr, data, olen); } int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg) { int rv; struct ticket_config *tk; if (!check_ticket(msg->ticket.id, &tk)) { log_error("Client asked to grant unknown ticket"); rv = RLT_INVALID_ARG; goto reply; } if (tk->leader) { log_error("client wants to get an (already granted!) ticket \"%s\"", msg->ticket.id); rv = RLT_OVERGRANT; goto reply; } rv = do_grant_ticket(tk); reply: init_header(&msg->header, CMR_GRANT, rv ?: RLT_ASYNC, sizeof(*msg)); return send_ticket_msg(fd, msg); } int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg) { int rv; struct ticket_config *tk; if (!check_ticket(msg->ticket.id, &tk)) { log_error("Client asked to grant unknown ticket"); rv = RLT_INVALID_ARG; goto reply; } if (!tk->leader) { log_info("client wants to revoke a free ticket \"%s\"", msg->ticket.id); /* Return a different result code? */ rv = RLT_SUCCESS; goto reply; } rv = do_revoke_ticket(tk); if (rv == 0) rv = RLT_ASYNC; reply: init_ticket_msg(msg, CMR_REVOKE, rv, tk); return send_ticket_msg(fd, msg); } -#if 0 -/** Got a CMD_CATCHUP query. - * In this file because it's mostly used during startup. */ -static int ticket_answer_catchup( - struct ticket_config *tk, - struct booth_site *from, - struct boothc_ticket_msg *msg, - uint32_t ballot, - struct booth_site *new_owner) -{ - int rv; - - - log_debug("got CATCHUP query for \"%s\" from %s", - msg->ticket.id, from->addr_string); - - /* We do _always_ answer. 
- * In case all booth daemons are restarted at the same time, nobody - * would answer any questions, leading to timeouts and delays. - * Just admit we don't know. */ - - rv = (tk->state == ST_INIT) ? - RLT_PROBABLY_SUCCESS : RLT_SUCCESS; - - init_ticket_msg(msg, CMR_CATCHUP, rv, tk); - - /* On catchup, don't tell about ongoing proposals; - * if we did, the other site might believe that the - * ballot numbers have already been used. - * Send the known ballot number, so that a PREPARE - * gets accepted. */ - msg->ticket.ballot = msg->ticket.prev_ballot; - - return booth_udp_send(from, msg, sizeof(*msg)); -} - - -/** Got a CMR_CATCHUP message. - * Gets handled here because it's not PAXOS per se, - * but only needed during startup. */ -static int ticket_process_catchup( - struct ticket_config *tk, - struct booth_site *from, - struct boothc_ticket_msg *msg, - uint32_t ballot, - struct booth_site *new_owner) -{ - int rv; - uint32_t prev_ballot; - time_t peer_expiry; - - - log_info("got CATCHUP answer for \"%s\" from %s; says owner %s with ballot %d", - tk->name, from->addr_string, - ticket_leader_string(new_owner), ballot); - prev_ballot = ntohl(msg->ticket.prev_ballot); - - rv = ntohl(msg->header.result); - if (rv != RLT_SUCCESS && - rv != RLT_PROBABLY_SUCCESS) { - log_error("dropped because of wrong rv: 0x%x", rv); - return -EINVAL; - } - - if (ballot == tk->new_ballot && - ballot == tk->last_ack_ballot && - new_owner == tk->owner) { - /* Peer says the same thing we're believing. 
*/ - tk->proposal_acknowledges |= from->bitmask | local->bitmask; - tk->expires = ntohl(msg->ticket.expiry) + time(NULL); - - if (should_switch_state_p(tk)) { - if (tk->state == ST_INIT) - tk->state = ST_STABLE; - } - - disown_if_expired(tk); - log_debug("catchup: peer ack 0x%" PRIx64 ", now state '%s'", - tk->proposal_acknowledges, - state_to_string(tk->state)); - goto ex; - } - - - if (ticket_valid_for(tk) == 0 && !tk->owner) { - /* We see the ticket as expired, and therefore don't know an owner. - * So believe some other host. */ - tk->state = ST_STABLE; - log_debug("catchup: no owner locally, believe peer."); - goto accept; - } - - - if (ballot >= tk->new_ballot && - ballot >= tk->last_ack_ballot && - rv == RLT_SUCCESS) { - /* Peers seems to know better, but as yet we only have _her_ - * word for that. */ - log_debug("catchup: peer has higher ballot: %d >= %d/%d", - ballot, tk->new_ballot, tk->last_ack_ballot); - -accept: - peer_expiry = ntohl(msg->ticket.expiry) + time(NULL); - tk->expires = (tk->expires > peer_expiry) ? - tk->expires : peer_expiry; - tk->new_ballot = ballot_max2(ballot, tk->new_ballot); - tk->last_ack_ballot = ballot_max2(prev_ballot, tk->last_ack_ballot); - tk->owner = new_owner; - tk->proposal_acknowledges = from->bitmask; - - /* We stay in ST_INIT and wait for confirmation. */ - goto ex; - } - - - if (ballot >= tk->last_ack_ballot && - rv == RLT_PROBABLY_SUCCESS && - tk->state == ST_INIT && - tk->retry_number > 3) { - /* Peer seems to know better than us, and there's no - * convincing other report. Just take it. */ - tk->state = ST_STABLE; - log_debug("catchup: exceeded retries, peer has higher ballot."); - goto accept; - } - - - if (ballot < tk->new_ballot || - ballot < tk->last_ack_ballot) { - /* Peer seems outdated ... tell it to reload? 
*/ - log_debug("catchup: peer outdated?"); -#if 0 - init_ticket_msg(&msg, CMD_DO_CATCHUP, RLT_SUCCESS, tk, &tk->current_state); -#endif - goto ex; - } - - - if (ballot >= tk->last_ack_ballot && - local->type == SITE && - new_owner == tk->owner) { - /* We've got some information (local Pacemaker?), and a peer - * says same owner, with same or higher ballot number. */ - log_debug("catchup: peer agrees about owner."); - goto ex; - } - - log_debug("catchup: unhandled situation!"); - -ex: - ticket_write(tk); - - if (tk->state == ST_STABLE) { - /* If we believe to have enough information, we can try to - * acquire the ticket (again). */ - time(&tk->expires); - } - - /* Allow further actions. */ - ticket_activate_timeout(tk); - - return 0; -} -#endif - - int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd, cmd_result_t res) { struct boothc_ticket_msg msg; init_ticket_msg(&msg, cmd, res, tk); log_debug("broadcasting '%s' for ticket \"%s\"", state_to_string(cmd), tk->name); return transport()->broadcast(&msg, sizeof(msg)); } #if 0 /** Send new state request to all sites. * Perhaps this should take a flag for ACCEPTOR etc.? * No need currently, as all nodes are more or less identical. */ int ticket_broadcast_proposed_state(struct ticket_config *tk, cmd_request_t state) { struct boothc_ticket_msg msg; tk->state = state; init_ticket_msg(&msg, state, RLT_SUCCESS, tk); msg.ticket.leader = htonl(get_node_id(tk->proposed_owner)); log_debug("broadcasting '%s' for ticket \"%s\"", state_to_string(state), tk->name); /* Switch state after one second, if the majority says ok. */ gettimeofday(&tk->proposal_switch, NULL); tk->proposal_switch.tv_sec++; return transport()->broadcast(&msg, sizeof(msg)); } #endif static void ticket_cron(struct ticket_config *tk) { time_t now; now = time(NULL); + R(tk); /* Has an owner, has an expiry date, and expiry date in the past? * Losing the ticket must happen in _every_ state. 
*/ if (tk->term_expires && tk->leader && now > tk->term_expires) { log_info("LOST ticket: \"%s\" no longer at %s", tk->name, ticket_leader_string(tk)); /* Couldn't renew in time - ticket lost. */ disown_ticket(tk); /* New vote round; §5.2 */ if (local->type == SITE) new_election(tk, NULL); /* should be "always" that way else tk->state = ST_FOLLOWER; */ // abort_proposal(tk); TODO ticket_write(tk); ticket_activate_timeout(tk); /* May not try to re-acquire now, need to find out * what others think. */ return; } + R(tk); switch(tk->state) { case ST_INIT: /* Unknown state, ask others. */ // ticket_send_catchup(tk); - return; + break; case ST_FOLLOWER: + if (tk->term_expires && + now > tk->term_expires) { + new_election(tk, NULL); + } + break; case ST_CANDIDATE: /* §5.2 */ if (now > tk->election_end) new_election(tk, NULL); - return; + break; case ST_LEADER: tk->term_expires = now + tk->term_duration; - ticket_broadcast(tk, OP_HEARTBEAT, RLT_SUCCESS); + send_heartbeat(tk); + ticket_write(tk); + set_ticket_wakeup(tk); + break; default: break; } + R(tk); } void process_tickets(void) { struct ticket_config *tk; int i; struct timeval now; float sec_until; gettimeofday(&now, NULL); foreach_ticket(i, tk) { sec_until = timeval_to_float(tk->next_cron) - timeval_to_float(now); if (0) log_debug("ticket %s next cron %" PRIx64 ".%03d, " "now %" PRIx64 "%03d, in %f", tk->name, (uint64_t)tk->next_cron.tv_sec, timeval_msec(tk->next_cron), (uint64_t)now.tv_sec, timeval_msec(now), sec_until); if (sec_until > 0.0) continue; log_debug("ticket cron: doing %s", tk->name); /* Set next value, handler may override. * This should already be handled via the state logic; * but to be on the safe side the renew repetition is * duplicated here, too. 
*/ set_ticket_wakeup(tk); ticket_cron(tk); } } void tickets_log_info(void) { struct ticket_config *tk; int i; foreach_ticket(i, tk) { log_info("Ticket %s: state '%s' " "commit index %d " "leader \"%s\" " "expires %-24.24s", tk->name, state_to_string(tk->state), tk->commit_index, ticket_leader_string(tk), ctime(&tk->term_expires)); } } /* UDP message receiver. */ int message_recv(struct boothc_ticket_msg *msg, int msglen) { int rv; uint32_t from; struct booth_site *source; struct ticket_config *tk; struct booth_site *leader; uint32_t leader_u; if (check_boothc_header(&msg->header, sizeof(*msg)) < 0 || msglen != sizeof(*msg)) { log_error("message receive error"); return -1; } from = ntohl(msg->header.from); if (!find_site_by_id(from, &source) || !source) { log_error("unknown sender: %08x", from); return -1; } if (!check_ticket(msg->ticket.id, &tk)) { log_error("got invalid ticket name \"%s\" from %s", msg->ticket.id, source->addr_string); return -EINVAL; } leader_u = ntohl(msg->ticket.leader); if (!find_site_by_id(leader_u, &leader)) { log_error("Message with unknown owner %x received", leader_u); return -EINVAL; } rv = raft_answer(tk, source, leader, msg); #if 0 cmd = ntohl(msg->header.cmd); switch (cmd) { case CMD_CATCHUP: return ticket_answer_catchup(tk, source, msg, ballot, new_owner_p); case CMR_CATCHUP: return ticket_process_catchup(tk, source, msg, ballot, new_owner_p); default: /* only used in catchup, and not even really there ?? */ assert(ntohl(msg->header.result) == 0); rv = raft_answer(tk, source, msg); // TODO assert((tk->proposal_acknowledges & ~booth_conf->site_bits) == 0); return rv; } #endif return rv; } void set_ticket_wakeup(struct ticket_config *tk) { struct timeval tv, now; /* At least every hour, perhaps sooner. */ ticket_next_cron_in(tk, 3600); switch (tk->state) { case ST_LEADER: assert(tk->leader == local); gettimeofday(&now, NULL); tv = now; tv.tv_sec = next_vote_starts_at(tk); /* If timestamp is in the past, look again in one second. 
*/ if (timeval_compare(tv, now) <= 0) tv.tv_sec = now.tv_sec + 1; ticket_next_cron_at(tk, tv); break; case ST_CANDIDATE: assert(tk->election_end); ticket_next_cron_at_coarse(tk, tk->election_end); break; + case ST_INIT: case ST_FOLLOWER: /* If there is (or should be) some owner, check on her later on. * If no one is interested - don't care. */ if ((tk->leader || tk->acquire_after) && (local->type == SITE)) ticket_next_cron_at_coarse(tk, tk->term_expires + tk->acquire_after); break; default: log_error("why here?"); } } /* Given a state (in host byte order), return a human-readable (char*). * An array is used so that multiple states can be printed in a single printf(). */ char *state_to_string(uint32_t state_ho) { union mu { cmd_request_t s; char c[5]; }; static union mu cache[6] = { { 0 } }, *cur; static int current = 0; current ++; if (current >= sizeof(cache)/sizeof(cache[0])) current = 0; cur = cache + current; cur->s = htonl(state_ho); /* Shouldn't be necessary, union array is initialized with zeroes, and * these bytes never get written. */ cur->c[4] = 0; return cur->c; } int send_reject(struct booth_site *dest, struct ticket_config *tk, cmd_result_t code) { struct boothc_ticket_msg msg; init_ticket_msg(&msg, OP_REJECTED, code, tk); return booth_udp_send(dest, &msg, sizeof(msg)); }