diff --git a/src/inline-fn.h b/src/inline-fn.h index fa80d4c..03e7c7a 100644 --- a/src/inline-fn.h +++ b/src/inline-fn.h @@ -1,313 +1,313 @@ /* * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _INLINE_FN_H #define _INLINE_FN_H #include #include #include #include #include "config.h" #include "ticket.h" #include "transport.h" inline static uint32_t get_local_id(void) { return local ? local->site_id : -1; } inline static uint32_t get_node_id(struct booth_site *node) { return node ? node->site_id : NO_ONE; } -inline static int term_valid_for(const struct ticket_config *tk) +inline static int term_time_left(const struct ticket_config *tk) { int left; left = tk->term_expires - time(NULL); return (left < 0) ? 0 : left; } /** Returns number of seconds left, if any. */ inline static int leader_and_valid(const struct ticket_config *tk) { if (tk->leader != local) return 0; - return term_valid_for(tk); + return term_time_left(tk); } /** Is this some leader? */ inline static int is_owned(const struct ticket_config *tk) { return (tk->leader && tk->leader != no_leader); } static inline void init_header_bare(struct boothc_header *h) { h->magic = htonl(BOOTHC_MAGIC); h->version = htonl(BOOTHC_VERSION); h->from = htonl(local->site_id); h->iv = htonl(0); h->auth1 = htonl(0); h->auth2 = htonl(0); } static inline void init_header(struct boothc_header *h, int cmd, int result, int data_len) { init_header_bare(h); h->length = htonl(data_len); h->cmd = htonl(cmd); h->result = htonl(result); } static inline void init_ticket_site_header(struct boothc_ticket_msg *msg, int cmd) { init_header(&msg->header, cmd, 0, sizeof(*msg)); } static inline void init_ticket_msg(struct boothc_ticket_msg *msg, int cmd, int rv, struct ticket_config *tk) { assert(sizeof(msg->ticket.id) == sizeof(tk->name)); init_header(&msg->header, cmd, rv, sizeof(*msg)); if (!tk) { memset(&msg->ticket, 0, sizeof(msg->ticket)); } else { memcpy(msg->ticket.id, tk->name, sizeof(msg->ticket.id)); msg->ticket.leader = htonl(get_node_id(tk->leader ? tk->leader : tk->voted_for)); msg->ticket.term = htonl(tk->current_term); - msg->ticket.term_valid_for = htonl(term_valid_for(tk)); + msg->ticket.term_valid_for = htonl(term_time_left(tk)); msg->ticket.leader_commit = htonl(tk->commit_index); } } static inline struct booth_transport const *transport(void) { return booth_transport + booth_conf->proto; } static inline const char *site_string(struct booth_site *site) { return site ? site->addr_string : "NONE"; } static inline const char *ticket_leader_string(struct ticket_config *tk) { return site_string(tk->leader); } static inline void disown_ticket(struct ticket_config *tk) { tk->leader = NULL; tk->is_granted = 0; time(&tk->term_expires); } static inline int disown_if_expired(struct ticket_config *tk) { if (time(NULL) >= tk->term_expires || !tk->leader) { disown_ticket(tk); return 1; } return 0; } /* We allow half of the uint32_t to be used; * half of that below, half of that above the current known "good" value. * 0 UINT32_MAX * |--------------------------+----------------+------------| * | | | * |--------+-------| allowed range * | * current commit index * * So, on overflow it looks like that: * UINT32_MAX 0 * |--------------------------+-----------||---+------------| * | | | * |--------+-------| allowed range * | * current commit index * * This should be possible by using the same datatype and relying * on the under/overflow semantics. * * * Having 30 bits available, and assuming an expire time of * one minute and a (high) commit index step of 64 == 2^6 (because * of weights), we get 2^24 minutes of range - which is ~750 * years. "Should be enough for everybody." */ static inline int index_is_higher_than(uint32_t c_high, uint32_t c_low) { uint32_t diff; if (c_high == c_low) return 0; diff = c_high - c_low; if (diff < UINT32_MAX/4) return 1; diff = c_low - c_high; if (diff < UINT32_MAX/4) return 0; assert(!"commit index out of range - invalid"); } static inline uint32_t index_max2(uint32_t a, uint32_t b) { return index_is_higher_than(a, b) ? a : b; } static inline uint32_t index_max3(uint32_t a, uint32_t b, uint32_t c) { return index_max2( index_max2(a, b), c); } static inline double timeval_to_float(struct timeval tv) { return tv.tv_sec + tv.tv_usec*(double)1.0e-6; } static inline int timeval_msec(struct timeval tv) { int m; m = tv.tv_usec / 1000; if (m >= 1000) m = 999; return m; } static inline int timeval_compare(struct timeval tv1, struct timeval tv2) { if (tv1.tv_sec < tv2.tv_sec) return -1; if (tv1.tv_sec > tv2.tv_sec) return +1; if (tv1.tv_usec < tv2.tv_usec) return -1; if (tv1.tv_usec > tv2.tv_usec) return +1; return 0; } static inline int timeval_in_past(struct timeval which) { struct timeval tv; gettimeofday(&tv, NULL); return timeval_compare(tv, which) > 0; } static inline time_t next_vote_starts_at(struct ticket_config *tk) { time_t half_exp, retries_needed; /* If not owner, don't renew. */ if (tk->leader != local) return 0; /* Try to renew at half of expiry time. */ half_exp = tk->term_expires - tk->term_duration/2; /* Also start renewal if we couldn't get * a few message retransmission in the alloted * expiry time. */ retries_needed = tk->term_expires - tk->timeout * tk->retries/2; /* Return earlier timestamp. */ return half_exp < retries_needed ? half_exp : retries_needed; } static inline int should_start_renewal(struct ticket_config *tk) { time_t now, when; when = next_vote_starts_at(tk); if (!when) return 0; time(&now); return when <= now; } static inline int send_heartbeat(struct ticket_config *tk) { tk->hb_received = local->bitmask; tk->hb_sent_at = time(NULL); tk->majority_acks_received = 0; return ticket_broadcast(tk, OP_HEARTBEAT, RLT_SUCCESS); } static inline struct booth_site *my_vote(struct ticket_config *tk) { return tk->votes_for[ local->index ]; } static inline int count_bits(uint64_t val) { return __builtin_popcount(val); } static inline int majority_of_bits(struct ticket_config *tk, uint64_t val) { /* Use ">" to get majority decision, even for an even number * of participants. */ return count_bits(val) * 2 > booth_conf->site_count; } #endif diff --git a/src/raft.c b/src/raft.c index 96514d2..2a9371b 100644 --- a/src/raft.c +++ b/src/raft.c @@ -1,514 +1,514 @@ /* * Copyright (C) 2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include "booth.h" #include "transport.h" #include "inline-fn.h" #include "config.h" #include "raft.h" #include "ticket.h" #include "log.h" inline static void clear_election(struct ticket_config *tk) { int i; struct booth_site *site; log_info("clear election"); tk->votes_received = 0; foreach_node(i, site) tk->votes_for[site->index] = NULL; } inline static void site_voted_for(struct ticket_config *tk, struct booth_site *who, struct booth_site *vote) { log_info("site \"%s\" votes for \"%s\"", site_string(who), site_string(vote)); if (!tk->votes_for[who->index]) { tk->votes_for[who->index] = vote; tk->votes_received |= who->bitmask; } else { if (tk->votes_for[who->index] != vote) log_error("voted previously (but in same term!) for \"%s\"...", tk->votes_for[who->index]->addr_string); } } static void become_follower(struct ticket_config *tk, struct boothc_ticket_msg *msg) { uint32_t i; int duration; tk->state = ST_FOLLOWER; duration = tk->term_duration; if (msg) duration = min(duration, ntohl(msg->ticket.term_valid_for)); tk->term_expires = time(NULL) + duration; if (msg) { i = ntohl(msg->ticket.term); tk->current_term = max(i, tk->current_term); /* § 5.3 */ i = ntohl(msg->ticket.leader_commit); tk->commit_index = max(i, tk->commit_index); } ticket_write(tk); } static struct booth_site *majority_votes(struct ticket_config *tk) { int i, n; struct booth_site *v; int count[MAX_NODES] = { 0, }; for(i=0; isite_count; i++) { v = tk->votes_for[i]; if (!v) continue; n = v->index; count[n]++; log_info("Majority: %d \"%s\" wants %d \"%s\" => %d", i, booth_conf->site[i].addr_string, n, v->addr_string, count[n]); if (count[n]*2 <= booth_conf->site_count) continue; log_info("Majority reached: %d of %d for \"%s\"", count[n], booth_conf->site_count, v->addr_string); return v; } return NULL; } static int newer_term(struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg) { uint32_t term; term = ntohl(msg->ticket.term); /* §5.1 */ if (term > tk->current_term) { tk->state = ST_FOLLOWER; tk->leader = leader; log_info("higher term %d vs. %d, following \"%s\"", term, tk->current_term, ticket_leader_string(tk)); tk->term_expires = time(NULL) + tk->term_duration; tk->current_term = term; return 1; } return 0; } static int term_too_low(struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg) { uint32_t term; term = ntohl(msg->ticket.term); /* §5.1 */ if (term < tk->current_term) { log_info("sending REJECT, term too low."); send_reject(sender, tk, RLT_TERM_OUTDATED); return 1; } return 0; } /* For follower. */ static int answer_HEARTBEAT ( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; struct boothc_ticket_msg omsg; term = ntohl(msg->ticket.term); log_debug("leader: %s, have %s; term %d vs %d", site_string(leader), ticket_leader_string(tk), term, tk->current_term); /* No reject. (?) */ if (term < tk->current_term) { log_info("ignoring lower term %d vs. %d, from \"%s\"", term, tk->current_term, ticket_leader_string(tk)); return 0; } /* Needed? */ newer_term(tk, sender, leader, msg); become_follower(tk, msg); /* Racy??? */ assert(sender == leader || !leader); tk->leader = leader; /* run ticket_cron if the ticket expires */ set_ticket_wakeup(tk); /* Ack the heartbeat (we comply). */ init_ticket_msg(&omsg, OP_HEARTBEAT, RLT_SUCCESS, tk); return booth_udp_send(sender, &omsg, sizeof(omsg)); } /* For leader. */ static int process_HEARTBEAT( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; if (newer_term(tk, sender, leader, msg)) { /* Uh oh. Higher term?? Should we simply believe that? */ log_error("Got higher term number from"); return 0; } term = ntohl(msg->ticket.term); /* Don't send a reject. */ if (term < tk->current_term) { /* Doesn't know what he's talking about - perhaps * doesn't receive our packets? */ log_error("Stale/wrong heartbeat from \"%s\": " "term %d instead of %d", site_string(sender), term, tk->current_term); return 0; } if (term == tk->current_term && leader == tk->leader) { /* Hooray, an ACK! */ /* So at least _someone_ is listening. */ tk->hb_received |= sender->bitmask; log_debug("Got heartbeat ACK from \"%s\", %d/%d agree.", site_string(sender), count_bits(tk->hb_received), booth_conf->site_count); if (majority_of_bits(tk, tk->hb_received)) { /* OK, at least half of the nodes are reachable; * no need to do anything until * the next heartbeat should be sent. */ if( !tk->majority_acks_received ) { /* Write the ticket to the CIB and set the next * wakeup time (but do that only once) */ tk->majority_acks_received = 1; set_ticket_wakeup(tk); tk->retry_number = 0; ticket_write(tk); } } } return 0; } static int process_VOTE_FOR( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; struct booth_site *new_leader; term = ntohl(msg->ticket.term); if (term_too_low(tk, sender, leader, msg)) return 0; if (term == tk->current_term && tk->election_end < time(NULL)) { /* Election already ended - either by time or majority. * Ignore. */ return 0; } if (newer_term(tk, sender, leader, msg)) { clear_election(tk); } site_voted_for(tk, sender, leader); /* §5.2 */ new_leader = majority_votes(tk); if (new_leader) { tk->leader = new_leader; tk->term_expires = time(NULL) + tk->term_duration; tk->election_end = 0; tk->voted_for = NULL; if ( new_leader == local) { tk->commit_index++; // ?? tk->state = ST_LEADER; send_heartbeat(tk); } else become_follower(tk, NULL); } set_ticket_wakeup(tk); return 0; } static int process_REJECTED( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t rv; rv = ntohl(msg->header.result); if (tk->state == ST_CANDIDATE && rv == RLT_TERM_OUTDATED) { log_info("Am out of date, become follower."); tk->leader = leader; become_follower(tk, msg); return 0; } if (tk->state == ST_CANDIDATE && rv == RLT_TERM_STILL_VALID) { log_error("There's a leader that I don't see: \"%s\"", site_string(leader)); tk->leader = leader; become_follower(tk, msg); return 0; } log_error("unhandled reject: in state %s, got %s.", state_to_string(tk->state), state_to_string(rv)); tk->leader = leader; become_follower(tk, msg); return 0; } /* §5.2 */ static int answer_REQ_VOTE( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; int valid; struct boothc_ticket_msg omsg; if (leader == no_leader) return 0; if (term_too_low(tk, sender, leader, msg)) return 0; if (newer_term(tk, sender, leader, msg)) goto vote_for_sender; term = ntohl(msg->ticket.term); /* Important: Ignore duplicated packets! */ - valid = term_valid_for(tk); + valid = term_time_left(tk); if (valid && term == tk->current_term && sender == tk->leader) { log_debug("Duplicate OP_VOTE_FOR ignored."); return 0; } if (valid) { log_debug("no election allowed, term valid for %d??", valid); return send_reject(sender, tk, RLT_TERM_STILL_VALID); } /* §5.2, §5.4 */ if (!tk->voted_for) { vote_for_sender: tk->voted_for = sender; site_voted_for(tk, sender, leader); goto yes_you_can; } yes_you_can: init_ticket_msg(&omsg, OP_VOTE_FOR, RLT_SUCCESS, tk); omsg.ticket.leader = htonl(get_node_id(tk->voted_for)); return transport()->broadcast(&omsg, sizeof(omsg)); } int new_election(struct ticket_config *tk, struct booth_site *preference) { struct booth_site *new_leader; time_t now; time(&now); log_debug("start new election?, now=%" PRIi64 ", end %" PRIi64, (int64_t)now, (int64_t)(tk->election_end)); if (now <= tk->election_end) return 0; /* §5.2 */ /* If there was _no_ answer, don't keep incrementing the term number * indefinitely. If there was no peer, there'll probably be no one * listening now either. However, we don't know if we were * invoked due to a timeout. It's up to the caller to * increment the term! tk->current_term++; */ tk->term_expires = 0; tk->election_end = now + tk->term_duration; log_debug("start new election! term=%d, until %" PRIi64, tk->current_term, (int64_t)tk->election_end); clear_election(tk); if(preference) new_leader = preference; else new_leader = (local->type == SITE) ? local : NULL; site_voted_for(tk, local, new_leader); tk->voted_for = new_leader; tk->state = ST_CANDIDATE; ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS); ticket_activate_timeout(tk); return 0; } int raft_answer( struct ticket_config *tk, struct booth_site *from, struct booth_site *leader, struct boothc_ticket_msg *msg ) { int cmd; int rv; rv = 0; cmd = ntohl(msg->header.cmd); R(tk); log_debug("got message %s from \"%s\"", state_to_string(cmd), from->addr_string); switch (cmd) { case OP_REQ_VOTE: rv = answer_REQ_VOTE(tk, from, leader, msg); break; case OP_VOTE_FOR: rv = process_VOTE_FOR(tk, from, leader, msg); break; case OP_HEARTBEAT: if (tk->leader == local && tk->state == ST_LEADER) rv = process_HEARTBEAT(tk, from, leader, msg); else if (tk->leader != local && tk->state == ST_FOLLOWER) rv = answer_HEARTBEAT(tk, from, leader, msg); else assert("invalid combination - leader, follower"); break; case OP_REJECTED: rv = process_REJECTED(tk, from, leader, msg); break; default: log_error("unprocessed message, cmd %x", cmd); rv = -EINVAL; } R(tk); return rv; }