diff --git a/src/config.c b/src/config.c index 5a38532..a5b2440 100644 --- a/src/config.c +++ b/src/config.c @@ -1,589 +1,601 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include "booth.h" #include "config.h" #include "paxos.h" #include "ticket.h" #include "log.h" static int ticket_size = 0; static int ticket_realloc(void) { const int added = 5; int had, want; void *p; had = booth_conf->ticket_allocated; want = had + added; p = realloc(booth_conf->ticket, sizeof(struct ticket_config) * want); if (!booth_conf) { log_error("can't alloc more tickets"); return -ENOMEM; } booth_conf->ticket = p; memset(booth_conf->ticket + had, 0, sizeof(struct ticket_config) * added); booth_conf->ticket_allocated = want; return 0; } int add_site(char *address, int type); int add_site(char *addr_string, int type) { int rv; struct booth_site *site; uLong nid; uint32_t mask; rv = 1; if (booth_conf->site_count == MAX_NODES) { log_error("too many nodes"); goto out; } if (strlen(addr_string)+1 >= sizeof(booth_conf->site[0].addr_string)) { log_error("site address \"%s\" too long", addr_string); goto out; } site = booth_conf->site + booth_conf->site_count; site->family = BOOTH_PROTO_FAMILY; site->type = type; /* Make site_id start at a non-zero point. * Perhaps use hash over string or address? */ strcpy(site->addr_string, addr_string); nid = crc32(0L, NULL, 0); /* booth_config() uses memset(), so sizeof() is guaranteed to give * the same result everywhere - no uninitialized bytes. */ site->site_id = crc32(nid, site->addr_string, sizeof(site->addr_string)); /* Make sure we will never collide with NO_OWNER, * or be negative (to get "get_local_id() < 0" working). */ mask = 1 << (sizeof(site->site_id)*8 -1); assert(NO_OWNER & mask); site->site_id &= ~mask; site->index = booth_conf->site_count; site->bitmask = 1 << booth_conf->site_count; /* Catch site overflow */ assert(site->bitmask); booth_conf->site_bits |= site->bitmask; site->tcp_fd = -1; if (site->type == SITE) site->role = PROPOSER | ACCEPTOR | LEARNER; else if (site->type == ARBITRATOR) site->role = ACCEPTOR | LEARNER; booth_conf->site_count++; rv = 0; memset(&site->sa6, 0, sizeof(site->sa6)); if (inet_pton(AF_INET, site->addr_string, &site->sa4.sin_addr) > 0) { site->family = AF_INET; site->sa4.sin_family = site->family; site->sa4.sin_port = htons(booth_conf->port); site->saddrlen = sizeof(site->sa4); site->addrlen = sizeof(site->sa4.sin_addr); } else if (inet_pton(AF_INET6, site->addr_string, &site->sa6.sin6_addr) > 0) { site->family = AF_INET6; site->sa6.sin6_family = site->family; site->sa6.sin6_flowinfo = 0; site->sa6.sin6_port = htons(booth_conf->port); site->saddrlen = sizeof(site->sa6); site->addrlen = sizeof(site->sa6.sin6_addr); } else { log_error("Address string \"%s\" is bad", site->addr_string); rv = EINVAL; } out: return rv; } inline static char *skip_while_in(const char *cp, int (*fn)(int), const char *allowed) { /* strchr() returns a pointer to the terminator if *cp == 0. */ while (*cp && (fn(*cp) || strchr(allowed, *cp))) cp++; /* discard "const" qualifier */ return (char*)cp; } inline static char *skip_while(char *cp, int (*fn)(int)) { while (fn(*cp)) cp++; return cp; } inline static char *skip_until(char *cp, char expected) { while (*cp && *cp != expected) cp++; return cp; } static inline int is_end_of_line(char *cp) { char c = *cp; return c == '\n' || c == 0 || c == '#'; } static int add_ticket(const char *name, struct ticket_config **tkp, const struct ticket_config *def) { int rv; struct ticket_config *tk; if (booth_conf->ticket_count == booth_conf->ticket_allocated) { rv = ticket_realloc(); if (rv < 0) return rv; } tk = booth_conf->ticket + booth_conf->ticket_count; booth_conf->ticket_count++; if (!check_max_len_valid(name, sizeof(tk->name))) { log_error("ticket name \"%s\" too long.", name); return -EINVAL; } if (find_ticket_by_name(name, NULL)) { log_error("ticket name \"%s\" used again.", name); return -EINVAL; } if (* skip_while_in(name, isalnum, "-/")) { log_error("ticket name \"%s\" invalid; only alphanumeric names.", name); return -EINVAL; } strcpy(tk->name, name); tk->timeout = def->timeout; tk->expiry = def->expiry; memcpy(tk->weight, def->weight, sizeof(tk->weight)); tk->state = ST_INIT; if (tkp) *tkp = tk; return 0; } /* returns number of weights, or -1 on bad input. */ static int parse_weights(const char *input, int weights[MAX_NODES]) { int i, v; char *cp; for(i=0; iproto = UDP; booth_conf->port = BOOTH_DEFAULT_PORT; parse_weights("", defaults.weight); defaults.expiry = DEFAULT_TICKET_EXPIRY; defaults.timeout = DEFAULT_TICKET_TIMEOUT; + defaults.retries = DEFAULT_RETRIES; error = ""; log_debug("reading config file %s", path); while (fgets(line, sizeof(line), fp)) { lineno++; s = skip_while(line, isspace); if (is_end_of_line(s)) continue; key = s; /* Key */ end_of_key = skip_while_in(key, isalnum, "-_"); if (end_of_key == key) { error = "No key"; goto err; } if (!*end_of_key) goto exp_equal; /* whitespace, and something else but nothing more? */ s = skip_while(end_of_key, isspace); if (*s != '=') { exp_equal: error = "Expected '=' after key"; goto err; } s++; /* It's my buffer, and I terminate if I want to. */ /* But not earlier than that, because we had to check for = */ *end_of_key = 0; /* Value tokenizing */ s = skip_while(s, isspace); switch (*s) { case '"': case '\'': val = s+1; s = skip_until(val, *s); /* Terminate value */ if (!*s) { error = "Unterminated quoted string"; goto err; } /* Remove and skip quote */ *s = 0; s++; if (* skip_while(s, isspace)) { error = "Surplus data after value"; goto err; } *s = 0; break; case 0: no_value: error = "No value"; goto err; break; default: val = s; /* Rest of line. */ i = strlen(s); /* i > 0 because of "case 0" above. */ while (i > 0 && isspace(s[i-1])) i--; s += i; *s = 0; } if (val == s) goto no_value; if (strlen(key) > BOOTH_NAME_LEN || strlen(val) > BOOTH_NAME_LEN) { error = "key/value too long"; goto err; } if (strcmp(key, "transport") == 0) { if (got_transport) { error = "config file has multiple transport lines"; goto err; } if (strcasecmp(val, "UDP") == 0) booth_conf->proto = UDP; else if (strcasecmp(val, "SCTP") == 0) booth_conf->proto = SCTP; else { error = "invalid transport protocol"; goto err; } got_transport = 1; } if (strcmp(key, "port") == 0) booth_conf->port = atoi(val); if (strcmp(key, "name") == 0) { if(strlen(val)+1 >= BOOTH_NAME_LEN) { error = "Config name too long."; goto err; } } if (strcmp(key, "site") == 0) { if (add_site(val, SITE)) goto out; } if (strcmp(key, "arbitrator") == 0) { if (add_site(val, ARBITRATOR)) goto out; } if (strcmp(key, "ticket") == 0) { if (add_ticket(val, &last_ticket, &defaults)) goto out; /* last_ticket is valid until another one is needed - * and then it already has the new address and * is valid again. */ } if (strcmp(key, "expire") == 0) { defaults.expiry = strtol(val, &s, 0); if (*s || s == val || defaults.expiry<10) { error = "Expected plain integer value >=10 for expire"; goto err; } if (last_ticket) last_ticket->expiry = defaults.expiry; } if (strcmp(key, "timeout") == 0) { defaults.timeout = strtol(val, &s, 0); if (*s || s == val || defaults.timeout<1) { error = "Expected plain integer value >=1 for timeout"; goto err; } if (last_ticket) last_ticket->timeout = defaults.timeout; } + if (strcmp(key, "retries") == 0) { + defaults.retries = strtol(val, &s, 0); + if (*s || s == val || defaults.retries<3 || defaults.retries > 100) { + error = "Expected plain integer value in the range [3, 100] for retries"; + goto err; + } + + if (last_ticket) + last_ticket->retries = defaults.retries; + } + if (strcmp(key, "acquire-after") == 0) { defaults.acquire_after = strtol(val, &s, 0); if (*s || s == val || defaults.acquire_after<1) { error = "Expected plain integer value >=1 for acquire-after"; goto err; } if (last_ticket) last_ticket->acquire_after = defaults.acquire_after; } if (strcmp(key, "weights") == 0) { if (parse_weights(val, defaults.weight) < 0) goto out; if (last_ticket) memcpy(last_ticket->weight, defaults.weight, sizeof(last_ticket->weight)); } } if ((booth_conf->site_count % 2) == 0) { log_warn("An odd number of nodes is strongly recommended!"); } /* Default: make config name match config filename. */ if (!booth_conf->name[0]) { cp = strrchr(path, '/'); if (!cp) cp = path; /* TODO: locale? */ /* NUL-termination by memset. */ for(i=0; iname[i] = *(cp++); /* Last resort. */ if (!booth_conf->name[0]) strcpy(booth_conf->name, "booth"); } return 0; err: out: log_error("%s in config file line %d", error, lineno); free(booth_conf); booth_conf = NULL; return -1; } int check_config(int type) { if (!booth_conf) return -1; return 0; } int find_site_by_name(unsigned char *site, struct booth_site **node, int any_type) { struct booth_site *n; int i; if (!booth_conf) return 0; for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if ((n->type == SITE || any_type) && strcmp(n->addr_string, site) == 0) { *node = n; return 1; } } return 0; } int find_site_by_id(uint32_t site_id, struct booth_site **node) { struct booth_site *n; int i; if (site_id == NO_OWNER) { *node = NULL; return 1; } if (!booth_conf) return 0; for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if (n->site_id == site_id) { *node = n; return 1; } } return 0; } const char *type_to_string(int type) { switch (type) { case ARBITRATOR: return "arbitrator"; case SITE: return "site"; case CLIENT: return "client"; } return "??invalid-type??"; } diff --git a/src/config.h b/src/config.h index 0a9124e..b3d8b49 100644 --- a/src/config.h +++ b/src/config.h @@ -1,146 +1,147 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _CONFIG_H #define _CONFIG_H #include #include "booth.h" #include "transport.h" /** @{ */ /** Definitions for in-RAM data. */ #define MAX_NODES 16 #define TICKET_ALLOC 16 -#define RETRIES 10 - struct ticket_config { /** \name Configuration items. * @{ */ /** Name of ticket. */ boothc_ticket name; /** How many seconds until expiration. */ int expiry; /** Network related timeouts. */ int timeout; + /** Retries before giving up. */ + int retries; + /** If >0, time to wait for a site to get fenced. * The ticket may be acquired after that timespan by * another site. */ int acquire_after; /** Node weights. */ int weight[MAX_NODES]; /** @} */ /** \name Runtime values. * @{ */ /** Current state. */ cmd_request_t state; /** When something has to be done */ struct timeval next_cron; /** Current owner of ticket. */ struct booth_site *owner; /** Timestamp of expiration. */ time_t expires; /** Last ballot number that was agreed on. */ uint32_t last_ack_ballot; /** @} */ /** \name Needed while proposals are being done. * @{ */ /** Who tries to change the current status. */ struct booth_site *proposer; /** Current owner of ticket. */ struct booth_site *proposed_owner; /** New/current ballot number. * Might be < prev_ballot if overflown. * This only every goes "up" (logically). */ uint32_t new_ballot; /** Bitmap of sites that acknowledge that state. */ uint64_t proposal_acknowledges; /** When an incompletely acknowledged proposal gets done. * If all peers agree, that happens sooner. * See switch_state_to(). */ struct timeval proposal_switch; /** Timestamp of proposal expiration. */ time_t proposal_expires; /** Number of send retries left. * Used on the new owner. * Starts at 0, counts up. */ int retry_number; /** @} */ }; struct booth_config { char name[BOOTH_NAME_LEN]; transport_layer_t proto; uint16_t port; /** Stores the OR of the individual host bitmasks. */ uint64_t site_bits; int site_count; struct booth_site site[MAX_NODES]; int ticket_count; int ticket_allocated; struct ticket_config *ticket; }; extern struct booth_config *booth_conf; int read_config(const char *path); int check_config(int type); int find_site_by_name(unsigned char *site, struct booth_site **node, int any_type); int find_site_by_id(uint32_t site_id, struct booth_site **node); const char *type_to_string(int type); #define STATE_STRING(s_) ({ union { cmd_request_t s; char c[5]; } d; d.s = htonl(s_); d.c[4] = 0; d.c; }) #endif /* _CONFIG_H */ diff --git a/src/inline-fn.h b/src/inline-fn.h index 87f52a2..453c9c5 100644 --- a/src/inline-fn.h +++ b/src/inline-fn.h @@ -1,267 +1,267 @@ /* * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _INLINE_FN_H #define _INLINE_FN_H #include #include #include #include #include "config.h" #include "transport.h" inline static uint32_t get_local_id(void) { return local ? local->site_id : -1; } inline static uint32_t get_node_id(struct booth_site *node) { return node ? node->site_id : NO_OWNER; } inline static int ticket_valid_for(const struct ticket_config *tk) { int left; left = tk->expires - time(NULL); return (left < 0) ? 0 : left; } /** Returns number of seconds left, if any. */ inline static int owner_and_valid(const struct ticket_config *tk) { if (tk->owner != local) return 0; return ticket_valid_for(tk); } static inline void init_header_bare(struct boothc_header *h) { h->magic = htonl(BOOTHC_MAGIC); h->version = htonl(BOOTHC_VERSION); h->from = htonl(local->site_id); } static inline void init_header(struct boothc_header *h, int cmd, int result, int data_len) { init_header_bare(h); h->length = htonl(data_len); h->cmd = htonl(cmd); h->result = htonl(result); } static inline void init_ticket_site_header(struct boothc_ticket_msg *msg, int cmd) { init_header(&msg->header, cmd, 0, sizeof(*msg)); } static inline void init_ticket_msg(struct boothc_ticket_msg *msg, int cmd, int rv, struct ticket_config *tk) { assert(sizeof(msg->ticket.id) == sizeof(tk->name)); init_header(&msg->header, cmd, rv, sizeof(*msg)); if (!tk) { memset(&msg->ticket, 0, sizeof(msg->ticket)); } else { memcpy(msg->ticket.id, tk->name, sizeof(msg->ticket.id)); msg->ticket.expiry = htonl(ticket_valid_for(tk)); msg->ticket.owner = htonl(get_node_id(tk->owner)); msg->ticket.ballot = htonl(tk->new_ballot); msg->ticket.prev_ballot = htonl(tk->last_ack_ballot); } } static inline struct booth_transport const *transport(void) { return booth_transport + booth_conf->proto; } static inline const char *ticket_owner_string(struct booth_site *site) { return site ? site->addr_string : "NONE"; } static inline void disown_ticket(struct ticket_config *tk) { tk->owner = NULL; tk->proposed_owner = NULL; time(&tk->expires); } static inline void disown_if_expired(struct ticket_config *tk) { if (time(NULL) >= tk->expires || !tk->proposed_owner) disown_ticket(tk); } static inline int all_agree(struct ticket_config *tk) { return tk->proposal_acknowledges == booth_conf->site_bits; } static inline int majority_agree(struct ticket_config *tk) { /* Use ">" to get majority decision, even for an even number * of participants. */ return __builtin_popcount(tk->proposal_acknowledges) * 2 > booth_conf->site_count; } /* We allow half of the uint32_t to be used; * half of that below, half of that above the current known "good" value. * 0 UINT32_MAX * |--------------------------+----------------+------------| * | | | * |--------+-------| allowed range * | * current ballot * * So, on overflow it looks like that: * UINT32_MAX 0 * |--------------------------+-----------||---+------------| * | | | * |--------+-------| allowed range * | * current ballot * * This should be possible by using the same datatype and relying * on the under/overflow semantics. */ static inline int ballot_is_higher_than(uint32_t b_high, uint32_t b_low) { uint32_t diff; if (b_high == b_low) return 0; diff = b_high - b_low; if (diff < UINT32_MAX/4) return 1; diff = b_low - b_high; if (diff < UINT32_MAX/4) return 0; assert(!"ballot out of range - invalid"); } static inline uint32_t ballot_max2(uint32_t a, uint32_t b) { return ballot_is_higher_than(a, b) ? a : b; } static inline uint32_t ballot_max3(uint32_t a, uint32_t b, uint32_t c) { return ballot_max2( ballot_max2(a, b), c); } static inline double timeval_to_float(struct timeval tv) { return tv.tv_sec + tv.tv_usec*(double)1.0e-6; } static inline int timeval_msec(struct timeval tv) { int m; m = tv.tv_usec / 1000; if (m >= 1000) m = 999; return m; } static inline int timeval_compare(struct timeval tv1, struct timeval tv2) { if (tv1.tv_sec < tv2.tv_sec) return -1; if (tv1.tv_sec > tv2.tv_sec) return +1; if (tv1.tv_usec < tv2.tv_usec) return -1; if (tv1.tv_usec > tv2.tv_usec) return +1; return 0; } static inline int timeval_in_past(struct timeval which) { struct timeval tv; gettimeofday(&tv, NULL); return timeval_compare(tv, which) > 0; } static inline time_t next_renewal_starts_at(struct ticket_config *tk) { time_t half_exp, retries_needed; /* If not owner, don't renew. */ if (tk->owner != local) return 0; /* Try to renew at half of expiry time. */ half_exp = tk->expires - tk->expiry/2; /* Also start renewal if we couldn't get * a few message retransmission in the alloted * expiry time. */ - retries_needed = tk->expires - tk->timeout * RETRIES/2; + retries_needed = tk->expires - tk->timeout * tk->retries/2; /* Return earlier timestamp. */ return half_exp < retries_needed ? half_exp : retries_needed; } static inline int should_start_renewal(struct ticket_config *tk) { time_t now, when; when = next_renewal_starts_at(tk); if (!when) return 0; time(&now); return when >= now; } #endif diff --git a/src/paxos.c b/src/paxos.c index fa1d2d2..96d4a0f 100644 --- a/src/paxos.c +++ b/src/paxos.c @@ -1,483 +1,483 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include "booth.h" #include "transport.h" #include "inline-fn.h" #include "config.h" #include "paxos.h" #include "log.h" static uint32_t next_ballot_number(struct ticket_config *tk) { uint32_t b; /* TODO: getenv() for debugging */ b = tk->new_ballot; /* + unique number */ b += local->bitmask; /* + weight */ b += booth_conf->site_bits * tk->weight[ local->index ]; return b; } static inline void set_proposal_in_ticket(struct ticket_config *tk, struct booth_site *from, uint32_t ballot, struct booth_site *new_owner) { tk->proposer = from; tk->new_ballot = ballot; tk->proposed_owner = new_owner; tk->proposal_expires = 0; // TODO - needed? tk->proposal_acknowledges = from->bitmask | local->bitmask; /* We lose (?) */ tk->state = ST_STABLE; } int should_switch_state_p(struct ticket_config *tk) { if (all_agree(tk)) { log_debug("all agree"); return 1; } if (majority_agree(tk)) { /* Time passed, and more than half agree. */ if (timeval_in_past(tk->proposal_switch)) { log_debug("majority, and enough time passed"); return 2; } if (!tk->proposal_switch.tv_sec) { log_debug("majority, wait half a second"); /* Wait half a second before doing the state change. */ ticket_next_cron_in(tk, 0.5); tk->proposal_switch = tk->next_cron; } } return 0; } static int retries_exceeded(struct ticket_config *tk) { tk->retry_number ++; - if (tk->retry_number > RETRIES) { + if (tk->retry_number > tk->retries) { log_info("ABORT %s for ticket \"%s\" - " "not enough answers after %d retries", tk->state == OP_PREPARING ? "prepare" : "propose", tk->name, tk->retry_number); abort_proposal(tk); } else { /* We ask others for a change; retry to get * consensus. * But don't ask again immediately after a * query, give the peers time to answer. */ if (timeval_in_past(tk->proposal_switch)) { ticket_broadcast_proposed_state(tk, tk->state); ticket_activate_timeout(tk); } } return 0; } static inline void change_ticket_owner(struct ticket_config *tk, uint32_t ballot, struct booth_site *new_owner) { /* set "previous" value for next round */ tk->last_ack_ballot = tk->new_ballot = ballot; tk->owner = new_owner; tk->expires = time(NULL) + tk->expiry; tk->proposer = NULL; tk->state = ST_STABLE; set_ticket_wakeup(tk); log_info("Now actively COMMITTED for \"%s\": new owner %s, ballot %d", tk->name, ticket_owner_string(tk->owner), ballot); ticket_write(tk); } void abort_proposal(struct ticket_config *tk) { log_info("ABORTing proposal."); tk->proposer = NULL; tk->proposed_owner = tk->owner; tk->retry_number = 0; /* Ask others (repeatedly) until we know the new owner. */ tk->state = ST_INIT; } int PROPOSE_to_COMMIT(struct ticket_config *tk) { int rv; if (should_switch_state_p(tk)) { change_ticket_owner(tk, tk->new_ballot, tk->proposed_owner); rv = ticket_broadcast_proposed_state(tk, OP_COMMITTED); tk->state = ST_STABLE; return rv; } return retries_exceeded(tk); } int PREPARE_to_PROPOSE(struct ticket_config *tk) { if (should_switch_state_p(tk)) { return ticket_broadcast_proposed_state(tk, OP_PROPOSING); } return retries_exceeded(tk); } /** \defgroup msghdl Message handling functions. * * Not all use all arguments; but to keep the interface the same, * they're all just passed everything we have. * * See also enum \ref cmd_request_t. * @{ */ /** Start a PAXOS round, by sending out an OP_PREPARING. */ int paxos_start_round(struct ticket_config *tk, struct booth_site *new_owner) { if (tk->state != ST_STABLE) return RLT_BUSY; /* This may not be called repeatedly from cron, * because the ballot number would simply * get counted up without any benefit. * The message may get retransmitted, though. * Normal retry behaviour gets achieved during * state OP_PREPARING anyway. */ tk->proposer = local; tk->new_ballot = next_ballot_number(tk); tk->proposed_owner = new_owner; tk->retry_number = 0; ticket_activate_timeout(tk); /* TODO: shorten renew exchange by just sending * a new proposal? Ballot numbers should still be the * same everywhere, owner doesn't change. */ return ticket_broadcast_proposed_state(tk, OP_PREPARING); } /** Answering OP_PREPARING means sending out OP_PROMISING. */ inline static int answer_PREP( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { if (!(local->role & ACCEPTOR)) return 0; /* Ignore if packet is too late, and state is already active. */ if (tk->owner == new_owner && ballot == tk->last_ack_ballot) return 0; /* We have to be careful here. * Getting multiple copies of the same message must not trigger * rejections, but only repeated promises. */ if (from == tk->proposer && ballot == tk->new_ballot) goto promise; /* It doesn't matter whether it's the same or another host; * the only distinction is the ballot number. */ if (ballot > tk->new_ballot) { promise: msg->header.cmd = htonl(OP_PROMISING); msg->ticket.prev_ballot = htonl(tk->last_ack_ballot); set_proposal_in_ticket(tk, from, ballot, new_owner); log_info("PROMISING for ticket \"%s\" (by %s) for %d", tk->name, from->addr_string, ballot); } else { msg->header.cmd = htonl(OP_REJECTED); msg->ticket.ballot = htonl(tk->new_ballot); msg->ticket.prev_ballot = htonl(tk->last_ack_ballot); log_info("REJECTING (prep) for ticket \"%s\" from %s - have %d, wanted %d", tk->name, from->addr_string, tk->new_ballot, ballot); } init_header_bare(&msg->header); return booth_udp_send(from, msg, sizeof(*msg)); } /** Getting OP_REJECTED means abandoning the current operation. */ inline static int handle_REJ( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { if (tk->last_ack_ballot == ballot) { log_debug("got a late REJECTED; ignored, as " "ballot %d is already active.", tk->last_ack_ballot); return 0; } log_info("got REJECTED for ticket \"%s\", ballot %d (has %d), from %s", tk->name, tk->new_ballot, ballot, from->addr_string); abort_proposal(tk); /* TODO: should we check whether that sequence is increasing? */ tk->new_ballot = ballot_max2(tk->new_ballot, ballot); tk->last_ack_ballot = ballot_max2(tk->last_ack_ballot, ntohl(msg->ticket.prev_ballot)); /* No need to ask the others. */ tk->state = ST_STABLE; return 0; } /** After a few OP_PROMISING replies we can send out OP_PROPOSING. */ inline static int got_a_PROM( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { int had_that; if (tk->proposer == local && tk->state == OP_PREPARING && tk->new_ballot == ballot) { had_that = tk->proposal_acknowledges & from->bitmask; tk->proposal_acknowledges |= from->bitmask; log_info("Got PROMISE from %s for \"%s\", for %d, acks now 0x%" PRIx64, from->addr_string, tk->name, tk->new_ballot, tk->proposal_acknowledges); if (had_that) return 0; return PREPARE_to_PROPOSE(tk); } /* Packet just delayed? Silently ignore. */ if (ballot == tk->last_ack_ballot && (new_owner == tk->owner || new_owner == tk->proposed_owner)) return 0; /* Message sent to wrong host? */ log_debug("got unexpected PROMISE from %s for \"%s\"", from->addr_string, tk->name); return 0; } /** Answering OP_PROPOSING means sending out OP_ACCEPTING. */ inline static int answer_PROP( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { if (!(local->role & ACCEPTOR)) return 0; /* Repeated packet. */ if (new_owner == tk->owner && ballot == tk->new_ballot) goto accepting; /* If packet is late, ie. we already have that state, * just repeat the ack - perhaps it got lost. */ if (new_owner == tk->owner && ballot == tk->last_ack_ballot) goto accepting; /* We have to be careful here. * Getting multiple copies of the same message must not trigger * rejections, but only repeated OP_ACCEPTING messages. */ if (ballot > tk->last_ack_ballot && ballot == tk->new_ballot && ntohl(msg->ticket.prev_ballot) == tk->last_ack_ballot) { if (tk->proposer) { /* Send OP_REJECTED to previous proposer? */ log_info("new PROPOSAL for ticket \"%s\" overriding older one from %s", tk->name, from->addr_string); } tk->proposer = from; accepting: init_ticket_msg(msg, OP_ACCEPTING, RLT_SUCCESS, tk); log_info("sending ACCEPT for ticket \"%s\" (by %s) for %d - new owner %s", tk->name, from->addr_string, ballot, ticket_owner_string(new_owner)); change_ticket_owner(tk, ballot, new_owner); } else if (ballot == tk->last_ack_ballot && ballot == tk->new_ballot && ntohl(msg->ticket.prev_ballot) == tk->last_ack_ballot) { /* Silently ignore delayed messages. */ } else { msg->header.cmd = htonl(OP_REJECTED); msg->ticket.ballot = htonl(tk->new_ballot); msg->ticket.prev_ballot = htonl(tk->last_ack_ballot); log_info("REJECTING (prop) for ticket \"%s\" from %s - have %d, wanted %d", tk->name, from->addr_string, tk->new_ballot, ballot); } init_header_bare(&msg->header); return booth_udp_send(from, msg, sizeof(*msg)); } /** After enough OP_ACCEPTING we can do the change, and send an OP_COMMITTED. */ inline static int got_an_ACC( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { if (tk->proposer == local && tk->state == OP_PROPOSING) { tk->proposal_acknowledges |= from->bitmask; log_info("Got ACCEPTING from %s for \"%s\", acks now 0x%" PRIx64, from->addr_string, tk->name, tk->proposal_acknowledges); return PROPOSE_to_COMMIT(tk); } return 0; } /** An OP_COMMITTED gets no answer; just record the new state. */ inline static int answer_COMM( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { /* We cannot check whether the packet is from an expected proposer - * perhaps this is the _only_ message of the whole handshake? */ if (ballot > tk->new_ballot && ntohl(msg->ticket.prev_ballot) == tk->last_ack_ballot) { change_ticket_owner(tk, ballot, new_owner); } /* Send ack? */ return 0; } /** @} */ int paxos_answer( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner_p) { int cmd; cmd = ntohl(msg->header.cmd); /* These are in roughly chronological order. * What the first machine sends is an OP_PREPARING * (see paxos_start_round()), which gets received * (below) from the others ... */ switch (cmd) { case OP_PREPARING: return answer_PREP(tk, from, msg, ballot, new_owner_p); case OP_REJECTED: return handle_REJ(tk, from, msg, ballot, new_owner_p); case OP_PROMISING: return got_a_PROM(tk, from, msg, ballot, new_owner_p); case OP_PROPOSING: return answer_PROP(tk, from, msg, ballot, new_owner_p); case OP_ACCEPTING: return got_an_ACC(tk, from, msg, ballot, new_owner_p); case OP_COMMITTED: return answer_COMM(tk, from, msg, ballot, new_owner_p); default: log_error("unprocessed message, cmd %x", cmd); return -EINVAL; } } diff --git a/src/ticket.h b/src/ticket.h index 5bd60ad..6a44051 100644 --- a/src/ticket.h +++ b/src/ticket.h @@ -1,93 +1,94 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _TICKET_H #define _TICKET_H #include #include #include #include "config.h" #define DEFAULT_TICKET_EXPIRY 600 #define DEFAULT_TICKET_TIMEOUT 10 +#define DEFAULT_RETRIES 10 #define foreach_ticket(i_,t_) for(i=0; (t_=booth_conf->ticket+i, iticket_count); i++) #define foreach_node(i_,n_) for(i=0; (n_=booth_conf->site+i, isite_count); i++) int check_ticket(char *ticket, struct ticket_config **tc); int check_site(char *site, int *local); int do_grant_ticket(struct ticket_config *ticket); int revoke_ticket(struct ticket_config *ticket); int list_ticket(char **pdata, unsigned int *len); int message_recv(struct boothc_ticket_msg *msg, int msglen); int setup_ticket(void); int check_max_len_valid(const char *s, int max); int do_grant_ticket(struct ticket_config *tk); int do_revoke_ticket(struct ticket_config *tk); int find_ticket_by_name(const char *ticket, struct ticket_config **found); void set_ticket_wakeup(struct ticket_config *tk); int ticket_answer_list(int fd, struct boothc_ticket_msg *msg); int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg); int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg); int ticket_broadcast_proposed_state(struct ticket_config *tk, cmd_request_t state); int ticket_write(struct ticket_config *tk); void process_tickets(void); void tickets_log_info(void); static inline void ticket_next_cron_at(struct ticket_config *tk, struct timeval when) { tk->next_cron = when; } static inline void ticket_next_cron_in(struct ticket_config *tk, float seconds) { struct timeval tv; gettimeofday(&tv, NULL); tv.tv_sec += trunc(seconds); tv.tv_usec += (seconds - trunc(seconds)) * 1e6; ticket_next_cron_at(tk, tv); } static inline void ticket_activate_timeout(struct ticket_config *tk) { /* TODO: increase timeout when no answers */ ticket_next_cron_in(tk, tk->timeout); tk->retry_number ++; } #endif /* _TICKET_H */