diff --git a/src/config.c b/src/config.c index 3b41c37..6166a27 100644 --- a/src/config.c +++ b/src/config.c @@ -1,573 +1,573 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include "booth.h" #include "config.h" #include "paxos.h" #include "ticket.h" #include "log.h" static int ticket_size = 0; static int ticket_realloc(void) { const int added = 5; int had, want; void *p; had = booth_conf->ticket_allocated; want = had + added; p = realloc(booth_conf->ticket, sizeof(struct ticket_config) * want); if (!booth_conf) { log_error("can't alloc more tickets"); return -ENOMEM; } booth_conf->ticket = p; memset(booth_conf->ticket + had, 0, sizeof(struct ticket_config) * added); booth_conf->ticket_allocated = want; return 0; } int add_site(char *address, int type); int add_site(char *addr_string, int type) { int rv; struct booth_site *site; uLong nid; uint32_t mask; rv = 1; if (booth_conf->site_count == MAX_NODES) { log_error("too many nodes"); goto out; } if (strlen(addr_string)+1 >= sizeof(booth_conf->site[0].addr_string)) { log_error("site address \"%s\" too long", addr_string); goto out; } site = booth_conf->site + booth_conf->site_count; site->family = BOOTH_PROTO_FAMILY; site->type = type; /* Make site_id start at a non-zero point. * Perhaps use hash over string or address? */ strcpy(site->addr_string, addr_string); nid = crc32(0L, NULL, 0); /* booth_config() uses memset(), so sizeof() is guaranteed to give * the same result everywhere - no uninitialized bytes. */ site->site_id = crc32(nid, site->addr_string, sizeof(site->addr_string)); /* Make sure we will never collide with NO_OWNER, * or be negative (to get "get_local_id() < 0" working). */ mask = 1 << (sizeof(site->site_id)*8 -1); assert(NO_OWNER & mask); site->site_id &= ~mask; site->index = booth_conf->site_count; site->bitmask = 1 << booth_conf->site_count; /* Catch site overflow */ assert(site->bitmask); booth_conf->site_bits |= site->bitmask; site->tcp_fd = -1; if (site->type == SITE) site->role = PROPOSER | ACCEPTOR | LEARNER; else if (site->type == ARBITRATOR) site->role = ACCEPTOR | LEARNER; booth_conf->site_count++; rv = 0; memset(&site->sa6, 0, sizeof(site->sa6)); if (inet_pton(AF_INET, site->addr_string, &site->sa4.sin_addr) > 0) { site->family = AF_INET; site->sa4.sin_family = site->family; site->sa4.sin_port = htons(booth_conf->port); site->saddrlen = sizeof(site->sa4); site->addrlen = sizeof(site->sa4.sin_addr); } else if (inet_pton(AF_INET6, site->addr_string, &site->sa6.sin6_addr) > 0) { site->family = AF_INET6; site->sa6.sin6_family = site->family; site->sa6.sin6_flowinfo = 0; site->sa6.sin6_port = htons(booth_conf->port); site->saddrlen = sizeof(site->sa6); site->addrlen = sizeof(site->sa6.sin6_addr); } else { log_error("Address string \"%s\" is bad", site->addr_string); rv = EINVAL; } out: return rv; } inline static const char *skip_while_in(const char *cp, int (*fn)(int), const char *allowed) { /* strchr() returns a pointer to the terminator if *cp == 0. */ while (*cp && (fn(*cp) || strchr(allowed, *cp))) cp++; return cp; } inline static char *skip_while(char *cp, int (*fn)(int)) { while (fn(*cp)) cp++; return cp; } inline static char *skip_until(char *cp, char expected) { while (*cp && *cp != expected) cp++; return cp; } static inline int is_end_of_line(char *cp) { char c = *cp; return c == '\n' || c == 0 || c == '#'; } static int add_ticket(const char *name, struct ticket_config **tkp, const struct ticket_config *def) { int rv; struct ticket_config *tk; if (booth_conf->ticket_count == booth_conf->ticket_allocated) { rv = ticket_realloc(); if (rv < 0) return rv; } tk = booth_conf->ticket + booth_conf->ticket_count; booth_conf->ticket_count++; if (!check_max_len_valid(name, sizeof(tk->name))) { log_error("ticket name \"%s\" too long.", name); return -EINVAL; } if (find_ticket_by_name(name, NULL)) { log_error("ticket name \"%s\" used again.", name); return -EINVAL; } if (* skip_while_in(name, isalnum, "-/")) { log_error("ticket name \"%s\" invalid; only alphanumeric names.", name); return -EINVAL; } strcpy(tk->name, name); tk->timeout = def->timeout; tk->expiry = def->expiry; memcpy(tk->weight, def->weight, sizeof(tk->weight)); - tk->current_state.state = ST_INIT; + tk->state = ST_INIT; if (tkp) *tkp = tk; return 0; } /* returns number of weights, or -1 on bad input. */ static int parse_weights(const char *input, int weights[MAX_NODES]) { int i, v; char *cp; for(i=0; iproto = UDP; booth_conf->port = BOOTH_DEFAULT_PORT; parse_weights("", defaults.weight); defaults.expiry = DEFAULT_TICKET_EXPIRY; defaults.timeout = DEFAULT_TICKET_TIMEOUT; error = ""; log_debug("reading config file %s", path); while (fgets(line, sizeof(line), fp)) { lineno++; s = skip_while(line, isspace); if (is_end_of_line(s)) continue; key = s; /* Key */ end_of_key = skip_while(key, isalnum); if (end_of_key == key) { error = "No key"; goto err; } if (!*end_of_key) goto exp_equal; /* whitespace, and something else but nothing more? */ s = skip_while(end_of_key, isspace); if (*s != '=') { exp_equal: error = "Expected '=' after key"; goto err; } s++; /* It's my buffer, and I terminate if I want to. */ /* But not earlier than that, because we had to check for = */ *end_of_key = 0; /* Value tokenizing */ s = skip_while(s, isspace); switch (*s) { case '"': case '\'': val = s+1; s = skip_until(val, *s); /* Terminate value */ if (!*s) { error = "Unterminated quoted string"; goto err; } /* Remove and skip quote */ *s = 0; s++; if (* skip_while(s, isspace)) { error = "Surplus data after value"; goto err; } *s = 0; break; case 0: no_value: error = "No value"; goto err; break; default: val = s; /* Rest of line. */ i = strlen(s); /* i > 0 because of "case 0" above. */ while (i > 0 && isspace(s[i-1])) i--; s += i; *s = 0; } if (val == s) goto no_value; if (strlen(key) > BOOTH_NAME_LEN || strlen(val) > BOOTH_NAME_LEN) { error = "key/value too long"; goto err; } if (strcmp(key, "transport") == 0) { if (got_transport) { error = "config file has multiple transport lines"; goto err; } if (strcasecmp(val, "UDP") == 0) booth_conf->proto = UDP; else if (strcasecmp(val, "SCTP") == 0) booth_conf->proto = SCTP; else { error = "invalid transport protocol"; goto err; } got_transport = 1; } if (strcmp(key, "port") == 0) booth_conf->port = atoi(val); if (strcmp(key, "name") == 0) { if(strlen(val)+1 >= BOOTH_NAME_LEN) { error = "Config name too long."; goto err; } } if (strcmp(key, "site") == 0) { if (add_site(val, SITE)) goto out; } if (strcmp(key, "arbitrator") == 0) { if (add_site(val, ARBITRATOR)) goto out; } if (strcmp(key, "ticket") == 0) { if (add_ticket(val, &last_ticket, &defaults)) goto out; /* last_ticket is valid until another one is needed - * and then it already has the new address and * is valid again. */ } if (strcmp(key, "expire") == 0) { defaults.expiry = strtol(val, &s, 0); if (*s || s == val || defaults.expiry<10) { error = "Expected plain integer value >=10 for expire"; goto err; } if (last_ticket) last_ticket->expiry = defaults.expiry; } if (strcmp(key, "timeout") == 0) { defaults.timeout = strtol(val, &s, 0); if (*s || s == val || defaults.timeout<1) { error = "Expected plain integer value >=1 for timeout"; goto err; } if (last_ticket) last_ticket->timeout = defaults.timeout; } if (strcmp(key, "weights") == 0) { if (parse_weights(val, defaults.weight) < 0) goto out; if (last_ticket) memcpy(last_ticket->weight, defaults.weight, sizeof(last_ticket->weight)); } } /* Default: make config name match config filename. */ if (!booth_conf->name[0]) { cp = strrchr(path, '/'); if (!cp) cp = path; /* TODO: locale? */ /* NUL-termination by memset. */ for(i=0; iname[i] = *(cp++); /* Last resort. */ if (!booth_conf->name[0]) strcpy(booth_conf->name, "booth"); } return 0; err: out: log_error("%s in config file line %d", error, lineno); free(booth_conf); booth_conf = NULL; return -1; } int check_config(int type) { if (!booth_conf) return -1; return 0; } int find_site_by_name(unsigned char *site, struct booth_site **node) { struct booth_site *n; int i; if (!booth_conf) return 0; for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if (n->type == SITE && strcmp(n->addr_string, site) == 0) { *node = n; return 1; } } return 0; } int find_site_by_id(uint32_t site_id, struct booth_site **node) { struct booth_site *n; int i; if (site_id == NO_OWNER) { *node = NULL; return 1; } if (!booth_conf) return 0; for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if (n->site_id == site_id) { *node = n; return 1; } } return 0; } const char *type_to_string(int type) { switch (type) { case ARBITRATOR: return "arbitrator"; case SITE: return "site"; case CLIENT: return "client"; } return "??invalid-type??"; } diff --git a/src/config.h b/src/config.h index 8afccb1..70c9d4c 100644 --- a/src/config.h +++ b/src/config.h @@ -1,113 +1,111 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _CONFIG_H #define _CONFIG_H #include #include "booth.h" #include "transport.h" /** @{ */ /** Definitions for in-RAM data. */ #define MAX_NODES 16 #define TICKET_ALLOC 16 struct ticket_paxos_state { /** Who tries to change the current status. */ struct booth_site *proposer; /** Current owner of ticket. */ struct booth_site *owner; /** Timestamp of expiration. */ time_t expires; - /** State. */ - cmd_request_t state; /** Current ballot number. Might be < prev_ballot if overflown. */ uint32_t ballot; /** Previous ballot. */ uint32_t prev_ballot; /** Bitmap of sites that acknowledge that state. */ uint64_t acknowledges; }; struct ticket_config { boothc_ticket name; /** How many seconds until expiration. */ int expiry; /** Network related timeouts. */ int timeout; - -// pl_handle_t handle; not needed? + /** State. */ + cmd_request_t state; int weight[MAX_NODES]; time_t next_cron; struct ticket_paxos_state current_state; struct ticket_paxos_state proposed_state; }; struct booth_config { char name[BOOTH_NAME_LEN]; transport_layer_t proto; uint16_t port; /** Stores the OR of the individual host bitmasks. */ uint32_t site_bits; int site_count; struct booth_site site[MAX_NODES]; int ticket_count; int ticket_allocated; struct ticket_config *ticket; }; extern struct booth_config *booth_conf; int read_config(const char *path); int check_config(int type); int find_site_by_name(unsigned char *site, struct booth_site **node); int find_site_by_id(uint32_t site_id, struct booth_site **node); const char *type_to_string(int type); #define STATE_STRING(s_) ({ union { cmd_request_t s; char c[5]; } d; d.s = htonl(s_); d.c[4] = 0; d.c; }) #endif /* _CONFIG_H */ diff --git a/src/paxos.c b/src/paxos.c index 06d8782..3a93439 100644 --- a/src/paxos.c +++ b/src/paxos.c @@ -1,352 +1,349 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include "booth.h" #include "transport.h" #include "inline-fn.h" #include "config.h" #include "paxos.h" #include "log.h" static uint32_t next_ballot_number(struct ticket_config *tk) { uint32_t b; /* TODO: getenv() for debugging */ b = tk->current_state.ballot; /* + unique number */ b += local->bitmask; /* + weight */ b += booth_conf->site_bits * tk->weight[ local->index ]; return b; } int paxos_start_round(struct ticket_config *tk, struct booth_site *new_owner) { struct ticket_paxos_state *tps; // TODO needs to be done from cron tps = &tk->proposed_state; - tps->_proposer = local; + tps->proposer = local; tps->prev_ballot = tk->current_state.ballot; tps->ballot = next_ballot_number(tk); tps->owner = new_owner; ticket_activate_timeout(tk); return ticket_broadcast_proposed_state(tk, OP_PREPARING); } /** @{ */ /** Message handling functions. * * Not all use all arguments; but to keep the interface the same, * they're all just passed everything we have. * * A PAXOS round starts by sending out an OP_PREPARING. * */ /** Answering OP_PREPARING means sending out OP_PROMISING. */ inline static int answer_PREP( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, int cmd, uint32_t ballot, struct booth_site *new_owner) { if (!(local->role & ACCEPTOR)) return 0; /* We have to be careful here. * Getting multiple copies of the same message must not trigger * rejections, but only repeated promises. */ if (ballot > tk->current_state.ballot) { msg->ticket.prev_ballot = htonl(tk->current_state.ballot); msg->header.cmd = htonl(OP_PROMISING); /* Not allowed: * tk->current_state.ballot = ballot; * * See above for reasoning. */ tk->proposed_state.ballot = ballot; tk->proposed_state.proposer = from; /* We lose (?) */ - tk->current_state.state = ST_STABLE; - tk->proposed_state.state = ST_STABLE; + tk->state = ST_STABLE; log_info("PROMISING for ticket \"%s\" (by %s) for %d", tk->name, from->addr_string, ballot); } else { msg->ticket.ballot = htonl(tk->current_state.ballot); msg->header.cmd = htonl(OP_REJECTED); log_info("REJECTING (prep) for ticket \"%s\" from %s - have %d, wanted %d", tk->name, from->addr_string, tk->current_state.ballot, ballot); } init_header_bare(&msg->header); return booth_udp_send(from, msg, sizeof(*msg)); } /** Getting OP_REJECTED means abandoning the current operation. */ inline static int answer_REJ( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, int cmd, uint32_t ballot, struct booth_site *new_owner) { log_info("got REJECTED for ticket \"%s\", ballot %d (has %d), from %s", tk->name, tk->proposed_state.ballot, ballot, from->addr_string); tk->current_state.ballot = ballot; tk->proposed_state.ballot = ballot; - tk->current_state.state = ST_STABLE; + tk->state = ST_STABLE; return 0; } /** After a few OP_PROMISING replies we can send out OP_PROPOSING. */ inline static int answer_PROM( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, int cmd, uint32_t ballot, struct booth_site *new_owner) { /* Ignore delayed promises. * They'd only cause packet repetitions anyway. */ - if (tk->proposed_state.state == OP_PREPARING) { + if (tk->state == OP_PREPARING) { tk->proposed_state.acknowledges |= from->bitmask; log_info("Got PROMISE from %s for \"%s\", now %" PRIx64, from->addr_string, tk->name, tk->proposed_state.acknowledges); /* TODO: only check for count? */ if (promote_ticket_state(tk)) { ticket_activate_timeout(tk); return ticket_broadcast_proposed_state(tk, OP_PROPOSING); } } /* Wait for further data */ return 0; } /** Answering OP_PROPOSING means sending out OP_ACCEPTING. */ inline static int answer_PROP( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, int cmd, uint32_t ballot, struct booth_site *new_owner) { if (!(local->role & ACCEPTOR)) return 0; /* We have to be careful here. * Getting multiple copies of the same message must not trigger * rejections, but only repeated OP_ACCEPTING messages. */ if (ballot > tk->current_state.ballot && ballot == tk->proposed_state.ballot && ntohl(msg->ticket.prev_ballot) == tk->current_state.ballot) { - tk->proposer = from; + tk->proposed_state.proposer = from; init_ticket_msg(msg, OP_ACCEPTING, RLT_SUCCESS, tk, &tk->proposed_state); log_info("ACCEPTING for ticket \"%s\" (by %s) for %d - new owner %s", tk->name, from->addr_string, ballot, ticket_owner_string(new_owner)); } else { msg->ticket.ballot = htonl(tk->current_state.ballot); msg->header.cmd = htonl(OP_REJECTED); log_info("REJECTING (prop) for ticket \"%s\" from %s - have %d, wanted %d", tk->name, from->addr_string, tk->current_state.ballot, ballot); } init_header_bare(&msg->header); return booth_udp_send(from, msg, sizeof(*msg)); } /** After enough OP_ACCEPTING we can do the change, and send an OP_COMMITTED. */ inline static int answer_ACC( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, int cmd, uint32_t ballot, struct booth_site *new_owner) { int rv; - if (tk->proposed_state.state == OP_PROPOSING) { + if (tk->state == OP_PROPOSING) { tk->proposed_state.acknowledges |= from->bitmask; log_info("Got ACCEPTING from %s for \"%s\", now %" PRIx64, from->addr_string, tk->name, tk->proposed_state.acknowledges); /* TODO: only check for count? */ if (promote_ticket_state(tk)) { /* Get previous value for next round */ tk->proposed_state.prev_ballot = tk->current_state.prev_ballot = tk->current_state.ballot; tk->current_state.ballot = tk->proposed_state.ballot; tk->current_state.owner = tk->proposed_state.owner; tk->current_state.expires = time(NULL) + tk->expiry; /* TODO */ tk->next_cron = time(NULL) + tk->current_state.owner == local ? tk->expiry / 2 : tk->expiry; log_info("Now actively COMMITTED for \"%s\", new owner %s", tk->name, ticket_owner_string(tk->current_state.owner)); ticket_write(tk); rv = ticket_broadcast_proposed_state(tk, OP_COMMITTED); - tk->current_state.state = - tk->proposed_state.state = ST_STABLE; + tk->state = ST_STABLE; return rv; } } /* Wait for further data */ return 0; } /** An OP_COMMITTED gets no answer; just record the new state. */ inline static int answer_COMM( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, int cmd, uint32_t ballot, struct booth_site *new_owner) { log_info("COMMITTED for ticket \"%s\", ballot %d, from %s, new owner %s", tk->name, ballot, from->addr_string, ticket_owner_string(new_owner) ); tk->proposed_state.prev_ballot = tk->current_state.prev_ballot = tk->current_state.ballot; tk->proposed_state.ballot = tk->current_state.ballot = ballot; tk->proposed_state.owner = tk->current_state.owner = new_owner; - tk->current_state.state = - tk->proposed_state.state = ST_STABLE; + tk->state = ST_STABLE; tk->current_state.expires = tk->proposed_state.expires = time(NULL) + tk->expiry; /* Nothing to do? */ tk->next_cron = time(NULL) + tk->current_state.owner == local ? tk->expiry / 2 : tk->expiry; ticket_write(tk); /* Send ack? */ return 0; } /** @} */ int paxos_answer(struct boothc_ticket_msg *msg, struct ticket_config *tk, struct booth_site *from) { int cmd; uint32_t ballot, new_owner; struct booth_site *new_owner_p; cmd = ntohl(msg->header.cmd); ballot = ntohl(msg->ticket.ballot); new_owner = ntohl(msg->ticket.owner); if (!find_site_by_id(new_owner, &new_owner_p)) { log_error("Message with unknown owner %x received", new_owner); return -EINVAL; } /* These are in roughly chronological order. * What the first machine sends is an OP_PREPARING * (see paxos_start_round()), which gets received * (below) from the others ... */ switch (cmd) { case OP_PREPARING: return answer_PREP(tk, from, msg, cmd, ballot, new_owner_p); case OP_REJECTED: return answer_REJ(tk, from, msg, cmd, ballot, new_owner_p); case OP_PROMISING: return answer_PROM(tk, from, msg, cmd, ballot, new_owner_p); case OP_PROPOSING: return answer_PROP(tk, from, msg, cmd, ballot, new_owner_p); case OP_ACCEPTING: return answer_ACC(tk, from, msg, cmd, ballot, new_owner_p); case OP_COMMITTED: return answer_COMM(tk, from, msg, cmd, ballot, new_owner_p); default: log_error("unprocessed message, cmd %x", cmd); return -EINVAL; } } diff --git a/src/ticket.c b/src/ticket.c index 16cbb20..c82b02e 100644 --- a/src/ticket.c +++ b/src/ticket.c @@ -1,655 +1,640 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include "ticket.h" #include "config.h" #include "pacemaker.h" #include "inline-fn.h" #include "log.h" #include "booth.h" #include "paxos.h" #define TK_LINE 256 /* Untrusted input, must fit (incl. \0) in a buffer of max chars. */ int check_max_len_valid(const char *s, int max) { int i; for(i=0; iticket_count; i++) { if (!strcmp(booth_conf->ticket[i].name, ticket)) { if (found) *found = booth_conf->ticket + i; return 1; } } return 0; } int check_ticket(char *ticket, struct ticket_config **found) { if (found) *found = NULL; if (!booth_conf) return 0; if (!check_max_len_valid(ticket, sizeof(booth_conf->ticket[0].name))) return 0; return find_ticket_by_name(ticket, found); } int check_site(char *site, int *is_local) { struct booth_site *node; if (!check_max_len_valid(site, sizeof(node->addr_string))) return 0; if (find_site_by_name(site, &node)) { *is_local = node->local; return 1; } return 0; } -static inline int is_same_or_better_state(cmd_request_t here, cmd_request_t there) -{ - if (here == there) - return 1; - - if (here == ST_INIT) - return 1; - - return 0; -} - - static void combine_paxos_states(struct ticket_config *tk, struct ticket_paxos_state *new) { struct ticket_paxos_state *is; is = &tk->proposed_state; - log_info("combine %s: state %s->%s " + log_info("combine %s: state %s " "mask %" PRIx64 "/%" PRIx64 " " "ballot %x/%x ", tk->name, - STATE_STRING(is->state), - STATE_STRING(new->state), + STATE_STRING(tk->state), is->acknowledges, new->acknowledges, is->ballot, new->ballot); if (is->ballot > new->ballot) { log_debug("ticket %s got older ballot %d %d, ignored.", tk->name, is->ballot, new->ballot); return; } if (is->ballot < new->ballot) { log_debug("ticket %s got newer ballot %d %d, loaded.", tk->name, is->ballot, new->ballot); /* Eg. for CATCHUP */ *is = *new; return; } if (is->prev_ballot != new->prev_ballot) { /* Reject? */ log_debug("ticket %s got different prev ballots %d %d.", tk->name, is->prev_ballot, new->prev_ballot); return; } + assert(!"when do we get here? "); /* ballot numbers the same. */ - if (is_same_or_better_state(is->state, new->state) && - is->owner == new->owner) { + if (is->owner == new->owner) { is->acknowledges |= new->acknowledges; log_debug("ticket %s got ack %" PRIx64 ", now %" PRIx64, tk->name, new->acknowledges, is->acknowledges); } else { } } int promote_ticket_state(struct ticket_config *tk) { /* >= or > ? */ if (__builtin_popcount(tk->proposed_state.acknowledges) * 2 > booth_conf->site_count) { tk->current_state = tk->proposed_state; - if (tk->current_state.state == ST_INIT) - tk->current_state.state = ST_STABLE; + if (tk->state == ST_INIT) + tk->state = ST_STABLE; log_debug("ticket %s changing into state %s", - tk->name, STATE_STRING(tk->current_state.state)); + tk->name, STATE_STRING(tk->state)); return 1; } return 0; } static void ticket_parse(struct ticket_config *tk, struct boothc_ticket_msg *tmsg, struct booth_site *from) { struct ticket_paxos_state tp, *tps; struct booth_site *owner; time_t now; time(&now); tps = &tp; if (ntohl(tmsg->header.result) == RLT_SUCCESS) { if (!find_site_by_id( ntohl(tmsg->ticket.owner), &owner)) { log_error("wrong site_id %x as ticket owner, msg from %x", tmsg->ticket.owner, tmsg->header.from); return; } tps->expires = ntohl(tmsg->ticket.expiry) + now; tps->ballot = ntohl(tmsg->ticket.ballot); tps->prev_ballot = ntohl(tmsg->ticket.prev_ballot); tps->owner = owner; tps->acknowledges= from->bitmask; - tps->state = ST_STABLE; } if (now >= tps->expires) { tps->owner = NULL; tps->expires = 0; } combine_paxos_states(tk, tps); promote_ticket_state(tk); if (local->type != ARBITRATOR) { pcmk_handler.store_ticket(tk); if (tps->owner == local) pcmk_handler.grant_ticket(tk); else pcmk_handler.revoke_ticket(tk); } } /** Find out what others think about this ticket. * * If we're a SITE, we can ask (and have to tell) Pacemaker. * An ARBITRATOR can only ask others. */ static int ticket_send_catchup(struct ticket_config *tk) { int i, rv = 0; struct booth_site *site; struct boothc_ticket_msg msg; foreach_node(i, site) { if (!site->local) { init_ticket_msg(&msg, CMD_CATCHUP, RLT_SUCCESS, tk, &tk->current_state); log_debug("attempting catchup from %s", site->addr_string); rv = booth_udp_send(site, &msg, sizeof(msg)); } } ticket_activate_timeout(tk); return rv; } int ticket_write(struct ticket_config *tk) { pcmk_handler.store_ticket(tk); if (tk->current_state.owner == local) { pcmk_handler.grant_ticket(tk); } else if (!tk->current_state.owner) { pcmk_handler.revoke_ticket(tk); } return 0; } /* UDP message receiver. */ int message_recv(struct boothc_ticket_msg *msg, int msglen) { int cmd, rv; uint32_t from; struct booth_site *dest; struct ticket_config *tk; if (check_boothc_header(&msg->header, sizeof(*msg)) < 0 || msglen != sizeof(*msg)) { log_error("message receive error"); return -1; } from = ntohl(msg->header.from); if (!find_site_by_id(from, &dest)) { log_error("unknown sender: %08x", from); return -1; } if (!check_ticket(msg->ticket.id, &tk)) { log_error("got invalid ticket name \"%s\" from %s", msg->ticket.id, dest->addr_string); return -EINVAL; } cmd = ntohl(msg->header.cmd); switch (cmd) { case CMD_CATCHUP: rv = ticket_answer_catchup(msg, tk); if (rv < 0) return rv; return booth_udp_send(dest, msg, sizeof(*msg)); case CMR_CATCHUP: - if (tk->current_state.state == ST_INIT) + if (tk->state == ST_INIT) return ticket_process_catchup(msg, tk, dest); break; default: rv = paxos_answer(msg, tk, dest); assert((tk->proposed_state.acknowledges & ~booth_conf->site_bits) == 0); assert((tk->current_state.acknowledges & ~booth_conf->site_bits) == 0); return rv; } return 0; } /** Try to get the ticket for the local site. * */ int do_grant_ticket(struct ticket_config *tk) { int rv; if (tk->current_state.owner == local) return RLT_SUCCESS; if (tk->current_state.owner) return RLT_OVERGRANT; rv = paxos_start_round(tk, local); return rv; } /** Start a PAXOS round for revoking. * That can be started from any site. */ int do_revoke_ticket(struct ticket_config *tk) { int rv; if (!tk->current_state.owner) return RLT_SUCCESS; rv = paxos_start_round(tk, NULL); return rv; } int list_ticket(char **pdata, unsigned int *len) { struct ticket_config *tk; struct ticket_paxos_state *tps; char timeout_str[64]; char *data, *cp; int i, alloc; *pdata = NULL; *len = 0; alloc = 256 + booth_conf->ticket_count * (BOOTH_NAME_LEN * 2 + 128); data = malloc(alloc); if (!data) return -ENOMEM; cp = data; foreach_ticket(i, tk) { tps = &tk->current_state; if (tps->expires != 0) strftime(timeout_str, sizeof(timeout_str), "%F %T", localtime(&tps->expires)); else strcpy(timeout_str, "INF"); cp += sprintf(cp, "ticket: %s, owner: %s, expires: %s\n", tk->name, tps->owner ? tps->owner->addr_string : "None", timeout_str); *len = cp - data; assert(*len < alloc); } *pdata = data; return 0; } int setup_ticket(void) { struct ticket_config *tk; int i; /* TODO */ foreach_ticket(i, tk) { tk->current_state.owner = NULL; tk->current_state.expires = 0; - tk->current_state.state = ST_INIT; tk->proposed_state = tk->current_state; - if (local->type != ARBITRATOR) { + tk->state = ST_INIT; + + if (local->role & PROPOSER) { pcmk_handler.load_ticket(tk); } } return 0; } int ticket_answer_list(int fd, struct boothc_ticket_msg *msg) { char *data; int olen, rv; struct boothc_header hdr; rv = list_ticket(&data, &olen); if (rv < 0) return rv; init_header(&hdr, CMR_LIST, RLT_SUCCESS, sizeof(hdr) + olen); return send_header_plus(fd, &hdr, data, olen); } int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg) { int rv; struct ticket_config *tk; if (!check_ticket(msg->ticket.id, &tk)) { rv = RLT_INVALID_ARG; goto reply; } if (tk->current_state.owner) { log_error("client want to get an granted " "ticket %s", msg->ticket.id); rv = RLT_OVERGRANT; goto reply; } rv = do_grant_ticket(tk); reply: init_header(&msg->header, CMR_GRANT, rv ?: RLT_ASYNC, sizeof(*msg)); return send_ticket_msg(fd, msg); } int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg) { int rv; struct ticket_config *tk; if (!check_ticket(msg->ticket.id, &tk)) { rv = RLT_INVALID_ARG; goto reply; } if (!tk->current_state.owner) { log_info("client want to revoke a free ticket \"%s\"", msg->ticket.id); rv = RLT_SUCCESS; goto reply; } rv = do_revoke_ticket(tk); reply: init_ticket_msg(msg, CMR_REVOKE, rv ?: RLT_ASYNC, tk, &tk->current_state); return send_ticket_msg(fd, msg); } int ticket_answer_catchup(struct boothc_ticket_msg *msg, struct ticket_config *tk) { int rv; log_debug("got catchup request for \"%s\" from %08x", msg->ticket.id, ntohl(msg->header.from)); if (!msg && !check_ticket(msg->ticket.id, &tk)) { rv = RLT_INVALID_ARG; goto reply; } /* We do _always_ answer. * In case all booth daemons are restarted at the same time, nobody * would answer any questions, leading to timeouts and delays. * Just admit we don't know. */ rv = RLT_SUCCESS; reply: init_ticket_msg(msg, CMR_CATCHUP, rv, tk, - (tk->current_state.state == ST_INIT ? + (tk->state == ST_INIT ? &tk->proposed_state : &tk->current_state)); return 1; } /* TODO: move all that into paxos.c, like all other message handling? */ int ticket_process_catchup(struct boothc_ticket_msg *msg, struct ticket_config *tk, struct booth_site *sender) { int rv; log_debug("got catchup answer for \"%s\" from %s", msg->ticket.id, sender->addr_string); ticket_parse(tk, msg, sender); rv = 0; log_debug("got catchup result from %s: result %d", sender->addr_string, rv); return rv; } /** Send new state request to all sites. * Perhaps this should take a flag for ACCEPTOR etc.? * No need currently, as all nodes are more or less identical. */ int ticket_broadcast_proposed_state(struct ticket_config *tk, cmd_request_t state) { struct boothc_ticket_msg msg; tk->proposed_state.acknowledges = local->bitmask; - tk->proposed_state.state = state; + tk->state = state; init_ticket_msg(&msg, state, RLT_SUCCESS, tk, &tk->proposed_state); log_debug("broadcasting %s for ticket \"%s\"", STATE_STRING(state), tk->name); return transport()->broadcast(&msg, sizeof(msg)); } static void ticket_cron(struct ticket_config *tk) { time_t now; - switch(tk->current_state.state) { + now = time(NULL); + + switch(tk->state) { case ST_INIT: /* Unknown state, ask others. */ ticket_send_catchup(tk); return; - default: - break; - } - - now = time(NULL); - switch(tk->proposed_state.state) { case OP_COMMITTED: case ST_STABLE: /* Has an owner, has an expiry date, and expiry date in the past? */ if (tk->current_state.expires && tk->current_state.owner && now > tk->current_state.expires) { log_info("LOST ticket: \"%s\" no longer at %s", tk->name, ticket_owner_string(tk->current_state.owner)); /* Couldn't renew in time - ticket lost. */ - tk->current_state.state = ST_INIT; tk->current_state.owner = NULL; ticket_write(tk); + + /* Ask others (repeatedly) until we know the new owner. */ + tk->state = ST_INIT; + ticket_activate_timeout(tk); } /* Do we need to refresh? */ if (tk->current_state.owner == local && now + tk->expiry/2 > tk->current_state.expires) { log_info("RENEW ticket \"%s\"", tk->name); paxos_start_round(tk, local); /* TODO: remember when we started, and restart afresh after some retries */ } break; case OP_PREPARING: case OP_PROPOSING: /* We ask others for a change; retry to get consensus. */ - ticket_broadcast_proposed_state(tk, tk->proposed_state.state); + ticket_broadcast_proposed_state(tk, tk->state); break; case OP_PROMISING: case OP_ACCEPTING: case OP_RECOVERY: case OP_REJECTED: break; default: break; } } void process_tickets(void) { struct ticket_config *tk; int i; time_t now; time(&now); foreach_ticket(i, tk) { if (0) log_debug("ticket %s next cron %" PRIx64 ", now %" PRIx64 ", in %" PRIi64, tk->name, (uint64_t)tk->next_cron, (uint64_t)now, tk->next_cron - now); if (tk->next_cron > now) continue; log_debug("ticket cron: doing %s", tk->name); /* Set next value, handler may override. */ tk->next_cron = INT_MAX; ticket_cron(tk); } } void tickets_log_info(void) { struct ticket_config *tk; struct ticket_paxos_state *c, *p; int i; foreach_ticket(i, tk) { c = &tk->current_state; p = &tk->proposed_state; - log_info("Ticket %s: state %s/%s " + log_info("Ticket %s: state %s " "mask %" PRIx64 "/%" PRIx64 " " "ballot %x/%x " "expires %-24.24s", tk->name, - STATE_STRING(c->state), - STATE_STRING(p->state), + STATE_STRING(tk->state), c->acknowledges, p->acknowledges, c->ballot, p->ballot, ctime(&c->expires)); } }