Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/src/booth.h b/src/booth.h
index edec609..5b0cc8c 100644
--- a/src/booth.h
+++ b/src/booth.h
@@ -1,258 +1,258 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _BOOTH_H
#define _BOOTH_H
#include <stdint.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#define BOOTH_RUN_DIR "/var/run/booth/"
#define BOOTH_LOG_DIR "/var/log"
#define BOOTH_LOGFILE_NAME "booth.log"
#define BOOTH_DEFAULT_CONF_DIR "/etc/booth/"
#define BOOTH_DEFAULT_CONF_NAME "booth"
#define BOOTH_DEFAULT_CONF_EXT ".conf"
#define BOOTH_DEFAULT_CONF \
BOOTH_DEFAULT_CONF_DIR BOOTH_DEFAULT_CONF_NAME BOOTH_DEFAULT_CONF_EXT
#define DAEMON_NAME "boothd"
#define BOOTH_PATH_LEN 127
#define BOOTH_DEFAULT_PORT 9929
/* TODO: remove */
#define BOOTH_PROTO_FAMILY AF_INET
#define BOOTHC_MAGIC 0x5F1BA08C
#define BOOTHC_VERSION 0x00010002
/** Timeout value for poll().
* Determines frequency of periodic jobs, eg. when send-retries are done.
* See process_tickets(). */
#define POLL_TIMEOUT 1000
/** @{ */
/** The on-network data structures and constants. */
#define BOOTH_NAME_LEN 64
#define CHAR2CONST(a,b,c,d) ((a << 24) | (b << 16) | (c << 8) | d)
/* Says that the ticket shouldn't be active anywhere.
* NONE wouldn't be specific enough. */
#define NO_ONE ((uint32_t)-1)
/* Says that another one should recover. */
#define TICKET_LOST CHAR2CONST('L', 'O', 'S', 'T')
typedef unsigned char boothc_site [BOOTH_NAME_LEN];
typedef unsigned char boothc_ticket[BOOTH_NAME_LEN];
struct boothc_header {
/** Authentication data; not used now. */
uint32_t iv;
uint32_t auth1;
uint32_t auth2;
/** BOOTHC_MAGIC */
uint32_t magic;
/** BOOTHC_VERSION */
uint32_t version;
/** Packet source; site_id. See add_site(). */
uint32_t from;
/** Length including header */
uint32_t length;
/** The command respectively protocol state. See cmd_request_t. */
uint32_t cmd;
/** Result of operation. 0 == OK */
uint32_t result;
char data[0];
} __attribute__((packed));
struct ticket_msg {
/** Ticket name. */
boothc_ticket id;
/** Current leader. May be NO_ONE. See add_site().
* For a OP_REQ_VOTE this is */
uint32_t leader;
/** Current term. */
uint32_t term;
uint32_t term_valid_for;
/* Perhaps we need to send a status along, too - like
* starting, running, stopping, error, ...? */
uint32_t leader_commit; // TODO: NEEDED?
} __attribute__((packed));
struct boothc_ticket_msg {
struct boothc_header header;
struct ticket_msg ticket;
} __attribute__((packed));
typedef enum {
/* 0x43 = "C"ommands */
CMD_LIST = CHAR2CONST('C', 'L', 's', 't'),
CMD_GRANT = CHAR2CONST('C', 'G', 'n', 't'),
CMD_REVOKE = CHAR2CONST('C', 'R', 'v', 'k'),
/* Replies */
CMR_GENERAL = CHAR2CONST('G', 'n', 'l', 'R'), // Increase distance to CMR_GRANT
CMR_LIST = CHAR2CONST('R', 'L', 's', 't'),
CMR_GRANT = CHAR2CONST('R', 'G', 'n', 't'),
CMR_REVOKE = CHAR2CONST('R', 'R', 'v', 'k'),
/* get status from another server */
OP_STATUS = CHAR2CONST('S', 't', 'a', 't'),
- OP_MY_INDEX = CHAR2CONST('M', 'I', 'd', 'x'), /* Answer to status */
+ OP_MY_INDEX = CHAR2CONST('M', 'I', 'd', 'x'), /* reply to status */
/* Raft */
- OP_REQ_VOTE = CHAR2CONST('R', 'V', 'o', 't'),
- OP_VOTE_FOR = CHAR2CONST('V', 't', 'F', 'r'),
+ OP_REQ_VOTE = CHAR2CONST('R', 'V', 'o', 't'), /* start election */
+ OP_VOTE_FOR = CHAR2CONST('V', 't', 'F', 'r'), /* reply to REQ_VOTE */
OP_HEARTBEAT= CHAR2CONST('H', 'r', 't', 'B'), /* Heartbeat */
OP_UPDATE = CHAR2CONST('U', 'p', 'd', 'E'), /* Update ticket */
OP_REVOKE = CHAR2CONST('R', 'e', 'v', 'k'), /* Revoke ticket */
OP_REJECTED = CHAR2CONST('R', 'J', 'C', '!'),
} cmd_request_t;
/* TODO: make readable constants */
typedef enum {
/* for compatibility with other functions */
RLT_SUCCESS = 0,
RLT_ASYNC = CHAR2CONST('A', 's', 'y', 'n'),
RLT_SYNC_SUCC = CHAR2CONST('S', 'c', 'c', 's'),
RLT_SYNC_FAIL = CHAR2CONST('F', 'a', 'i', 'l'),
RLT_INVALID_ARG = CHAR2CONST('I', 'A', 'r', 'g'),
RLT_EXT_FAILED = CHAR2CONST('X', 'P', 'r', 'g'),
RLT_OVERGRANT = CHAR2CONST('O', 'v', 'e', 'r'),
RLT_PROBABLY_SUCCESS = CHAR2CONST('S', 'u', 'c', '?'),
RLT_BUSY = CHAR2CONST('B', 'u', 's', 'y'),
RLT_TERM_OUTDATED = CHAR2CONST('T', 'O', 'd', 't'),
RLT_TERM_STILL_VALID = CHAR2CONST('T', 'V', 'l', 'd'),
RLT_REDIRECT = CHAR2CONST('R', 'e', 'd', 'r'),
} cmd_result_t;
/** @} */
/** @{ */
struct booth_site {
/** Calculated ID. See add_site(). */
int site_id;
int type;
int local;
/** Roles, like ACCEPTOR, PROPOSER, or LEARNER. Not really used ATM. */
int role;
char addr_string[BOOTH_NAME_LEN];
int tcp_fd;
int udp_fd;
/* 0-based, used for indexing into per-ticket weights */
int index;
uint64_t bitmask;
unsigned short family;
union {
struct sockaddr_in sa4;
struct sockaddr_in6 sa6;
};
int saddrlen;
int addrlen;
} __attribute__((packed));
extern struct booth_site *local;
extern struct booth_site * no_leader;
/** @} */
struct booth_transport;
struct client {
int fd;
const struct booth_transport *transport;
void (*workfn)(int);
void (*deadfn)(int);
};
extern struct client *clients;
extern struct pollfd *pollfds;
int client_add(int fd, const struct booth_transport *tpt,
void (*workfn)(int ci), void (*deadfn)(int ci));
int do_read(int fd, void *buf, size_t count);
int do_write(int fd, void *buf, size_t count);
void process_connection(int ci);
void safe_copy(char *dest, char *value, size_t buflen, const char *description);
struct command_line {
int type; /* ACT_ */
int op; /* OP_ */
char configfile[BOOTH_PATH_LEN];
char lockfile[BOOTH_PATH_LEN];
char site[BOOTH_NAME_LEN];
struct boothc_ticket_msg msg;
};
extern struct command_line cl;
/* http://gcc.gnu.org/onlinedocs/gcc/Typeof.html */
#define min(a__,b__) \
({ typeof (a__) _a = (a__); \
typeof (b__) _b = (b__); \
_a < _b ? _a : _b; })
#define max(a__,b__) \
({ typeof (a__) _a = (a__); \
typeof (b__) _b = (b__); \
_a > _b ? _a : _b; })
#endif /* _BOOTH_H */
diff --git a/src/config.h b/src/config.h
index 9530006..5fb23a7 100644
--- a/src/config.h
+++ b/src/config.h
@@ -1,191 +1,192 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _CONFIG_H
#define _CONFIG_H
#include <stdint.h>
#include "booth.h"
#include "raft.h"
#include "transport.h"
/** @{ */
/** Definitions for in-RAM data. */
#define MAX_NODES 16
#define TICKET_ALLOC 16
struct ticket_config {
/** \name Configuration items.
* @{ */
/** Name of ticket. */
boothc_ticket name;
/** How many seconds a term lasts (if not refreshed). */
int term_duration;
/** Network related timeouts. */
int timeout;
/** Retries before giving up. */
int retries;
/** If >0, time to wait for a site to get fenced.
* The ticket may be acquired after that timespan by
* another site. */
int acquire_after; /* TODO: needed? */
/* Program to ask whether it makes sense to
* acquire the ticket */
char *ext_verifier;
/** Node weights. */
int weight[MAX_NODES];
/** @} */
/** \name Runtime values.
* @{ */
/** Current state. */
server_state_e state;
/** When something has to be done */
struct timeval next_cron;
/** Current leader. This is effectively the log[] in Raft. */
struct booth_site *leader;
/** Is the ticket granted? */
int is_granted;
/** Timestamp of leadership expiration */
time_t term_expires;
/** End of election period */
time_t election_end;
struct booth_site *voted_for;
/** Who the various sites vote for.
* NO_OWNER = no vote yet. */
struct booth_site *votes_for[MAX_NODES];
/* bitmap */
uint64_t votes_received;
/** Last voting round that was seen. */
uint32_t current_term;
/** Do ticket updates whenever we get enough heartbeats.
* But do that only once.
* This is reset to 0 whenever we broadcast heartbeat and set
* to 1 once enough acks are received.
*/
uint32_t majority_acks_received;
/** @} */
/** */
uint32_t commit_index;
/** */
uint32_t last_applied;
uint32_t next_index[MAX_NODES];
uint32_t match_index[MAX_NODES];
+ uint32_t acks_expected;
uint64_t acks_received;
- time_t hb_sent_at;
+ time_t req_sent_at;
/** \name Needed while proposals are being done.
* @{ */
/** Whom to vote for the next time.
* Needed to push a ticket to someone else. */
#if 0
/** Bitmap of sites that acknowledge that state. */
uint64_t proposal_acknowledges;
/** When an incompletely acknowledged proposal gets done.
* If all peers agree, that happens sooner.
* See switch_state_to(). */
struct timeval proposal_switch;
/** Timestamp of proposal expiration. */
time_t proposal_expires;
#endif
/** Number of send retries left.
* Used on the new owner.
* Starts at 0, counts up. */
int retry_number;
/** @} */
};
struct booth_config {
char name[BOOTH_NAME_LEN];
transport_layer_t proto;
uint16_t port;
/** Stores the OR of the individual host bitmasks. */
uint64_t site_bits;
char site_user[BOOTH_NAME_LEN];
char site_group[BOOTH_NAME_LEN];
char arb_user[BOOTH_NAME_LEN];
char arb_group[BOOTH_NAME_LEN];
uid_t uid;
gid_t gid;
int site_count;
struct booth_site site[MAX_NODES];
int ticket_count;
int ticket_allocated;
struct ticket_config *ticket;
};
extern struct booth_config *booth_conf;
int read_config(const char *path);
int check_config(int type);
int find_site_by_name(unsigned char *site, struct booth_site **node, int any_type);
int find_site_by_id(uint32_t site_id, struct booth_site **node);
const char *type_to_string(int type);
#include <stdio.h>
#define R(tk_) printf("## %12s:%3d state %s, %d:%d, " \
"leader %s, exp %s", __FILE__, __LINE__, \
state_to_string(tk_->state), tk_->current_term, \
tk_->commit_index, site_string(tk_->leader), ctime(&tk_->term_expires))
#endif /* _CONFIG_H */
diff --git a/src/inline-fn.h b/src/inline-fn.h
index aa6578b..f969fd8 100644
--- a/src/inline-fn.h
+++ b/src/inline-fn.h
@@ -1,314 +1,320 @@
/*
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _INLINE_FN_H
#define _INLINE_FN_H
#include <time.h>
#include <sys/time.h>
#include <assert.h>
#include <string.h>
#include "config.h"
#include "ticket.h"
#include "transport.h"
inline static uint32_t get_local_id(void)
{
return local ? local->site_id : -1;
}
inline static uint32_t get_node_id(struct booth_site *node)
{
return node ? node->site_id : NO_ONE;
}
inline static int term_time_left(const struct ticket_config *tk)
{
int left;
left = tk->term_expires - time(NULL);
return (left < 0) ? 0 : left;
}
/** Returns number of seconds left, if any. */
inline static int leader_and_valid(const struct ticket_config *tk)
{
if (tk->leader != local)
return 0;
return term_time_left(tk);
}
/** Is this some leader? */
inline static int is_owned(const struct ticket_config *tk)
{
return (tk->leader && tk->leader != no_leader);
}
static inline void init_header_bare(struct boothc_header *h) {
h->magic = htonl(BOOTHC_MAGIC);
h->version = htonl(BOOTHC_VERSION);
h->from = htonl(local->site_id);
h->iv = htonl(0);
h->auth1 = htonl(0);
h->auth2 = htonl(0);
}
static inline void init_header(struct boothc_header *h, int cmd,
int result, int data_len)
{
init_header_bare(h);
h->length = htonl(data_len);
h->cmd = htonl(cmd);
h->result = htonl(result);
}
static inline void init_ticket_site_header(struct boothc_ticket_msg *msg, int cmd)
{
init_header(&msg->header, cmd, 0, sizeof(*msg));
}
static inline void init_ticket_msg(struct boothc_ticket_msg *msg,
int cmd, int rv,
struct ticket_config *tk)
{
assert(sizeof(msg->ticket.id) == sizeof(tk->name));
init_header(&msg->header, cmd, rv, sizeof(*msg));
if (!tk) {
memset(&msg->ticket, 0, sizeof(msg->ticket));
} else {
memcpy(msg->ticket.id, tk->name, sizeof(msg->ticket.id));
msg->ticket.leader = htonl(get_node_id(
(tk->leader && tk->leader != no_leader) ? tk->leader : tk->voted_for));
msg->ticket.term = htonl(tk->current_term);
msg->ticket.term_valid_for = htonl(term_time_left(tk));
msg->ticket.leader_commit = htonl(tk->commit_index);
}
}
static inline struct booth_transport const *transport(void)
{
return booth_transport + booth_conf->proto;
}
static inline const char *site_string(struct booth_site *site)
{
return site ? site->addr_string : "NONE";
}
static inline const char *ticket_leader_string(struct ticket_config *tk)
{
return site_string(tk->leader);
}
static inline void disown_ticket(struct ticket_config *tk)
{
tk->leader = no_leader;
tk->is_granted = 0;
time(&tk->term_expires);
}
static inline int disown_if_expired(struct ticket_config *tk)
{
if (time(NULL) >= tk->term_expires ||
!tk->leader) {
disown_ticket(tk);
return 1;
}
return 0;
}
/* We allow half of the uint32_t to be used;
* half of that below, half of that above the current known "good" value.
* 0 UINT32_MAX
* |--------------------------+----------------+------------|
* | | |
* |--------+-------| allowed range
* |
* current commit index
*
* So, on overflow it looks like that:
* UINT32_MAX 0
* |--------------------------+-----------||---+------------|
* | | |
* |--------+-------| allowed range
* |
* current commit index
*
* This should be possible by using the same datatype and relying
* on the under/overflow semantics.
*
*
* Having 30 bits available, and assuming an expire time of
* one minute and a (high) commit index step of 64 == 2^6 (because
* of weights), we get 2^24 minutes of range - which is ~750
* years. "Should be enough for everybody."
*/
static inline int index_is_higher_than(uint32_t c_high, uint32_t c_low)
{
uint32_t diff;
if (c_high == c_low)
return 0;
diff = c_high - c_low;
if (diff < UINT32_MAX/4)
return 1;
diff = c_low - c_high;
if (diff < UINT32_MAX/4)
return 0;
assert(!"commit index out of range - invalid");
}
static inline uint32_t index_max2(uint32_t a, uint32_t b)
{
return index_is_higher_than(a, b) ? a : b;
}
static inline uint32_t index_max3(uint32_t a, uint32_t b, uint32_t c)
{
return index_max2( index_max2(a, b), c);
}
static inline double timeval_to_float(struct timeval tv)
{
return tv.tv_sec + tv.tv_usec*(double)1.0e-6;
}
static inline int timeval_msec(struct timeval tv)
{
int m;
m = tv.tv_usec / 1000;
if (m >= 1000)
m = 999;
return m;
}
static inline int timeval_compare(struct timeval tv1, struct timeval tv2)
{
if (tv1.tv_sec < tv2.tv_sec)
return -1;
if (tv1.tv_sec > tv2.tv_sec)
return +1;
if (tv1.tv_usec < tv2.tv_usec)
return -1;
if (tv1.tv_usec > tv2.tv_usec)
return +1;
return 0;
}
static inline int timeval_in_past(struct timeval which)
{
struct timeval tv;
gettimeofday(&tv, NULL);
return timeval_compare(tv, which) > 0;
}
static inline time_t next_vote_starts_at(struct ticket_config *tk)
{
time_t half_exp, retries_needed;
/* If not owner, don't renew. */
if (tk->leader != local)
return 0;
/* Try to renew at half of expiry time. */
half_exp = tk->term_expires - tk->term_duration/2;
/* Also start renewal if we couldn't get
* a few message retransmission in the alloted
* expiry time. */
retries_needed = tk->term_expires - tk->timeout * tk->retries/2;
/* Return earlier timestamp. */
return half_exp < retries_needed
? half_exp
: retries_needed;
}
static inline int should_start_renewal(struct ticket_config *tk)
{
time_t now, when;
when = next_vote_starts_at(tk);
if (!when)
return 0;
time(&now);
return when <= now;
}
-
-static inline int send_heartbeat(struct ticket_config *tk)
+static inline void expect_replies(struct ticket_config *tk,
+ int reply_type)
{
+ tk->acks_expected = reply_type;
tk->acks_received = local->bitmask;
- tk->hb_sent_at = time(NULL);
+ tk->req_sent_at = time(NULL);
tk->majority_acks_received = 0;
+}
+
+static inline int send_heartbeat(struct ticket_config *tk)
+{
+ expect_replies(tk, OP_HEARTBEAT);
return ticket_broadcast(tk, OP_HEARTBEAT, RLT_SUCCESS);
}
static inline struct booth_site *my_vote(struct ticket_config *tk)
{
return tk->votes_for[ local->index ];
}
static inline int count_bits(uint64_t val) {
return __builtin_popcount(val);
}
static inline int majority_of_bits(struct ticket_config *tk, uint64_t val)
{
/* Use ">" to get majority decision, even for an even number
* of participants. */
return count_bits(val) * 2 >
booth_conf->site_count;
}
#endif
diff --git a/src/raft.c b/src/raft.c
index d3265b5..dcfe2e7 100644
--- a/src/raft.c
+++ b/src/raft.c
@@ -1,791 +1,770 @@
/*
* Copyright (C) 2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <inttypes.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include "booth.h"
#include "transport.h"
#include "inline-fn.h"
#include "config.h"
#include "raft.h"
#include "ticket.h"
#include "log.h"
inline static void clear_election(struct ticket_config *tk)
{
int i;
struct booth_site *site;
log_debug("clear election");
tk->votes_received = 0;
foreach_node(i, site)
tk->votes_for[site->index] = NULL;
}
inline static void record_vote(struct ticket_config *tk,
struct booth_site *who,
struct booth_site *vote)
{
log_debug("site %s votes for %s",
site_string(who),
site_string(vote));
if (!tk->votes_for[who->index]) {
tk->votes_for[who->index] = vote;
tk->votes_received |= who->bitmask;
} else {
if (tk->votes_for[who->index] != vote)
log_warn("voted previously (but in same term!) for %s...",
site_string(tk->votes_for[who->index]));
}
}
static int cmp_msg_ticket(struct ticket_config *tk,
struct boothc_ticket_msg *msg)
{
if (tk->current_term != ntohl(msg->ticket.term)) {
return tk->current_term - ntohl(msg->ticket.term);
}
return tk->commit_index - ntohl(msg->ticket.leader_commit);
}
static void update_term_from_msg(struct ticket_config *tk,
struct boothc_ticket_msg *msg)
{
uint32_t i;
i = ntohl(msg->ticket.term);
/* if we failed to start the election, then accept the term
* from the leader
* */
if (tk->state == ST_CANDIDATE) {
tk->current_term = i;
} else {
tk->current_term = max(i, tk->current_term);
}
/* § 5.3 */
i = ntohl(msg->ticket.leader_commit);
tk->commit_index = max(i, tk->commit_index);
}
static void update_ticket_from_msg(struct ticket_config *tk,
struct boothc_ticket_msg *msg)
{
int duration;
duration = tk->term_duration;
if (msg)
duration = min(duration, ntohl(msg->ticket.term_valid_for));
tk->term_expires = time(NULL) + duration;
if (msg) {
update_term_from_msg(tk, msg);
}
}
static void become_follower(struct ticket_config *tk,
struct boothc_ticket_msg *msg)
{
update_ticket_from_msg(tk, msg);
tk->state = ST_FOLLOWER;
}
struct booth_site *majority_votes(struct ticket_config *tk)
{
int i, n;
struct booth_site *v;
int count[MAX_NODES] = { 0, };
for(i=0; i<booth_conf->site_count; i++) {
v = tk->votes_for[i];
if (!v)
continue;
n = v->index;
count[n]++;
log_debug("Majority: %d %s wants %d %s => %d",
i, site_string(&booth_conf->site[i]),
n, site_string(v),
count[n]);
if (count[n]*2 <= booth_conf->site_count)
continue;
log_debug("Majority reached: %d of %d for %s",
count[n], booth_conf->site_count,
site_string(v));
return v;
}
return NULL;
}
-static int all_voted(struct ticket_config *tk)
-{
- int i, cnt = 0;
-
- for(i=0; i<booth_conf->site_count; i++) {
- if (tk->votes_for[i]) {
- cnt++;
- }
- }
-
- return (cnt == booth_conf->site_count);
-}
-
-
static int newer_term(struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg,
int in_election)
{
uint32_t term;
term = ntohl(msg->ticket.term);
/* §5.1 */
if (term > tk->current_term) {
tk->state = ST_FOLLOWER;
if (!in_election) {
tk->leader = leader;
log_debug("from %s: higher term %d vs. %d, following %s",
site_string(sender),
term, tk->current_term,
ticket_leader_string(tk));
} else {
tk->leader = no_leader;
log_debug("from %s: higher term %d vs. %d (election)",
site_string(sender),
term, tk->current_term);
}
tk->term_expires = time(NULL) + tk->term_duration;
tk->current_term = term;
return 1;
}
return 0;
}
static int term_too_low(struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg)
{
uint32_t term;
term = ntohl(msg->ticket.term);
/* §5.1 */
if (term < tk->current_term) {
log_info("sending reject to %s, its term too low "
"(%d vs. %d)", site_string(leader),
term, tk->current_term
);
send_reject(sender, tk, RLT_TERM_OUTDATED);
return 1;
}
return 0;
}
/* For follower. */
static int answer_HEARTBEAT (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t term;
struct boothc_ticket_msg omsg;
term = ntohl(msg->ticket.term);
log_debug("leader: %s, have %s; term %d vs %d",
site_string(leader), ticket_leader_string(tk),
term, tk->current_term);
/* if we're candidate, it may be that we got a heartbeat from
* a legitimate leader, so don't ignore a lower term
*/
if (tk->state != ST_CANDIDATE && term < tk->current_term) {
log_info("ignoring lower term %d vs. %d, from %s",
term, tk->current_term,
ticket_leader_string(tk));
return 0;
}
/* Needed? */
newer_term(tk, sender, leader, msg, 0);
become_follower(tk, msg);
/* Racy??? */
assert(sender == leader || !leader);
tk->leader = leader;
/* Ack the heartbeat (we comply). */
init_ticket_msg(&omsg, OP_HEARTBEAT, RLT_SUCCESS, tk);
return booth_udp_send(sender, &omsg, sizeof(omsg));
}
static int process_UPDATE (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t term;
term = ntohl(msg->ticket.term);
log_debug("leader: %s, have %s; term %d vs %d",
site_string(leader), ticket_leader_string(tk),
term, tk->current_term);
/* No reject. (?) */
if (term < tk->current_term) {
log_info("ignoring lower term %d vs. %d, from %s",
term, tk->current_term,
ticket_leader_string(tk));
return 0;
}
update_ticket_from_msg(tk, msg);
ticket_write(tk);
/* run ticket_cron if the ticket expires */
set_ticket_wakeup(tk);
return 0;
}
static int process_REVOKE (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
if (tk->leader != sender) {
log_error("from %s: non-leader wants to revoke ticket %s (ignoring)",
site_string(sender), tk->name);
return 1;
} else if (tk->state != ST_FOLLOWER) {
log_error("from %s: unexpected ticket %s revoke in state %s (ignoring)",
site_string(sender),
state_to_string(tk->state),
tk->name);
return 1;
} else {
log_info("from %s: leader revokes ticket %s",
site_string(sender), tk->name);
reset_ticket(tk);
ticket_write(tk);
}
return 0;
}
/* update the ticket on the leader, write it to the CIB, and
send out the update message to others with the new expiry
time
*/
static int leader_update_ticket(struct ticket_config *tk)
{
struct boothc_ticket_msg msg;
tk->term_expires = time(NULL) + tk->term_duration;
tk->retry_number = 0;
ticket_write(tk);
set_ticket_wakeup(tk);
init_ticket_msg(&msg, OP_UPDATE, RLT_SUCCESS, tk);
return transport()->broadcast(&msg, sizeof(msg));
}
/* For leader. */
static int process_HEARTBEAT(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t term;
term = ntohl(msg->ticket.term);
if (newer_term(tk, sender, leader, msg, 0)) {
/* unexpected higher term */
log_warn("got higher term from %s (%d vs. %d)",
site_string(sender),
term, tk->current_term);
return 0;
}
/* Don't send a reject. */
if (term < tk->current_term) {
/* Doesn't know what he's talking about - perhaps
* doesn't receive our packets? */
log_warn("from %s: unexpected "
"term %d instead of %d (ignoring)",
site_string(sender),
term, tk->current_term);
return 0;
}
if (term == tk->current_term &&
leader == tk->leader) {
- /* got an ack! */
- tk->acks_received |= sender->bitmask;
-
- log_debug("got heartbeat ACK from %s, %d/%d agree.",
- site_string(sender),
- count_bits(tk->acks_received),
- booth_conf->site_count);
-
if (majority_of_bits(tk, tk->acks_received)) {
/* OK, at least half of the nodes are reachable;
* Update the ticket and send update messages out
*/
if( !tk->majority_acks_received ) {
/* Write the ticket to the CIB and set the next
* wakeup time (but do that only once) */
tk->majority_acks_received = 1;
return leader_update_ticket(tk);
}
}
}
return 0;
}
void leader_elected(
struct ticket_config *tk,
struct booth_site *new_leader
)
{
if (new_leader) {
tk->leader = new_leader;
tk->term_expires = time(NULL) + tk->term_duration;
tk->election_end = 0;
tk->voted_for = NULL;
if (new_leader == local) {
tk->commit_index++;
tk->state = ST_LEADER;
send_heartbeat(tk);
ticket_activate_timeout(tk);
} else {
become_follower(tk, NULL);
set_ticket_wakeup(tk);
}
}
}
static int process_VOTE_FOR(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
if (term_too_low(tk, sender, leader, msg))
return 0;
if (newer_term(tk, sender, leader, msg, 0)) {
clear_election(tk);
}
/* leader wants to step down? */
if (leader == no_leader && sender == tk->leader &&
(tk->state == ST_FOLLOWER || tk->state == ST_CANDIDATE)) {
log_info("ticket %s owner %s wants to step down",
tk->name, site_string(tk->leader));
return new_round(tk);
}
record_vote(tk, sender, leader);
if (tk->state != ST_CANDIDATE) {
/* lost candidate status, somebody rejected our proposal */
return 0;
}
/* only if all voted can we take the ticket now, otherwise
* wait for timeout in ticket_cron */
- if (all_voted(tk)) {
+ if (!tk->acks_expected) {
/* §5.2 */
leader_elected(tk, majority_votes(tk));
}
return 0;
}
static int process_REJECTED(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t rv;
rv = ntohl(msg->header.result);
if (tk->state == ST_CANDIDATE &&
rv == RLT_TERM_OUTDATED) {
log_warn("from %s: ticket %s outdated (term %d), following %s",
site_string(sender),
tk->name, ntohl(msg->ticket.term),
site_string(leader)
);
tk->leader = leader;
become_follower(tk, msg);
return 0;
}
if (tk->state == ST_CANDIDATE &&
rv == RLT_TERM_STILL_VALID) {
log_warn("from %s: there's a leader I didn't see: %s, following",
site_string(sender),
site_string(leader));
tk->leader = leader;
become_follower(tk, msg);
return 0;
}
log_warn("from %s: in state %s, got %s (unexpected reject)",
site_string(sender),
state_to_string(tk->state),
state_to_string(rv));
tk->leader = leader;
become_follower(tk, msg);
return 0;
}
static int send_ticket (
int cmd,
struct ticket_config *tk,
struct booth_site *to_site
)
{
struct boothc_ticket_msg omsg;
init_ticket_msg(&omsg, cmd, RLT_SUCCESS, tk);
return booth_udp_send(to_site, &omsg, sizeof(omsg));
}
/* §5.2 */
static int answer_REQ_VOTE(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t term;
int valid;
struct boothc_ticket_msg omsg;
term = ntohl(msg->ticket.term);
/* Important: Ignore duplicated packets! */
valid = term_time_left(tk);
if (valid &&
term == tk->current_term &&
sender == tk->leader) {
log_debug("Duplicate OP_VOTE_FOR ignored.");
return 0;
}
if (valid) {
log_warn("no election allowed for %s, term still valid for %d",
tk->name, valid);
return send_reject(sender, tk, RLT_TERM_STILL_VALID);
}
if (term_too_low(tk, sender, leader, msg))
return 0;
/* if it's a newer term or ... */
if (newer_term(tk, sender, leader, msg, 1)) {
clear_election(tk);
goto vote_for_sender;
}
/* ... we didn't vote yet, then vote for the sender */
/* §5.2, §5.4 */
if (!tk->voted_for) {
vote_for_sender:
tk->voted_for = sender;
record_vote(tk, sender, leader);
}
init_ticket_msg(&omsg, OP_VOTE_FOR, RLT_SUCCESS, tk);
omsg.ticket.leader = htonl(get_node_id(tk->voted_for));
return booth_udp_send(sender, &omsg, sizeof(omsg));
}
int new_election(struct ticket_config *tk,
struct booth_site *preference, int update_term)
{
struct booth_site *new_leader;
time_t now;
time(&now);
log_debug("start new election?, now=%" PRIi64 ", end %" PRIi64,
(int64_t)now, (int64_t)(tk->election_end));
if (now <= tk->election_end)
return 0;
/* §5.2 */
/* If there was _no_ answer, don't keep incrementing the term number
* indefinitely. If there was no peer, there'll probably be no one
* listening now either. However, we don't know if we were
* invoked due to a timeout (caller does).
*/
if (update_term)
tk->current_term++;
tk->term_expires = 0;
tk->election_end = now + tk->term_duration;
log_info("starting new election, term=%d, until %" PRIi64,
tk->current_term, (int64_t)tk->election_end);
clear_election(tk);
if(preference)
new_leader = preference;
else
new_leader = (local->type == SITE) ? local : NULL;
record_vote(tk, local, new_leader);
tk->voted_for = new_leader;
tk->state = ST_CANDIDATE;
+ expect_replies(tk, OP_VOTE_FOR);
ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS);
ticket_activate_timeout(tk);
return 0;
}
/* we were a leader and somebody says that they have a more up
* to date ticket
* there was probably connectivity loss
* tricky
*/
static int leader_handle_newer_ticket(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
if (leader == no_leader || !leader || leader == local) {
/* at least nobody else owns the ticket */
/* it is not kosher to update from their copy, but since
* they don't own the ticket, nothing bad can happen
*/
update_term_from_msg(tk, msg);
/* get the ticket again, if we can
*/
return acquire_ticket(tk);
}
/* eek, two leaders, split brain */
/* normally shouldn't happen; run election */
log_error("from %s: ticket %s at %s! (disowning ticket)",
site_string(sender),
tk->name, site_string(leader)
);
log_error("Two ticket owners! Possible bug. Please report at https://github.com/ClusterLabs/booth/issues/new.");
return new_round(tk);
}
/* reply to STATUS */
static int process_MY_INDEX (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int i;
int rv;
if (!msg->ticket.term_valid_for) {
/* ticket not valid */
return 0;
}
i = cmp_msg_ticket(tk, msg);
if (i > 0) {
/* let them know about our newer ticket */
send_ticket(OP_MY_INDEX, tk, sender);
if (tk->state == ST_LEADER)
return send_ticket(OP_UPDATE, tk, sender);
}
if (i == 0) {
return 0;
}
/* they have a newer ticket, trouble if we're already leader
* for it */
if (tk->state == ST_LEADER) {
log_warn("from %s: more uptodate ticket %s at %s",
site_string(sender),
tk->name,
site_string(leader)
);
return leader_handle_newer_ticket(tk, sender, leader, msg);
}
update_ticket_from_msg(tk, msg);
tk->leader = leader;
if (leader == local) {
rv = test_external_prog(tk, 1);
if (!rv) {
/* if we were the leader but we rebooted in the
* meantime; try to get the ticket again
*/
tk->state = ST_LEADER;
rv = send_heartbeat(tk);
ticket_activate_timeout(tk);
}
return rv;
} else {
tk->state = (!leader || leader == no_leader) ?
ST_INIT : ST_FOLLOWER;
}
return 0;
}
int raft_answer(
struct ticket_config *tk,
struct booth_site *from,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int cmd;
int rv;
rv = 0;
cmd = ntohl(msg->header.cmd);
R(tk);
log_debug("got message %s from %s",
state_to_string(cmd),
site_string(from));
switch (cmd) {
case OP_REQ_VOTE:
rv = answer_REQ_VOTE(tk, from, leader, msg);
break;
case OP_VOTE_FOR:
rv = process_VOTE_FOR(tk, from, leader, msg);
break;
case OP_HEARTBEAT:
if (tk->leader == local &&
tk->state == ST_LEADER)
rv = process_HEARTBEAT(tk, from, leader, msg);
else if (tk->leader != local &&
(tk->state == ST_FOLLOWER ||
tk->state == ST_CANDIDATE))
rv = answer_HEARTBEAT(tk, from, leader, msg);
else {
log_warn("unexpected message %s, from %s",
state_to_string(cmd),
site_string(from));
rv = -EINVAL;
}
break;
case OP_UPDATE:
if (tk->leader != local && tk->state == ST_FOLLOWER) {
rv = process_UPDATE(tk, from, leader, msg);
} else {
log_warn("unexpected message %s, from %s",
state_to_string(cmd),
site_string(from));
rv = -EINVAL;
}
break;
case OP_REJECTED:
rv = process_REJECTED(tk, from, leader, msg);
break;
case OP_REVOKE:
rv = process_REVOKE(tk, from, leader, msg);
break;
case OP_MY_INDEX:
rv = process_MY_INDEX(tk, from, leader, msg);
break;
case OP_STATUS:
rv = send_ticket(OP_MY_INDEX, tk, from);
break;
default:
log_error("unknown message %s, from %s",
state_to_string(cmd), site_string(from));
rv = -EINVAL;
}
R(tk);
return rv;
}
diff --git a/src/ticket.c b/src/ticket.c
index 03cc3c1..f11ea38 100644
--- a/src/ticket.c
+++ b/src/ticket.c
@@ -1,664 +1,697 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include <inttypes.h>
#include <stdio.h>
#include <assert.h>
#include <time.h>
#include "ticket.h"
#include "config.h"
#include "pacemaker.h"
#include "inline-fn.h"
#include "log.h"
#include "booth.h"
#include "raft.h"
#include "handler.h"
#define TK_LINE 256
/* Untrusted input, must fit (incl. \0) in a buffer of max chars. */
int check_max_len_valid(const char *s, int max)
{
int i;
for(i=0; i<max; i++)
if (s[i] == 0)
return 1;
return 0;
}
int find_ticket_by_name(const char *ticket, struct ticket_config **found)
{
int i;
if (found)
*found = NULL;
for (i = 0; i < booth_conf->ticket_count; i++) {
if (!strcmp(booth_conf->ticket[i].name, ticket)) {
if (found)
*found = booth_conf->ticket + i;
return 1;
}
}
return 0;
}
int check_ticket(char *ticket, struct ticket_config **found)
{
if (found)
*found = NULL;
if (!booth_conf)
return 0;
if (!check_max_len_valid(ticket, sizeof(booth_conf->ticket[0].name)))
return 0;
return find_ticket_by_name(ticket, found);
}
int check_site(char *site, int *is_local)
{
struct booth_site *node;
if (!check_max_len_valid(site, sizeof(node->addr_string)))
return 0;
if (find_site_by_name(site, &node, 0)) {
*is_local = node->local;
return 1;
}
return 0;
}
#if 0
/** Find out what others think about this ticket.
*
* If we're a SITE, we can ask (and have to tell) Pacemaker.
* An ARBITRATOR can only ask others. */
static int ticket_send_catchup(struct ticket_config *tk)
{
int i, rv = 0;
struct booth_site *site;
struct boothc_ticket_msg msg;
foreach_node(i, site) {
if (!site->local) {
init_ticket_msg(&msg, CMD_CATCHUP, RLT_SUCCESS, tk);
log_debug("attempting catchup from %s", site->addr_string);
rv = booth_udp_send(site, &msg, sizeof(msg));
}
}
ticket_activate_timeout(tk);
return rv;
}
#endif
int ticket_write(struct ticket_config *tk)
{
if (local->type != SITE)
return -EINVAL;
disown_if_expired(tk);
if (tk->leader == local) {
pcmk_handler.grant_ticket(tk);
} else {
pcmk_handler.revoke_ticket(tk);
}
return 0;
}
/* Ask an external program whether getting the ticket
* makes sense.
* Eg. if the services have a failcount of INFINITY,
* we can't serve here anyway. */
int test_external_prog(struct ticket_config *tk,
int start_election)
{
int rv;
rv = run_handler(tk, tk->ext_verifier, 1);
if (rv) {
log_warn("we are not allowed to acquire ticket %s",
tk->name);
/* Give it to somebody else.
- * Just send a commit message, as the
- * others couldn't help anyway. */
+ * Just send a VOTE_FOR message, so the
+ * others can start elections. */
if (leader_and_valid(tk)) {
disown_ticket(tk);
if (start_election) {
ticket_broadcast(tk, OP_VOTE_FOR, RLT_SUCCESS);
}
}
}
return rv;
}
/* Try to acquire a ticket
* Could be manual grant or after ticket loss
*/
int acquire_ticket(struct ticket_config *tk)
{
if (test_external_prog(tk, 0))
return RLT_EXT_FAILED;
return new_election(tk, local, 1);
}
/** Try to get the ticket for the local site.
* */
int do_grant_ticket(struct ticket_config *tk)
{
int rv;
if (tk->leader == local)
return RLT_SUCCESS;
if (is_owned(tk))
return RLT_OVERGRANT;
rv = acquire_ticket(tk);
return rv;
}
/** Ticket revoke.
* Only to be started from the leader. */
int do_revoke_ticket(struct ticket_config *tk)
{
log_info("revoking ticket %s", tk->name);
reset_ticket(tk);
ticket_write(tk);
return ticket_broadcast(tk, OP_REVOKE, RLT_SUCCESS);
}
int list_ticket(char **pdata, unsigned int *len)
{
struct ticket_config *tk;
char timeout_str[64];
char *data, *cp;
int i, alloc;
*pdata = NULL;
*len = 0;
alloc = 256 +
booth_conf->ticket_count * (BOOTH_NAME_LEN * 2 + 128);
data = malloc(alloc);
if (!data)
return -ENOMEM;
cp = data;
foreach_ticket(i, tk) {
if (tk->term_expires != 0)
strftime(timeout_str, sizeof(timeout_str), "%F %T",
localtime(&tk->term_expires));
else
strcpy(timeout_str, "INF");
cp += sprintf(cp,
"ticket: %s, leader: %s, expires: %s, commit: %d\n",
tk->name,
ticket_leader_string(tk),
timeout_str,
tk->commit_index);
*len = cp - data;
assert(*len < alloc);
}
*pdata = data;
return 0;
}
void reset_ticket(struct ticket_config *tk)
{
disown_ticket(tk);
tk->current_term = 0;
tk->commit_index = 0;
tk->state = ST_INIT;
}
int setup_ticket(void)
{
struct ticket_config *tk;
int i;
foreach_ticket(i, tk) {
reset_ticket(tk);
if (local->type == SITE) {
pcmk_handler.load_ticket(tk);
if (time(NULL) >= tk->term_expires) {
reset_ticket(tk);
ticket_write(tk);
}
}
/* if the ticket is uptodate and belongs to us, try with
* the heartbeat
*/
if (tk->is_granted && tk->leader == local) {
if (!test_external_prog(tk, 1)) {
tk->state = ST_LEADER;
send_heartbeat(tk);
ticket_activate_timeout(tk);
}
} else {
/* otherwise, query status */
ticket_broadcast(tk, OP_STATUS, RLT_SUCCESS);
}
}
return 0;
}
int ticket_answer_list(int fd, struct boothc_ticket_msg *msg)
{
char *data;
int olen, rv;
struct boothc_header hdr;
rv = list_ticket(&data, &olen);
if (rv < 0)
return rv;
init_header(&hdr, CMR_LIST, RLT_SUCCESS, sizeof(hdr) + olen);
return send_header_plus(fd, &hdr, data, olen);
}
int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg)
{
int rv;
struct ticket_config *tk;
if (!check_ticket(msg->ticket.id, &tk)) {
log_warn("client asked to grant unknown ticket %s",
msg->ticket.id);
rv = RLT_INVALID_ARG;
goto reply;
}
if (is_owned(tk)) {
log_warn("client wants to grant an (already granted!) ticket %s",
msg->ticket.id);
rv = RLT_OVERGRANT;
goto reply;
}
rv = do_grant_ticket(tk);
reply:
init_header(&msg->header, CMR_GRANT, rv ?: RLT_ASYNC, sizeof(*msg));
return send_ticket_msg(fd, msg);
}
int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg)
{
int rv;
struct ticket_config *tk;
if (!check_ticket(msg->ticket.id, &tk)) {
log_warn("client wants to revoke an unknown ticket %s",
msg->ticket.id);
rv = RLT_INVALID_ARG;
goto reply;
}
if (!is_owned(tk)) {
log_info("client wants to revoke a free ticket %s",
msg->ticket.id);
/* Return a different result code? */
rv = RLT_SUCCESS;
goto reply;
}
if (tk->leader != local) {
log_info("we do not own the ticket %s, "
"redirect to leader %s",
msg->ticket.id, ticket_leader_string(tk));
rv = RLT_REDIRECT;
goto reply;
}
rv = do_revoke_ticket(tk);
if (rv == 0)
rv = RLT_ASYNC;
reply:
init_ticket_msg(msg, CMR_REVOKE, rv, tk);
return send_ticket_msg(fd, msg);
}
int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd, cmd_result_t res)
{
struct boothc_ticket_msg msg;
init_ticket_msg(&msg, cmd, res, tk);
log_debug("broadcasting '%s' for ticket %s (term=%d, valid=%d)",
state_to_string(cmd), tk->name,
ntohl(msg.ticket.term),
ntohl(msg.ticket.term_valid_for));
return transport()->broadcast(&msg, sizeof(msg));
}
int new_round(struct ticket_config *tk)
{
int rv = 0;
disown_ticket(tk);
/* New vote round; §5.2 */
if (local->type == SITE) {
rv = new_election(tk, NULL, 1);
ticket_write(tk);
}
return rv;
}
static void ticket_cron(struct ticket_config *tk)
{
time_t now;
int vote_cnt;
struct booth_site *new_leader;
now = time(NULL);
R(tk);
/* Has an owner, has an expiry date, and expiry date in the past?
* Losing the ticket must happen in _every_ state. */
if (tk->term_expires &&
is_owned(tk) &&
now >= tk->term_expires) {
log_warn("LOST ticket: %s no longer at %s",
tk->name,
ticket_leader_string(tk));
/* Couldn't renew in time - ticket lost. */
new_round(tk);
return;
}
R(tk);
switch(tk->state) {
case ST_INIT:
/* init state, nothing to do */
break;
case ST_FOLLOWER:
/* nothing here either, ticket loss is caught earlier
* */
break;
case ST_CANDIDATE:
/* §5.2 */
/* not everybody answered, but if we have majority... */
new_leader = majority_votes(tk);
if (new_leader) {
leader_elected(tk, new_leader);
} else if (now > tk->election_end) {
/* This is previous election timed out */
new_election(tk, NULL, 0);
}
break;
case ST_LEADER:
/* we get here after we broadcasted a heartbeat;
* by this time all sites should've acked the heartbeat
*/
- vote_cnt = count_bits(tk->acks_received) - 1;
- if ((vote_cnt+1) < booth_conf->site_count) {
+ if (tk->acks_expected) {
if (!majority_of_bits(tk, tk->acks_received)) {
tk->retry_number ++;
+ vote_cnt = count_bits(tk->acks_received) - 1;
if (!vote_cnt) {
log_warn("no answers to heartbeat for ticket %s on try #%d, "
"we are alone",
tk->name,
tk->retry_number);
} else {
log_warn("not enough answers to heartbeat for ticket %s on try #%d: "
"only got %d answers",
tk->name,
tk->retry_number,
vote_cnt);
}
/* Don't give up, though - there's still some time until leadership is lost. */
}
}
send_heartbeat(tk);
if (tk->retry_number < tk->retries) {
ticket_activate_timeout(tk);
} else {
set_ticket_wakeup(tk);
}
break;
default:
break;
}
R(tk);
}
void process_tickets(void)
{
struct ticket_config *tk;
int i;
struct timeval now;
float sec_until;
gettimeofday(&now, NULL);
foreach_ticket(i, tk) {
sec_until = timeval_to_float(tk->next_cron) - timeval_to_float(now);
if (0)
log_debug("ticket %s next cron %" PRIx64 ".%03d, "
"now %" PRIx64 "%03d, in %f",
tk->name,
(uint64_t)tk->next_cron.tv_sec, timeval_msec(tk->next_cron),
(uint64_t)now.tv_sec, timeval_msec(now),
sec_until);
if (sec_until > 0.0)
continue;
log_debug("ticket cron: doing %s", tk->name);
/* Set next value, handler may override.
* This should already be handled via the state logic;
* but to be on the safe side the renew repetition is
* duplicated here, too. */
set_ticket_wakeup(tk);
ticket_cron(tk);
}
}
void tickets_log_info(void)
{
struct ticket_config *tk;
int i;
foreach_ticket(i, tk) {
log_info("Ticket %s: state '%s' "
"commit index %d "
"leader %s "
"expires %-24.24s",
tk->name,
state_to_string(tk->state),
tk->commit_index,
ticket_leader_string(tk),
ctime(&tk->term_expires));
}
}
+static int all_replied(struct ticket_config *tk)
+{
+ return (count_bits(tk->acks_received) == booth_conf->site_count);
+}
+
+
+static void update_acks(
+ struct ticket_config *tk,
+ struct booth_site *sender,
+ struct booth_site *leader,
+ struct boothc_ticket_msg *msg
+ )
+{
+ uint32_t cmd;
+
+ cmd = ntohl(msg->header.cmd);
+ if (tk->acks_expected != cmd)
+ return;
+
+ /* got an ack! */
+ tk->acks_received |= sender->bitmask;
+
+ log_debug("got ACK from %s, %d/%d agree.",
+ site_string(sender),
+ count_bits(tk->acks_received),
+ booth_conf->site_count);
+
+ if (all_replied(tk)) {
+ tk->acks_expected = 0;
+ }
+}
+
/* UDP message receiver. */
int message_recv(struct boothc_ticket_msg *msg, int msglen)
{
uint32_t from;
struct booth_site *source;
struct ticket_config *tk;
struct booth_site *leader;
uint32_t leader_u;
if (check_boothc_header(&msg->header, sizeof(*msg)) < 0 ||
msglen != sizeof(*msg)) {
log_error("message receive error");
return -1;
}
from = ntohl(msg->header.from);
if (!find_site_by_id(from, &source) || !source) {
log_error("unknown sender: %08x", from);
return -1;
}
if (!check_ticket(msg->ticket.id, &tk)) {
log_warn("got invalid ticket name %s from %s",
msg->ticket.id, site_string(source));
return -EINVAL;
}
leader_u = ntohl(msg->ticket.leader);
if (!find_site_by_id(leader_u, &leader)) {
log_error("Message with unknown owner %u received", leader_u);
return -EINVAL;
}
+ update_acks(tk, source, leader, msg);
return raft_answer(tk, source, leader, msg);
}
void set_ticket_wakeup(struct ticket_config *tk)
{
struct timeval tv, now;
/* At least every hour, perhaps sooner. */
ticket_next_cron_in(tk, 3600);
switch (tk->state) {
case ST_LEADER:
assert(tk->leader == local);
gettimeofday(&now, NULL);
tv = now;
tv.tv_sec = next_vote_starts_at(tk);
/* If timestamp is in the past, look again in one second. */
if (timeval_compare(tv, now) <= 0)
tv.tv_sec = now.tv_sec + 1;
ticket_next_cron_at(tk, tv);
break;
case ST_CANDIDATE:
assert(tk->election_end);
ticket_next_cron_at_coarse(tk, tk->election_end);
break;
case ST_INIT:
case ST_FOLLOWER:
/* If there is (or should be) some owner, check on it later on.
* If no one is interested - don't care. */
if (is_owned(tk) &&
(local->type == SITE))
ticket_next_cron_at_coarse(tk,
tk->term_expires + tk->acquire_after);
break;
default:
log_error("unknown ticket state: %d", tk->state);
}
}
/* Given a state (in host byte order), return a human-readable (char*).
* An array is used so that multiple states can be printed in a single printf(). */
char *state_to_string(uint32_t state_ho)
{
union mu { cmd_request_t s; char c[5]; };
static union mu cache[6] = { { 0 } }, *cur;
static int current = 0;
current ++;
if (current >= sizeof(cache)/sizeof(cache[0]))
current = 0;
cur = cache + current;
cur->s = htonl(state_ho);
/* Shouldn't be necessary, union array is initialized with zeroes, and
* these bytes never get written. */
cur->c[4] = 0;
return cur->c;
}
int send_reject(struct booth_site *dest, struct ticket_config *tk, cmd_result_t code)
{
struct boothc_ticket_msg msg;
init_ticket_msg(&msg, OP_REJECTED, code, tk);
return booth_udp_send(dest, &msg, sizeof(msg));
}

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 5:02 PM (11 h, 2 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1019033
Default Alt Text
(52 KB)

Event Timeline