diff --git a/src/raft.c b/src/raft.c index a22478f..f392dac 100644 --- a/src/raft.c +++ b/src/raft.c @@ -1,570 +1,594 @@ /* * Copyright (C) 2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include "booth.h" #include "transport.h" #include "inline-fn.h" #include "config.h" #include "raft.h" #include "ticket.h" #include "log.h" inline static void clear_election(struct ticket_config *tk) { int i; struct booth_site *site; log_info("clear election"); tk->votes_received = 0; foreach_node(i, site) tk->votes_for[site->index] = NULL; } -inline static void site_voted_for(struct ticket_config *tk, +inline static void record_vote(struct ticket_config *tk, struct booth_site *who, struct booth_site *vote) { log_info("site \"%s\" votes for \"%s\"", site_string(who), site_string(vote)); if (!tk->votes_for[who->index]) { tk->votes_for[who->index] = vote; tk->votes_received |= who->bitmask; } else { if (tk->votes_for[who->index] != vote) log_error("voted previously (but in same term!) for \"%s\"...", tk->votes_for[who->index]->addr_string); } } static void follower_update_ticket(struct ticket_config *tk, struct boothc_ticket_msg *msg) { uint32_t i; int duration; duration = tk->term_duration; if (msg) duration = min(duration, ntohl(msg->ticket.term_valid_for)); tk->term_expires = time(NULL) + duration; if (msg) { i = ntohl(msg->ticket.term); tk->current_term = max(i, tk->current_term); /* § 5.3 */ i = ntohl(msg->ticket.leader_commit); tk->commit_index = max(i, tk->commit_index); } } static void become_follower(struct ticket_config *tk, struct boothc_ticket_msg *msg) { tk->state = ST_FOLLOWER; follower_update_ticket(tk, msg); } -static struct booth_site *majority_votes(struct ticket_config *tk) +struct booth_site *majority_votes(struct ticket_config *tk) { int i, n; struct booth_site *v; int count[MAX_NODES] = { 0, }; for(i=0; isite_count; i++) { v = tk->votes_for[i]; if (!v) continue; n = v->index; count[n]++; log_info("Majority: %d \"%s\" wants %d \"%s\" => %d", i, booth_conf->site[i].addr_string, n, v->addr_string, count[n]); if (count[n]*2 <= booth_conf->site_count) continue; log_info("Majority reached: %d of %d for \"%s\"", count[n], booth_conf->site_count, v->addr_string); return v; } return NULL; } +static int all_voted(struct ticket_config *tk) +{ + int i, cnt = 0; + + for(i=0; isite_count; i++) { + if (tk->votes_for[i]) { + cnt++; + } + } + + return (cnt == booth_conf->site_count); +} + + static int newer_term(struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg) { uint32_t term; term = ntohl(msg->ticket.term); /* §5.1 */ if (term > tk->current_term) { tk->state = ST_FOLLOWER; tk->leader = leader; log_info("higher term %d vs. %d, following \"%s\"", term, tk->current_term, ticket_leader_string(tk)); tk->term_expires = time(NULL) + tk->term_duration; tk->current_term = term; return 1; } return 0; } static int term_too_low(struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg) { uint32_t term; term = ntohl(msg->ticket.term); /* §5.1 */ if (term < tk->current_term) { log_info("sending REJECT, term too low."); send_reject(sender, tk, RLT_TERM_OUTDATED); return 1; } return 0; } /* For follower. */ static int answer_HEARTBEAT ( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; struct boothc_ticket_msg omsg; term = ntohl(msg->ticket.term); log_debug("leader: %s, have %s; term %d vs %d", site_string(leader), ticket_leader_string(tk), term, tk->current_term); /* No reject. (?) */ if (term < tk->current_term) { log_info("ignoring lower term %d vs. %d, from \"%s\"", term, tk->current_term, ticket_leader_string(tk)); return 0; } /* Needed? */ newer_term(tk, sender, leader, msg); become_follower(tk, msg); /* Racy??? */ assert(sender == leader || !leader); tk->leader = leader; /* Ack the heartbeat (we comply). */ init_ticket_msg(&omsg, OP_HEARTBEAT, RLT_SUCCESS, tk); return booth_udp_send(sender, &omsg, sizeof(omsg)); } static int process_UPDATE ( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; term = ntohl(msg->ticket.term); log_debug("leader: %s, have %s; term %d vs %d", site_string(leader), ticket_leader_string(tk), term, tk->current_term); /* No reject. (?) */ if (term < tk->current_term) { log_warn("ignoring lower term %d vs. %d, from \"%s\"", term, tk->current_term, ticket_leader_string(tk)); return 0; } follower_update_ticket(tk, msg); ticket_write(tk); /* run ticket_cron if the ticket expires */ set_ticket_wakeup(tk); return 0; } /* update the ticket on the leader, write it to the CIB, and send out the update message to others with the new expiry time */ static int leader_update_ticket(struct ticket_config *tk) { struct boothc_ticket_msg msg; tk->term_expires = time(NULL) + tk->term_duration; tk->retry_number = 0; ticket_write(tk); set_ticket_wakeup(tk); init_ticket_msg(&msg, OP_UPDATE, RLT_SUCCESS, tk); return transport()->broadcast(&msg, sizeof(msg)); } /* For leader. */ static int process_HEARTBEAT( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; if (newer_term(tk, sender, leader, msg)) { /* Uh oh. Higher term?? Should we simply believe that? */ log_error("Got higher term number from"); return 0; } term = ntohl(msg->ticket.term); /* Don't send a reject. */ if (term < tk->current_term) { /* Doesn't know what he's talking about - perhaps * doesn't receive our packets? */ log_error("Stale/wrong heartbeat from \"%s\": " "term %d instead of %d", site_string(sender), term, tk->current_term); return 0; } if (term == tk->current_term && leader == tk->leader) { /* Hooray, an ACK! */ /* So at least _someone_ is listening. */ tk->hb_received |= sender->bitmask; log_debug("Got heartbeat ACK from \"%s\", %d/%d agree.", site_string(sender), count_bits(tk->hb_received), booth_conf->site_count); if (majority_of_bits(tk, tk->hb_received)) { /* OK, at least half of the nodes are reachable; * Update the ticket and send update messages out */ if( !tk->majority_acks_received ) { /* Write the ticket to the CIB and set the next * wakeup time (but do that only once) */ tk->majority_acks_received = 1; return leader_update_ticket(tk); } } } return 0; } +void leader_elected( + struct ticket_config *tk, + struct booth_site *new_leader + ) +{ + if (new_leader) { + tk->leader = new_leader; + + tk->term_expires = time(NULL) + tk->term_duration; + tk->election_end = 0; + tk->voted_for = NULL; + + if (new_leader == local) { + tk->commit_index++; // ?? + tk->state = ST_LEADER; + send_heartbeat(tk); + } + else + become_follower(tk, NULL); + } +} + + static int process_VOTE_FOR( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { - uint32_t term; - struct booth_site *new_leader; - - - term = ntohl(msg->ticket.term); if (term_too_low(tk, sender, leader, msg)) return 0; - - if (term == tk->current_term && - tk->election_end < time(NULL)) { - /* Election already ended - either by time or majority. - * Ignore. */ - return 0; - } - - if (newer_term(tk, sender, leader, msg)) { clear_election(tk); } - site_voted_for(tk, sender, leader); + record_vote(tk, sender, leader); - /* §5.2 */ - new_leader = majority_votes(tk); - if (new_leader) { - tk->leader = new_leader; + if (tk->state != ST_CANDIDATE) { + /* lost candidate status, somebody rejected our proposal */ + return 0; + } - tk->term_expires = time(NULL) + tk->term_duration; - tk->election_end = 0; - tk->voted_for = NULL; - if ( new_leader == local) { - tk->commit_index++; // ?? - tk->state = ST_LEADER; - send_heartbeat(tk); - } - else - become_follower(tk, NULL); + /* only if all voted can we take the ticket now, otherwise + * wait for timeout in ticket_cron */ + if (all_voted(tk)) { + /* §5.2 */ + leader_elected(tk, majority_votes(tk)); + set_ticket_wakeup(tk); } - set_ticket_wakeup(tk); return 0; } static int process_REJECTED( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t rv; rv = ntohl(msg->header.result); if (tk->state == ST_CANDIDATE && rv == RLT_TERM_OUTDATED) { - log_info("Am out of date, become follower."); + log_warn("from %s: ticket %s outdated (term %d), following %s", + site_string(sender), + tk->name, ntohl(msg->ticket.term), + site_string(leader) + ); tk->leader = leader; become_follower(tk, msg); return 0; } if (tk->state == ST_CANDIDATE && rv == RLT_TERM_STILL_VALID) { - log_error("There's a leader that I don't see: \"%s\"", + log_warn("from %s: there's a leader that I don't see: \"%s\"", + site_string(sender), site_string(leader)); tk->leader = leader; become_follower(tk, msg); return 0; } - log_error("unhandled reject: in state %s, got %s.", + log_warn("from %s: in state %s, got %s (unexpected reject)", + site_string(sender), state_to_string(tk->state), state_to_string(rv)); tk->leader = leader; become_follower(tk, msg); return 0; } /* §5.2 */ static int answer_REQ_VOTE( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; int valid; struct boothc_ticket_msg omsg; if (term_too_low(tk, sender, leader, msg)) return 0; if (newer_term(tk, sender, leader, msg)) { clear_election(tk); goto vote_for_sender; } term = ntohl(msg->ticket.term); /* Important: Ignore duplicated packets! */ valid = term_time_left(tk); if (valid && term == tk->current_term && sender == tk->leader) { log_debug("Duplicate OP_VOTE_FOR ignored."); return 0; } if (valid) { log_debug("no election allowed, term valid for %d??", valid); return send_reject(sender, tk, RLT_TERM_STILL_VALID); } /* §5.2, §5.4 */ if (!tk->voted_for) { vote_for_sender: tk->voted_for = sender; - site_voted_for(tk, sender, leader); + record_vote(tk, sender, leader); goto yes_you_can; } yes_you_can: init_ticket_msg(&omsg, OP_VOTE_FOR, RLT_SUCCESS, tk); omsg.ticket.leader = htonl(get_node_id(tk->voted_for)); return transport()->broadcast(&omsg, sizeof(omsg)); } int new_election(struct ticket_config *tk, struct booth_site *preference) { struct booth_site *new_leader; time_t now; time(&now); log_debug("start new election?, now=%" PRIi64 ", end %" PRIi64, (int64_t)now, (int64_t)(tk->election_end)); if (now <= tk->election_end) return 0; /* §5.2 */ /* If there was _no_ answer, don't keep incrementing the term number * indefinitely. If there was no peer, there'll probably be no one * listening now either. However, we don't know if we were * invoked due to a timeout. It's up to the caller to * increment the term! tk->current_term++; */ tk->term_expires = 0; tk->election_end = now + tk->term_duration; log_debug("start new election! term=%d, until %" PRIi64, tk->current_term, (int64_t)tk->election_end); clear_election(tk); if(preference) new_leader = preference; else new_leader = (local->type == SITE) ? local : NULL; - site_voted_for(tk, local, new_leader); + record_vote(tk, local, new_leader); tk->voted_for = new_leader; tk->state = ST_CANDIDATE; ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS); ticket_activate_timeout(tk); return 0; } int raft_answer( struct ticket_config *tk, struct booth_site *from, struct booth_site *leader, struct boothc_ticket_msg *msg ) { int cmd; int rv; rv = 0; cmd = ntohl(msg->header.cmd); R(tk); log_debug("got message %s from \"%s\"", state_to_string(cmd), from->addr_string); switch (cmd) { case OP_REQ_VOTE: rv = answer_REQ_VOTE(tk, from, leader, msg); break; case OP_VOTE_FOR: rv = process_VOTE_FOR(tk, from, leader, msg); break; case OP_HEARTBEAT: if (tk->leader == local && tk->state == ST_LEADER) rv = process_HEARTBEAT(tk, from, leader, msg); else if (tk->leader != local && tk->state == ST_FOLLOWER) rv = answer_HEARTBEAT(tk, from, leader, msg); else assert("invalid combination - leader, follower"); break; case OP_UPDATE: if (tk->leader != local && tk->state == ST_FOLLOWER) { rv = process_UPDATE(tk, from, leader, msg); } else { log_error("unexpected message, cmd %s, from %s", state_to_string(cmd), from->addr_string); rv = -EINVAL; } break; case OP_REJECTED: rv = process_REJECTED(tk, from, leader, msg); break; default: log_error("unprocessed message, cmd %s, from %s", state_to_string(cmd), from->addr_string); rv = -EINVAL; } R(tk); return rv; } diff --git a/src/raft.h b/src/raft.h index b8016c6..17c8f69 100644 --- a/src/raft.h +++ b/src/raft.h @@ -1,43 +1,47 @@ /* * Copyright (C) 2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _RAFT_H #define _RAFT_H #include "booth.h" typedef enum { ST_INIT = CHAR2CONST('I', 'n', 'i', 't'), ST_FOLLOWER = CHAR2CONST('F', 'l', 'l', 'w'), ST_CANDIDATE = CHAR2CONST('C', 'n', 'd', 'i'), ST_LEADER = CHAR2CONST('L', 'e', 'a', 'd'), } server_state_e; - struct ticket_config; +void leader_elected(struct ticket_config *tk, + struct booth_site *new_leader); + +struct booth_site *majority_votes(struct ticket_config *tk); + int raft_answer(struct ticket_config *tk, struct booth_site *from, struct booth_site *leader, struct boothc_ticket_msg *msg); int new_election(struct ticket_config *tk, struct booth_site *new_leader); #endif /* _RAFT_H */ diff --git a/src/ticket.c b/src/ticket.c index 649c881..f8d0c22 100644 --- a/src/ticket.c +++ b/src/ticket.c @@ -1,668 +1,675 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include "ticket.h" #include "config.h" #include "pacemaker.h" #include "inline-fn.h" #include "log.h" #include "booth.h" #include "raft.h" #include "handler.h" #define TK_LINE 256 /* Untrusted input, must fit (incl. \0) in a buffer of max chars. */ int check_max_len_valid(const char *s, int max) { int i; for(i=0; iticket_count; i++) { if (!strcmp(booth_conf->ticket[i].name, ticket)) { if (found) *found = booth_conf->ticket + i; return 1; } } return 0; } int check_ticket(char *ticket, struct ticket_config **found) { if (found) *found = NULL; if (!booth_conf) return 0; if (!check_max_len_valid(ticket, sizeof(booth_conf->ticket[0].name))) return 0; return find_ticket_by_name(ticket, found); } int check_site(char *site, int *is_local) { struct booth_site *node; if (!check_max_len_valid(site, sizeof(node->addr_string))) return 0; if (find_site_by_name(site, &node, 0)) { *is_local = node->local; return 1; } return 0; } #if 0 /** Find out what others think about this ticket. * * If we're a SITE, we can ask (and have to tell) Pacemaker. * An ARBITRATOR can only ask others. */ static int ticket_send_catchup(struct ticket_config *tk) { int i, rv = 0; struct booth_site *site; struct boothc_ticket_msg msg; foreach_node(i, site) { if (!site->local) { init_ticket_msg(&msg, CMD_CATCHUP, RLT_SUCCESS, tk); log_debug("attempting catchup from %s", site->addr_string); rv = booth_udp_send(site, &msg, sizeof(msg)); } } ticket_activate_timeout(tk); return rv; } #endif int ticket_write(struct ticket_config *tk) { if (local->type != SITE) return -EINVAL; disown_if_expired(tk); if (tk->leader == local) { pcmk_handler.grant_ticket(tk); } else { pcmk_handler.revoke_ticket(tk); } return 0; } /* Ask an external program whether getting the ticket * makes sense. * Eg. if the services have a failcount of INFINITY, * we can't serve here anyway. */ int get_ticket_locally_if_allowed(struct ticket_config *tk) { int rv; if (!tk->ext_verifier) goto get_it; rv = run_handler(tk, tk->ext_verifier, 1); if (rv) { log_error("May not acquire ticket."); /* Give it to somebody else. * Just send a commit message, as the * others couldn't help anyway. */ if (leader_and_valid(tk)) { disown_ticket(tk); #if 0 tk->proposed_owner = NULL; /* Just go one further - others may easily override. */ tk->new_ballot++; ticket_broadcast_proposed_state(tk, OP_COMMITTED); tk->state = ST_STABLE; #endif ticket_broadcast(tk, OP_VOTE_FOR, RLT_SUCCESS); } return rv; } else { log_info("May get/keep ticket."); } get_it: if (leader_and_valid(tk)) { return send_heartbeat(tk); } else { /* Ticket should now become active locally, wasn't before. */ tk->current_term++; new_election(tk, local); return rv; } } /** Try to get the ticket for the local site. * */ int do_grant_ticket(struct ticket_config *tk) { int rv; if (tk->leader == local) return RLT_SUCCESS; if (is_owned(tk)) return RLT_OVERGRANT; rv = get_ticket_locally_if_allowed(tk); return rv; } /** Ticket revoke. * Only to be started from the leader. */ int do_revoke_ticket(struct ticket_config *tk) { log_info("revoke ticket %s", tk->name); disown_ticket(tk); tk->voted_for = no_leader; ticket_write(tk); // only when majority wants that? or, if tk->leader was == local, in every case, because the ticket shouldn't be here anymore? /* 1) lose ticket * 2) if majority is available, "none" gets it * 3) if majority not available, they might have voted for somebody else in the meantime anyway */ tk->state = ST_FOLLOWER; /* Start a new vote round, with a new term number, to let * everybody know that the ticket got revoked */ tk->current_term++; return ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS); } int list_ticket(char **pdata, unsigned int *len) { struct ticket_config *tk; char timeout_str[64]; char *data, *cp; int i, alloc; *pdata = NULL; *len = 0; alloc = 256 + booth_conf->ticket_count * (BOOTH_NAME_LEN * 2 + 128); data = malloc(alloc); if (!data) return -ENOMEM; cp = data; foreach_ticket(i, tk) { if (tk->term_expires != 0) strftime(timeout_str, sizeof(timeout_str), "%F %T", localtime(&tk->term_expires)); else strcpy(timeout_str, "INF"); cp += sprintf(cp, "ticket: %s, leader: %s, expires: %s, commit: %d\n", tk->name, ticket_leader_string(tk), timeout_str, tk->commit_index); *len = cp - data; assert(*len < alloc); } *pdata = data; return 0; } int setup_ticket(void) { struct ticket_config *tk; int i; foreach_ticket(i, tk) { tk->leader = NULL; tk->term_expires = 0; // abort_proposal(tk); if (local->type == SITE) { pcmk_handler.load_ticket(tk); } /* We'll start the election immediately if the ticket * belonged to us */ /* tk->term_expires = time(NULL) + tk->term_duration;*/ tk->state = ST_FOLLOWER; /* TODO: send query packet to see sooner who's online. */ } return 0; } int ticket_answer_list(int fd, struct boothc_ticket_msg *msg) { char *data; int olen, rv; struct boothc_header hdr; rv = list_ticket(&data, &olen); if (rv < 0) return rv; init_header(&hdr, CMR_LIST, RLT_SUCCESS, sizeof(hdr) + olen); return send_header_plus(fd, &hdr, data, olen); } int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg) { int rv; struct ticket_config *tk; if (!check_ticket(msg->ticket.id, &tk)) { log_error("Client asked to grant unknown ticket"); rv = RLT_INVALID_ARG; goto reply; } if (is_owned(tk)) { log_error("client wants to get an (already granted!) ticket \"%s\"", msg->ticket.id); rv = RLT_OVERGRANT; goto reply; } rv = do_grant_ticket(tk); reply: init_header(&msg->header, CMR_GRANT, rv ?: RLT_ASYNC, sizeof(*msg)); return send_ticket_msg(fd, msg); } int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg) { int rv; struct ticket_config *tk; if (!check_ticket(msg->ticket.id, &tk)) { log_error("Client wants to revoke an unknown ticket %s", msg->ticket.id); rv = RLT_INVALID_ARG; goto reply; } if (!is_owned(tk)) { log_info("client wants to revoke a free ticket \"%s\"", msg->ticket.id); /* Return a different result code? */ rv = RLT_SUCCESS; goto reply; } if (tk->leader != local) { log_info("we do not own the ticket \"%s\", " "redirect to leader %s", msg->ticket.id, ticket_leader_string(tk)); /* Return a different result code? */ rv = RLT_REDIRECT; goto reply; } rv = do_revoke_ticket(tk); if (rv == 0) rv = RLT_ASYNC; reply: init_ticket_msg(msg, CMR_REVOKE, rv, tk); return send_ticket_msg(fd, msg); } int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd, cmd_result_t res) { struct boothc_ticket_msg msg; init_ticket_msg(&msg, cmd, res, tk); log_debug("broadcasting '%s' for ticket \"%s\" (term=%d, valid=%d)", state_to_string(cmd), tk->name, ntohl(msg.ticket.term), ntohl(msg.ticket.term_valid_for)); return transport()->broadcast(&msg, sizeof(msg)); } static void ticket_cron(struct ticket_config *tk) { time_t now; int rv; int vote_cnt; + struct booth_site *new_leader; now = time(NULL); R(tk); /* Has an owner, has an expiry date, and expiry date in the past? * Losing the ticket must happen in _every_ state. */ if (tk->term_expires && is_owned(tk) && now >= tk->term_expires) { log_info("LOST ticket: \"%s\" no longer at %s", tk->name, ticket_leader_string(tk)); /* Couldn't renew in time - ticket lost. */ disown_ticket(tk); /* New vote round; §5.2 */ if (local->type == SITE) { tk->current_term++; new_election(tk, NULL); } ticket_write(tk); /* May not try to re-acquire now, need to find out * what others think. */ return; } R(tk); switch(tk->state) { case ST_INIT: /* Unknown state, ask others. */ // ticket_send_catchup(tk); break; case ST_FOLLOWER: /* in case we got restarted and this ticket belongs to * us */ if (tk->is_granted && tk->leader == local) { tk->current_term++; new_election(tk, NULL); } break; case ST_CANDIDATE: /* §5.2 */ - /* This is previous election timed out */ - if (now > tk->election_end) + /* not everybody answered, but if we have majority... */ + new_leader = majority_votes(tk); + if (new_leader) { + leader_elected(tk, new_leader); + set_ticket_wakeup(tk); + } else if (now > tk->election_end) { + /* This is previous election timed out */ new_election(tk, NULL); + } break; case ST_LEADER: /* we get here after we broadcasted a heartbeat; * by this time all sites should've acked the heartbeat */ /* if (tk->hb_sent_at + tk->timeout <= now) { */ vote_cnt = count_bits(tk->hb_received) - 1; if ((vote_cnt+1) < booth_conf->site_count) { if (!majority_of_bits(tk, tk->hb_received)) { tk->retry_number ++; if (!vote_cnt) { log_warn("no answers to heartbeat for ticket %s on try #%d, " "we are most probably alone!", tk->name, tk->retry_number); } else { log_warn("not enough answers to heartbeat for ticket %s on try #%d: " "only got %d answers (mask 0x%" PRIx64 ")!", tk->name, tk->retry_number, vote_cnt, tk->hb_received); } /* Don't give up, though - there's still some time until leadership is lost. */ } } rv = run_handler(tk, tk->ext_verifier, 1); if (rv) { tk->state = ST_FOLLOWER; disown_ticket(tk); // resp. no owner anymore, new takers? ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS); ticket_write(tk); } else { send_heartbeat(tk); ticket_activate_timeout(tk); } break; default: break; } R(tk); } void process_tickets(void) { struct ticket_config *tk; int i; struct timeval now; float sec_until; gettimeofday(&now, NULL); foreach_ticket(i, tk) { sec_until = timeval_to_float(tk->next_cron) - timeval_to_float(now); if (0) log_debug("ticket %s next cron %" PRIx64 ".%03d, " "now %" PRIx64 "%03d, in %f", tk->name, (uint64_t)tk->next_cron.tv_sec, timeval_msec(tk->next_cron), (uint64_t)now.tv_sec, timeval_msec(now), sec_until); if (sec_until > 0.0) continue; log_debug("ticket cron: doing %s", tk->name); /* Set next value, handler may override. * This should already be handled via the state logic; * but to be on the safe side the renew repetition is * duplicated here, too. */ set_ticket_wakeup(tk); ticket_cron(tk); } } void tickets_log_info(void) { struct ticket_config *tk; int i; foreach_ticket(i, tk) { log_info("Ticket %s: state '%s' " "commit index %d " "leader \"%s\" " "expires %-24.24s", tk->name, state_to_string(tk->state), tk->commit_index, ticket_leader_string(tk), ctime(&tk->term_expires)); } } /* UDP message receiver. */ int message_recv(struct boothc_ticket_msg *msg, int msglen) { uint32_t from; struct booth_site *source; struct ticket_config *tk; struct booth_site *leader; uint32_t leader_u; if (check_boothc_header(&msg->header, sizeof(*msg)) < 0 || msglen != sizeof(*msg)) { log_error("message receive error"); return -1; } from = ntohl(msg->header.from); if (!find_site_by_id(from, &source) || !source) { log_error("unknown sender: %08x", from); return -1; } if (!check_ticket(msg->ticket.id, &tk)) { log_error("got invalid ticket name \"%s\" from %s", msg->ticket.id, source->addr_string); return -EINVAL; } leader_u = ntohl(msg->ticket.leader); if (!find_site_by_id(leader_u, &leader)) { log_error("Message with unknown owner %x received", leader_u); return -EINVAL; } return raft_answer(tk, source, leader, msg); } void set_ticket_wakeup(struct ticket_config *tk) { struct timeval tv, now; /* At least every hour, perhaps sooner. */ ticket_next_cron_in(tk, 3600); switch (tk->state) { case ST_LEADER: assert(tk->leader == local); gettimeofday(&now, NULL); tv = now; tv.tv_sec = next_vote_starts_at(tk); /* If timestamp is in the past, look again in one second. */ if (timeval_compare(tv, now) <= 0) tv.tv_sec = now.tv_sec + 1; ticket_next_cron_at(tk, tv); break; case ST_CANDIDATE: assert(tk->election_end); ticket_next_cron_at_coarse(tk, tk->election_end); break; case ST_INIT: case ST_FOLLOWER: /* If there is (or should be) some owner, check on it later on. * If no one is interested - don't care. */ if (is_owned(tk) && (local->type == SITE)) ticket_next_cron_at_coarse(tk, tk->term_expires + tk->acquire_after); break; default: log_error("why here?"); } } /* Given a state (in host byte order), return a human-readable (char*). * An array is used so that multiple states can be printed in a single printf(). */ char *state_to_string(uint32_t state_ho) { union mu { cmd_request_t s; char c[5]; }; static union mu cache[6] = { { 0 } }, *cur; static int current = 0; current ++; if (current >= sizeof(cache)/sizeof(cache[0])) current = 0; cur = cache + current; cur->s = htonl(state_ho); /* Shouldn't be necessary, union array is initialized with zeroes, and * these bytes never get written. */ cur->c[4] = 0; return cur->c; } int send_reject(struct booth_site *dest, struct ticket_config *tk, cmd_result_t code) { struct boothc_ticket_msg msg; init_ticket_msg(&msg, OP_REJECTED, code, tk); return booth_udp_send(dest, &msg, sizeof(msg)); }