Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/src/config.c b/src/config.c
index a08bc51..058a98d 100644
--- a/src/config.c
+++ b/src/config.c
@@ -1,734 +1,740 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdio.h>
#include <ctype.h>
#include <stdlib.h>
#include <assert.h>
#include <zlib.h>
#include <sys/types.h>
#include <pwd.h>
#include <grp.h>
#include <errno.h>
#include <string.h>
#include "booth.h"
#include "config.h"
#include "raft.h"
#include "ticket.h"
#include "log.h"
static int ticket_size = 0;
static int ticket_realloc(void)
{
const int added = 5;
int had, want;
void *p;
had = booth_conf->ticket_allocated;
want = had + added;
p = realloc(booth_conf->ticket,
sizeof(struct ticket_config) * want);
if (!p) {
log_error("can't alloc more tickets");
return -ENOMEM;
}
booth_conf->ticket = p;
memset(booth_conf->ticket + had, 0,
sizeof(struct ticket_config) * added);
booth_conf->ticket_allocated = want;
return 0;
}
int add_site(char *address, int type);
int add_site(char *addr_string, int type)
{
int rv;
struct booth_site *site;
uLong nid;
uint32_t mask;
int i;
rv = 1;
if (booth_conf->site_count == MAX_NODES) {
log_error("too many nodes");
goto out;
}
if (strlen(addr_string)+1 >= sizeof(booth_conf->site[0].addr_string)) {
log_error("site address \"%s\" too long", addr_string);
goto out;
}
site = booth_conf->site + booth_conf->site_count;
site->family = BOOTH_PROTO_FAMILY;
site->type = type;
/* Make site_id start at a non-zero point.
* Perhaps use hash over string or address? */
strcpy(site->addr_string, addr_string);
site->index = booth_conf->site_count;
site->bitmask = 1 << booth_conf->site_count;
/* Catch site overflow */
assert(site->bitmask);
booth_conf->all_bits |= site->bitmask;
if (type == SITE)
booth_conf->sites_bits |= site->bitmask;
site->tcp_fd = -1;
booth_conf->site_count++;
rv = 0;
memset(&site->sa6, 0, sizeof(site->sa6));
nid = crc32(0L, NULL, 0);
/* Using the ASCII representation in site->addr_string (both sizeof()
* and strlen()) gives quite a lot of collisions; a brute-force run
* from 0.0.0.0 to 24.0.0.0 gives ~4% collisions, and this tends to
* increase even more.
* Whether there'll be a collision in real-life, with 3 or 5 nodes, is
* another question ... but for now get the ID from the binary
* representation - that had *no* collisions up to 32.0.0.0. */
if (inet_pton(AF_INET,
site->addr_string,
&site->sa4.sin_addr) > 0) {
site->family = AF_INET;
site->sa4.sin_family = site->family;
site->sa4.sin_port = htons(booth_conf->port);
site->saddrlen = sizeof(site->sa4);
site->addrlen = sizeof(site->sa4.sin_addr);
site->site_id = crc32(nid, (void*)&site->sa4.sin_addr, site->addrlen);
} else if (inet_pton(AF_INET6,
site->addr_string,
&site->sa6.sin6_addr) > 0) {
site->family = AF_INET6;
site->sa6.sin6_family = site->family;
site->sa6.sin6_flowinfo = 0;
site->sa6.sin6_port = htons(booth_conf->port);
site->saddrlen = sizeof(site->sa6);
site->addrlen = sizeof(site->sa6.sin6_addr);
site->site_id = crc32(nid, (void*)&site->sa6.sin6_addr, site->addrlen);
} else {
log_error("Address string \"%s\" is bad", site->addr_string);
rv = EINVAL;
}
/* Make sure we will never collide with NO_ONE,
* or be negative (to get "get_local_id() < 0" working). */
mask = 1 << (sizeof(site->site_id)*8 -1);
assert(NO_ONE & mask);
site->site_id &= ~mask;
/* Test for collisions with other sites */
for(i=0; i<site->index; i++)
if (booth_conf->site[i].site_id == site->site_id) {
log_error("Got a site-ID collision. Please file a bug on https://github.com/ClusterLabs/booth/issues/new, attaching the configuration file.");
exit(1);
}
out:
return rv;
}
inline static char *skip_while_in(const char *cp, int (*fn)(int), const char *allowed)
{
/* strchr() returns a pointer to the terminator if *cp == 0. */
while (*cp &&
(fn(*cp) ||
strchr(allowed, *cp)))
cp++;
/* discard "const" qualifier */
return (char*)cp;
}
inline static char *skip_while(char *cp, int (*fn)(int))
{
while (fn(*cp))
cp++;
return cp;
}
inline static char *skip_until(char *cp, char expected)
{
while (*cp && *cp != expected)
cp++;
return cp;
}
static inline int is_end_of_line(char *cp)
{
char c = *cp;
return c == '\n' || c == 0 || c == '#';
}
static int add_ticket(const char *name, struct ticket_config **tkp,
const struct ticket_config *def)
{
int rv;
struct ticket_config *tk;
if (booth_conf->ticket_count == booth_conf->ticket_allocated) {
rv = ticket_realloc();
if (rv < 0)
return rv;
}
tk = booth_conf->ticket + booth_conf->ticket_count;
booth_conf->ticket_count++;
+ tk->last_valid_tk = malloc(sizeof(struct ticket_config));
+ if (!tk->last_valid_tk) {
+ log_error("out of memory");
+ return -ENOMEM;
+ }
+ memset(tk->last_valid_tk, 0, sizeof(struct ticket_config));
if (!check_max_len_valid(name, sizeof(tk->name))) {
log_error("ticket name \"%s\" too long.", name);
return -EINVAL;
}
if (find_ticket_by_name(name, NULL)) {
log_error("ticket name \"%s\" used again.", name);
return -EINVAL;
}
if (* skip_while_in(name, isalnum, "-/")) {
log_error("ticket name \"%s\" invalid; only alphanumeric names.", name);
return -EINVAL;
}
strcpy(tk->name, name);
tk->timeout = def->timeout;
tk->term_duration = def->term_duration;
tk->retries = def->retries;
memcpy(tk->weight, def->weight, sizeof(tk->weight));
if (tkp)
*tkp = tk;
return 0;
}
static int validate_ticket(struct ticket_config *tk)
{
if (tk->timeout*(tk->retries+1) >= tk->term_duration/2) {
tk_log_error("total amount of time to "
"retry sending packets cannot exceed "
"half of the expiry time "
"(%d*(%d+1) >= %d/2)",
tk->timeout, tk->retries, tk->term_duration);
return 0;
}
return 1;
}
/* returns number of weights, or -1 on bad input. */
static int parse_weights(const char *input, int weights[MAX_NODES])
{
int i, v;
char *cp;
for(i=0; i<MAX_NODES; i++) {
/* End of input? */
if (*input == 0)
break;
v = strtol(input, &cp, 0);
if (input == cp) {
log_error("No integer weight value at \"%s\"", input);
return -1;
}
weights[i] = v;
while (*cp) {
/* Separator characters */
if (isspace(*cp) ||
strchr(",;:-+", *cp))
cp++;
/* Next weight */
else if (isdigit(*cp))
break;
/* Rest */
else {
log_error("Invalid character at \"%s\"", cp);
return -1;
}
}
input = cp;
}
/* Fill rest of vector. */
for(v=i; v<MAX_NODES; v++) {
weights[v] = 0;
}
return i;
}
int read_config(const char *path, int type)
{
char line[1024];
FILE *fp;
char *s, *key, *val, *end_of_key;
const char *cp, *error;
int i;
int lineno = 0;
int got_transport = 0;
struct ticket_config defaults = { { 0 } };
struct ticket_config *current_tk = NULL;
fp = fopen(path, "r");
if (!fp) {
log_error("failed to open %s: %s", path, strerror(errno));
return -1;
}
booth_conf = malloc(sizeof(struct booth_config)
+ TICKET_ALLOC * sizeof(struct ticket_config));
if (!booth_conf) {
log_error("failed to alloc memory for booth config");
return -ENOMEM;
}
memset(booth_conf, 0, sizeof(struct booth_config)
+ TICKET_ALLOC * sizeof(struct ticket_config));
ticket_size = TICKET_ALLOC;
booth_conf->proto = UDP;
booth_conf->port = BOOTH_DEFAULT_PORT;
/* Provide safe defaults. -1 is reserved, though. */
booth_conf->uid = -2;
booth_conf->gid = -2;
strcpy(booth_conf->site_user, "hacluster");
strcpy(booth_conf->site_group, "haclient");
strcpy(booth_conf->arb_user, "nobody");
strcpy(booth_conf->arb_group, "nobody");
parse_weights("", defaults.weight);
defaults.ext_verifier = NULL;
defaults.term_duration = DEFAULT_TICKET_EXPIRY;
defaults.timeout = DEFAULT_TICKET_TIMEOUT;
defaults.retries = DEFAULT_RETRIES;
defaults.acquire_after = 0;
error = "";
log_debug("reading config file %s", path);
while (fgets(line, sizeof(line), fp)) {
lineno++;
s = skip_while(line, isspace);
if (is_end_of_line(s))
continue;
key = s;
/* Key */
end_of_key = skip_while_in(key, isalnum, "-_");
if (end_of_key == key) {
error = "No key";
goto err;
}
if (!*end_of_key)
goto exp_equal;
/* whitespace, and something else but nothing more? */
s = skip_while(end_of_key, isspace);
if (*s != '=') {
exp_equal:
error = "Expected '=' after key";
goto err;
}
s++;
/* It's my buffer, and I terminate if I want to. */
/* But not earlier than that, because we had to check for = */
*end_of_key = 0;
/* Value tokenizing */
s = skip_while(s, isspace);
switch (*s) {
case '"':
case '\'':
val = s+1;
s = skip_until(val, *s);
/* Terminate value */
if (!*s) {
error = "Unterminated quoted string";
goto err;
}
/* Remove and skip quote */
*s = 0;
s++;
if (* skip_while(s, isspace)) {
error = "Surplus data after value";
goto err;
}
*s = 0;
break;
case 0:
no_value:
error = "No value";
goto err;
break;
default:
val = s;
/* Rest of line. */
i = strlen(s);
/* i > 0 because of "case 0" above. */
while (i > 0 && isspace(s[i-1]))
i--;
s += i;
*s = 0;
}
if (val == s)
goto no_value;
if (strlen(key) > BOOTH_NAME_LEN
|| strlen(val) > BOOTH_NAME_LEN) {
error = "key/value too long";
goto err;
}
if (strcmp(key, "transport") == 0) {
if (got_transport) {
error = "config file has multiple transport lines";
goto err;
}
if (strcasecmp(val, "UDP") == 0)
booth_conf->proto = UDP;
else if (strcasecmp(val, "SCTP") == 0)
booth_conf->proto = SCTP;
else {
error = "invalid transport protocol";
goto err;
}
got_transport = 1;
continue;
}
if (strcmp(key, "port") == 0) {
booth_conf->port = atoi(val);
continue;
}
if (strcmp(key, "name") == 0) {
safe_copy(booth_conf->name,
val, BOOTH_NAME_LEN,
"name");
continue;
}
if (strcmp(key, "site") == 0) {
if (add_site(val, SITE))
goto out;
continue;
}
if (strcmp(key, "arbitrator") == 0) {
if (add_site(val, ARBITRATOR))
goto out;
continue;
}
if (strcmp(key, "site-user") == 0) {
safe_copy(booth_conf->site_user, optarg, BOOTH_NAME_LEN,
"site-user");
continue;
}
if (strcmp(key, "site-group") == 0) {
safe_copy(booth_conf->site_group, optarg, BOOTH_NAME_LEN,
"site-group");
continue;
}
if (strcmp(key, "arbitrator-user") == 0) {
safe_copy(booth_conf->arb_user, optarg, BOOTH_NAME_LEN,
"arbitrator-user");
continue;
}
if (strcmp(key, "arbitrator-group") == 0) {
safe_copy(booth_conf->arb_group, optarg, BOOTH_NAME_LEN,
"arbitrator-group");
continue;
}
if (strcmp(key, "debug") == 0) {
if (type != CLIENT)
debug_level = max(debug_level, atoi(val));
continue;
}
if (strcmp(key, "ticket") == 0) {
if (current_tk && strcmp(current_tk->name, "__defaults__")) {
if (!validate_ticket(current_tk)) {
goto out;
}
}
if (!strcmp(val, "__defaults__")) {
current_tk = &defaults;
} else if (add_ticket(val, &current_tk, &defaults)) {
goto out;
}
/* current_tk is valid until another one is needed -
* and then it already has the new address and
* is valid again. */
continue;
}
if (strcmp(key, "expire") == 0) {
current_tk->term_duration = strtol(val, &s, 0);
if (*s || s == val || current_tk->term_duration<10) {
error = "Expected plain integer value >=10 for expire";
goto err;
}
continue;
}
if (strcmp(key, "timeout") == 0) {
current_tk->timeout = strtol(val, &s, 0);
if (*s || s == val || current_tk->timeout<1) {
error = "Expected plain integer value >=1 for timeout";
goto err;
}
continue;
}
if (strcmp(key, "retries") == 0) {
current_tk->retries = strtol(val, &s, 0);
if (*s || s == val ||
current_tk->retries<3 || current_tk->retries > 100) {
error = "Expected plain integer value in the range [3, 100] for retries";
goto err;
}
continue;
}
if (strcmp(key, "acquire-after") == 0) {
current_tk->acquire_after = strtol(val, &s, 0);
if (*s || s == val || current_tk->acquire_after<0) {
error = "Expected plain integer value >=1 for acquire-after";
goto err;
}
continue;
}
if (strcmp(key, "before-acquire-handler") == 0) {
if (current_tk->ext_verifier) {
free(current_tk->ext_verifier);
}
current_tk->ext_verifier = strdup(val);
if (!current_tk->ext_verifier) {
error = "Out of memory";
goto err;
}
continue;
}
if (strcmp(key, "weights") == 0) {
if (parse_weights(val, current_tk->weight) < 0)
goto out;
continue;
}
error = "Unknown item";
goto out;
}
if ((booth_conf->site_count % 2) == 0) {
log_warn("An odd number of nodes is strongly recommended!");
}
/* Default: make config name match config filename. */
if (!booth_conf->name[0]) {
cp = strrchr(path, '/');
if (!cp)
cp = path;
/* TODO: locale? */
/* NUL-termination by memset. */
for(i=0; i<BOOTH_NAME_LEN-1 && isalnum(*cp); i++)
booth_conf->name[i] = *(cp++);
/* Last resort. */
if (!booth_conf->name[0])
strcpy(booth_conf->name, "booth");
}
return 0;
err:
out:
log_error("%s in config file line %d",
error, lineno);
free(booth_conf);
booth_conf = NULL;
return -1;
}
int check_config(int type)
{
struct passwd *pw;
struct group *gr;
char *cp, *input;
if (!booth_conf)
return -1;
input = (type == ARBITRATOR)
? booth_conf->arb_user
: booth_conf->site_user;
if (!*input)
goto u_inval;
if (isdigit(input[0])) {
booth_conf->uid = strtol(input, &cp, 0);
if (*cp != 0) {
u_inval:
log_error("User \"%s\" cannot be resolved into a UID.", input);
return ENOENT;
}
}
else {
pw = getpwnam(input);
if (!pw)
goto u_inval;
booth_conf->uid = pw->pw_uid;
}
input = (type == ARBITRATOR)
? booth_conf->arb_group
: booth_conf->site_group;
if (!*input)
goto g_inval;
if (isdigit(input[0])) {
booth_conf->gid = strtol(input, &cp, 0);
if (*cp != 0) {
g_inval:
log_error("Group \"%s\" cannot be resolved into a UID.", input);
return ENOENT;
}
}
else {
gr = getgrnam(input);
if (!gr)
goto g_inval;
booth_conf->gid = gr->gr_gid;
}
/* TODO: check whether uid or gid is 0 again?
* The admin may shoot himself in the foot, though. */
return 0;
}
int find_site_by_name(unsigned char *site, struct booth_site **node, int any_type)
{
struct booth_site *n;
int i;
if (!booth_conf)
return 0;
for (i = 0; i < booth_conf->site_count; i++) {
n = booth_conf->site + i;
if ((n->type == SITE || any_type) &&
strcmp(n->addr_string, site) == 0) {
*node = n;
return 1;
}
}
return 0;
}
int find_site_by_id(uint32_t site_id, struct booth_site **node)
{
struct booth_site *n;
int i;
if (site_id == NO_ONE) {
*node = no_leader;
return 1;
}
if (!booth_conf)
return 0;
for (i = 0; i < booth_conf->site_count; i++) {
n = booth_conf->site + i;
if (n->site_id == site_id) {
*node = n;
return 1;
}
}
return 0;
}
const char *type_to_string(int type)
{
switch (type)
{
case ARBITRATOR: return "arbitrator";
case SITE: return "site";
case CLIENT: return "client";
}
return "??invalid-type??";
}
diff --git a/src/config.h b/src/config.h
index e76efd2..1b346b1 100644
--- a/src/config.h
+++ b/src/config.h
@@ -1,236 +1,245 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _CONFIG_H
#define _CONFIG_H
#include <stdint.h>
#include "booth.h"
#include "raft.h"
#include "transport.h"
/** @{ */
/** Definitions for in-RAM data. */
#define MAX_NODES 16
#define TICKET_ALLOC 16
struct ticket_config {
/** \name Configuration items.
* @{ */
/** Name of ticket. */
boothc_ticket name;
/** How many seconds a term lasts (if not refreshed). */
int term_duration;
/** Network related timeouts. */
int timeout;
/** Retries before giving up. */
int retries;
/** If >0, time to wait for a site to get fenced.
* The ticket may be acquired after that timespan by
* another site. */
int acquire_after; /* TODO: needed? */
/* Program to ask whether it makes sense to
* acquire the ticket */
char *ext_verifier;
/** Node weights. */
int weight[MAX_NODES];
/** @} */
/** \name Runtime values.
* @{ */
/** Current state. */
server_state_e state;
/** Next state. Used at startup. */
server_state_e next_state;
/** When something has to be done */
struct timeval next_cron;
/** Current leader. This is effectively the log[] in Raft. */
struct booth_site *leader;
/** Leader that got lost. */
struct booth_site *lost_leader;
/** Is the ticket granted? */
int is_granted;
/** Timestamp of leadership expiration */
time_t term_expires;
/** End of election period */
time_t election_end;
struct booth_site *voted_for;
/** Who the various sites vote for.
* NO_OWNER = no vote yet. */
struct booth_site *votes_for[MAX_NODES];
/* bitmap */
uint64_t votes_received;
/** Last voting round that was seen. */
uint32_t current_term;
/** Do ticket updates whenever we get enough heartbeats.
* But do that only once.
* This is reset to 0 whenever we broadcast heartbeat and set
* to 1 once enough acks are received.
* Increased to 2 when the ticket is commited to the CIB (see
* delay_commit).
*/
uint32_t ticket_updated;
/** @} */
/** */
uint32_t commit_index;
/** */
uint32_t last_applied;
uint32_t next_index[MAX_NODES];
uint32_t match_index[MAX_NODES];
/* Why did we start the elections?
*/
cmd_reason_t election_reason;
/* if it is potentially dangerous to grant the ticket
* immediately, then this is set to some point in time,
* usually (now + term_duration + acquire_after)
*/
time_t delay_commit;
/* the last request RPC we sent
*/
uint32_t last_request;
/* if we expect some acks, then set this to the id of
* the RPC which others will send us; it is cleared once all
* replies were received
*/
uint32_t acks_expected;
/* bitmask of servers which sent acks
*/
uint64_t acks_received;
/* timestamp of the request, currently unused */
time_t req_sent_at;
/* we need to wait for MY_INDEX from other servers,
* hold the ticket processing for a while until they reply
*/
int start_postpone;
/* Do we need to update the copy in the CIB?
* Normally, the ticket is written only when it changes via
* the UPDATE RPC (for followers) and on expiration update
* (for leaders)
*/
int update_cib;
+ /* Is this ticket in election?
+ */
+ int in_election;
+
/* don't log warnings unnecessarily
*/
int expect_more_rejects;
/** \name Needed while proposals are being done.
* @{ */
+ /* Need to keep the previous valid ticket in case we moved to
+ * start new elections and another server asks for the ticket
+ * status. It would be wrong to send our candidate ticket.
+ */
+ struct ticket_config *last_valid_tk;
+
/** Whom to vote for the next time.
* Needed to push a ticket to someone else. */
#if 0
/** Bitmap of sites that acknowledge that state. */
uint64_t proposal_acknowledges;
/** When an incompletely acknowledged proposal gets done.
* If all peers agree, that happens sooner.
* See switch_state_to(). */
struct timeval proposal_switch;
/** Timestamp of proposal expiration. */
time_t proposal_expires;
#endif
/** Number of send retries left.
* Used on the new owner.
* Starts at 0, counts up. */
int retry_number;
/** @} */
};
-
struct booth_config {
char name[BOOTH_NAME_LEN];
transport_layer_t proto;
uint16_t port;
/** Stores the OR of sites bitmasks. */
uint64_t sites_bits;
/** Stores the OR of all members' bitmasks. */
uint64_t all_bits;
char site_user[BOOTH_NAME_LEN];
char site_group[BOOTH_NAME_LEN];
char arb_user[BOOTH_NAME_LEN];
char arb_group[BOOTH_NAME_LEN];
uid_t uid;
gid_t gid;
int site_count;
struct booth_site site[MAX_NODES];
int ticket_count;
int ticket_allocated;
struct ticket_config *ticket;
};
extern struct booth_config *booth_conf;
int read_config(const char *path, int type);
int check_config(int type);
int find_site_by_name(unsigned char *site, struct booth_site **node, int any_type);
int find_site_by_id(uint32_t site_id, struct booth_site **node);
const char *type_to_string(int type);
#include <stdio.h>
#define R(tk_) do { if (ANYDEBUG) printf("## %12s:%3d state %s, %d:%d, " \
"leader %s, exp %s", __FILE__, __LINE__, \
state_to_string(tk_->state), tk_->current_term, \
tk_->commit_index, site_string(tk_->leader), ctime(&tk_->term_expires)); } while(0)
#endif /* _CONFIG_H */
diff --git a/src/inline-fn.h b/src/inline-fn.h
index bdb690e..b1491dd 100644
--- a/src/inline-fn.h
+++ b/src/inline-fn.h
@@ -1,331 +1,335 @@
/*
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _INLINE_FN_H
#define _INLINE_FN_H
#include <time.h>
#include <sys/time.h>
#include <assert.h>
#include <string.h>
#include "config.h"
#include "transport.h"
inline static uint32_t get_local_id(void)
{
return local ? local->site_id : -1;
}
inline static uint32_t get_node_id(struct booth_site *node)
{
return node ? node->site_id : NO_ONE;
}
inline static int term_time_left(const struct ticket_config *tk)
{
int left;
left = tk->term_expires - time(NULL);
return (left < 0) ? 0 : left;
}
/** Returns number of seconds left, if any. */
inline static int leader_and_valid(const struct ticket_config *tk)
{
if (tk->leader != local)
return 0;
return term_time_left(tk);
}
/** Is this some leader? */
inline static int is_owned(const struct ticket_config *tk)
{
return (tk->leader && tk->leader != no_leader);
}
static inline void init_header_bare(struct boothc_header *h) {
h->magic = htonl(BOOTHC_MAGIC);
h->version = htonl(BOOTHC_VERSION);
h->from = htonl(local->site_id);
h->iv = htonl(0);
h->auth1 = htonl(0);
h->auth2 = htonl(0);
}
static inline void init_header(struct boothc_header *h,
int cmd, int options,
int result, int reason, int data_len)
{
init_header_bare(h);
h->length = htonl(data_len);
h->cmd = htonl(cmd);
h->options = htonl(options);
h->result = htonl(result);
h->reason = htonl(reason);
}
static inline void init_ticket_site_header(struct boothc_ticket_msg *msg, int cmd)
{
init_header(&msg->header, cmd, 0, 0, 0, sizeof(*msg));
}
+#define my_last_term(tk) \
+ (((tk)->state == ST_CANDIDATE && (tk)->last_valid_tk->current_term) ? \
+ (tk)->last_valid_tk->current_term : (tk)->current_term)
+
static inline void init_ticket_msg(struct boothc_ticket_msg *msg,
int cmd, int rv, int reason,
struct ticket_config *tk)
{
assert(sizeof(msg->ticket.id) == sizeof(tk->name));
init_header(&msg->header, cmd, 0, rv, reason, sizeof(*msg));
if (!tk) {
memset(&msg->ticket, 0, sizeof(msg->ticket));
} else {
memcpy(msg->ticket.id, tk->name, sizeof(msg->ticket.id));
msg->ticket.leader = htonl(get_node_id(
(tk->leader && tk->leader != no_leader) ? tk->leader : tk->voted_for));
msg->ticket.term = htonl(tk->current_term);
msg->ticket.term_valid_for = htonl(term_time_left(tk));
msg->ticket.leader_commit = htonl(tk->commit_index);
}
}
static inline struct booth_transport const *transport(void)
{
return booth_transport + booth_conf->proto;
}
static inline const char *site_string(struct booth_site *site)
{
return site ? site->addr_string : "NONE";
}
static inline const char *ticket_leader_string(struct ticket_config *tk)
{
return site_string(tk->leader);
}
static inline void disown_ticket(struct ticket_config *tk)
{
tk->leader = NULL;
tk->is_granted = 0;
time(&tk->term_expires);
}
static inline int disown_if_expired(struct ticket_config *tk)
{
if (time(NULL) >= tk->term_expires ||
!tk->leader) {
disown_ticket(tk);
return 1;
}
return 0;
}
/* We allow half of the uint32_t to be used;
* half of that below, half of that above the current known "good" value.
* 0 UINT32_MAX
* |--------------------------+----------------+------------|
* | | |
* |--------+-------| allowed range
* |
* current commit index
*
* So, on overflow it looks like that:
* UINT32_MAX 0
* |--------------------------+-----------||---+------------|
* | | |
* |--------+-------| allowed range
* |
* current commit index
*
* This should be possible by using the same datatype and relying
* on the under/overflow semantics.
*
*
* Having 30 bits available, and assuming an expire time of
* one minute and a (high) commit index step of 64 == 2^6 (because
* of weights), we get 2^24 minutes of range - which is ~750
* years. "Should be enough for everybody."
*/
static inline int index_is_higher_than(uint32_t c_high, uint32_t c_low)
{
uint32_t diff;
if (c_high == c_low)
return 0;
diff = c_high - c_low;
if (diff < UINT32_MAX/4)
return 1;
diff = c_low - c_high;
if (diff < UINT32_MAX/4)
return 0;
assert(!"commit index out of range - invalid");
}
static inline uint32_t index_max2(uint32_t a, uint32_t b)
{
return index_is_higher_than(a, b) ? a : b;
}
static inline uint32_t index_max3(uint32_t a, uint32_t b, uint32_t c)
{
return index_max2( index_max2(a, b), c);
}
static inline double timeval_to_float(struct timeval tv)
{
return tv.tv_sec + tv.tv_usec*(double)1.0e-6;
}
static inline int timeval_msec(struct timeval tv)
{
int m;
m = tv.tv_usec / 1000;
if (m >= 1000)
m = 999;
return m;
}
static inline int timeval_compare(struct timeval tv1, struct timeval tv2)
{
if (tv1.tv_sec < tv2.tv_sec)
return -1;
if (tv1.tv_sec > tv2.tv_sec)
return +1;
if (tv1.tv_usec < tv2.tv_usec)
return -1;
if (tv1.tv_usec > tv2.tv_usec)
return +1;
return 0;
}
static inline int timeval_in_past(struct timeval which)
{
struct timeval tv;
gettimeofday(&tv, NULL);
return timeval_compare(tv, which) > 0;
}
static inline time_t next_vote_starts_at(struct ticket_config *tk)
{
time_t half_exp, retries_needed, t;
/* If not owner, don't renew. */
if (tk->leader != local)
return 0;
/* Try to renew at half of expiry time. */
half_exp = tk->term_expires - tk->term_duration/2;
/* Also start renewal if we couldn't get
* a few message retransmission in the alloted
* expiry time. */
retries_needed = tk->term_expires - tk->timeout * tk->retries/2;
/* Return earlier timestamp. */
t = min(half_exp, retries_needed);
return t;
}
static inline int should_start_renewal(struct ticket_config *tk)
{
time_t now, when;
when = next_vote_starts_at(tk);
if (!when)
return 0;
time(&now);
return when <= now;
}
static inline void expect_replies(struct ticket_config *tk,
int reply_type)
{
tk->retry_number = 0;
tk->acks_expected = reply_type;
tk->acks_received = local->bitmask;
tk->req_sent_at = time(NULL);
tk->ticket_updated = 0;
}
static inline void no_resends(struct ticket_config *tk)
{
tk->retry_number = 0;
tk->acks_expected = 0;
}
static inline struct booth_site *my_vote(struct ticket_config *tk)
{
return tk->votes_for[ local->index ];
}
static inline int count_bits(uint64_t val) {
return __builtin_popcount(val);
}
static inline int majority_of_bits(struct ticket_config *tk, uint64_t val)
{
/* Use ">" to get majority decision, even for an even number
* of participants. */
return count_bits(val) * 2 >
booth_conf->site_count;
}
static inline int all_replied(struct ticket_config *tk)
{
return !(tk->acks_received ^ booth_conf->all_bits);
}
static inline int all_sites_replied(struct ticket_config *tk)
{
return !((tk->acks_received & booth_conf->sites_bits) ^ booth_conf->sites_bits);
}
#endif
diff --git a/src/raft.c b/src/raft.c
index dc7fb71..960e758 100644
--- a/src/raft.c
+++ b/src/raft.c
@@ -1,925 +1,941 @@
/*
* Copyright (C) 2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <inttypes.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include <clplumbing/cl_random.h>
#include "booth.h"
#include "transport.h"
#include "inline-fn.h"
#include "config.h"
#include "raft.h"
#include "ticket.h"
#include "log.h"
inline static void clear_election(struct ticket_config *tk)
{
int i;
struct booth_site *site;
tk_log_debug("clear election");
tk->votes_received = 0;
foreach_node(i, site)
tk->votes_for[site->index] = NULL;
}
inline static void record_vote(struct ticket_config *tk,
struct booth_site *who,
struct booth_site *vote)
{
tk_log_debug("site %s votes for %s",
site_string(who),
site_string(vote));
if (!tk->votes_for[who->index]) {
tk->votes_for[who->index] = vote;
tk->votes_received |= who->bitmask;
} else {
if (tk->votes_for[who->index] != vote)
tk_log_warn("%s voted previously "
"for %s and now wants to vote for %s (ignored)",
site_string(who),
site_string(tk->votes_for[who->index]),
site_string(vote));
}
}
static int cmp_msg_ticket(struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg)
{
- if (tk->current_term != ntohl(msg->ticket.term)) {
- return tk->current_term - ntohl(msg->ticket.term);
+ if (my_last_term(tk) != ntohl(msg->ticket.term)) {
+ return my_last_term(tk) - ntohl(msg->ticket.term);
}
/* compare commit_index only from the leader */
if (sender == leader) {
return tk->commit_index - ntohl(msg->ticket.leader_commit);
}
return 0;
}
static void update_term_from_msg(struct ticket_config *tk,
struct boothc_ticket_msg *msg)
{
uint32_t i;
i = ntohl(msg->ticket.term);
/* if we failed to start the election, then accept the term
* from the leader
* */
if (tk->state == ST_CANDIDATE) {
tk->current_term = i;
} else {
tk->current_term = max(i, tk->current_term);
}
/* § 5.3 */
i = ntohl(msg->ticket.leader_commit);
tk->commit_index = max(i, tk->commit_index);
}
static void update_ticket_from_msg(struct ticket_config *tk,
struct booth_site *sender,
struct boothc_ticket_msg *msg)
{
int duration;
tk_log_debug("updating from %s (%d/%d)",
site_string(sender),
ntohl(msg->ticket.term), ntohl(msg->ticket.term_valid_for));
duration = min(tk->term_duration, ntohl(msg->ticket.term_valid_for));
tk->term_expires = time(NULL) + duration;
update_term_from_msg(tk, msg);
}
static void copy_ticket_from_msg(struct ticket_config *tk,
struct boothc_ticket_msg *msg)
{
tk->term_expires = time(NULL) + ntohl(msg->ticket.term_valid_for);
tk->current_term = ntohl(msg->ticket.term);
tk->commit_index = ntohl(msg->ticket.leader_commit);
}
static void become_follower(struct ticket_config *tk,
struct boothc_ticket_msg *msg)
{
copy_ticket_from_msg(tk, msg);
tk->state = ST_FOLLOWER;
tk->delay_commit = 0;
/* if we're following and the ticket was granted here
* then commit to CIB right away (we're probably restarting)
*/
if (tk->is_granted) {
disown_ticket(tk);
ticket_write(tk);
}
}
static void won_elections(struct ticket_config *tk)
{
tk->leader = local;
tk->state = ST_LEADER;
tk->term_expires = time(NULL) + tk->term_duration;
tk->election_end = 0;
tk->voted_for = NULL;
tk->commit_index++;
ticket_broadcast(tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0);
ticket_activate_timeout(tk);
}
static int is_tie(struct ticket_config *tk)
{
int i;
struct booth_site *v;
int count[MAX_NODES] = { 0, };
int max_votes = 0, max_cnt = 0;
for(i=0; i<booth_conf->site_count; i++) {
v = tk->votes_for[i];
if (!v)
continue;
count[v->index]++;
max_votes = max(max_votes, count[v->index]);
}
for(i=0; i<booth_conf->site_count; i++) {
if (count[i] == max_votes)
max_cnt++;
}
return max_cnt > 1;
}
static struct booth_site *majority_votes(struct ticket_config *tk)
{
int i, n;
struct booth_site *v;
int count[MAX_NODES] = { 0, };
for(i=0; i<booth_conf->site_count; i++) {
v = tk->votes_for[i];
if (!v)
continue;
n = v->index;
count[n]++;
tk_log_debug("Majority: %d %s wants %d %s => %d",
i, site_string(&booth_conf->site[i]),
n, site_string(v),
count[n]);
if (count[n]*2 <= booth_conf->site_count)
continue;
tk_log_debug("Majority reached: %d of %d for %s",
count[n], booth_conf->site_count,
site_string(v));
return v;
}
return NULL;
}
void elections_end(struct ticket_config *tk)
{
time_t now;
struct booth_site *new_leader;
now = time(NULL);
if (now > tk->election_end) {
/* This is previous election timed out */
tk_log_info("election timed out");
}
new_leader = majority_votes(tk);
if (new_leader == local) {
tk_log_info("granted successfully here");
won_elections(tk);
} else if (new_leader) {
tk_log_info("ticket granted at %s",
site_string(new_leader));
} else {
tk_log_info("nobody won elections, new elections");
new_election(tk, NULL, is_tie(tk), OR_AGAIN);
}
}
static int newer_term(struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg,
int in_election)
{
uint32_t term;
/* it may happen that we hear about our newer term */
if (leader == local)
return 0;
term = ntohl(msg->ticket.term);
/* §5.1 */
if (term > tk->current_term) {
tk->state = ST_FOLLOWER;
if (!in_election) {
tk->leader = leader;
tk_log_info("from %s: higher term %d vs. %d, following %s",
site_string(sender),
term, tk->current_term,
ticket_leader_string(tk));
} else {
tk_log_debug("from %s: higher term %d vs. %d (election)",
site_string(sender),
term, tk->current_term);
}
tk->current_term = term;
return 1;
}
return 0;
}
static int term_too_low(struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg)
{
uint32_t term;
term = ntohl(msg->ticket.term);
/* §5.1 */
if (term < tk->current_term) {
tk_log_info("sending reject to %s, its term too low "
"(%d vs. %d)", site_string(sender),
term, tk->current_term
);
send_reject(sender, tk, RLT_TERM_OUTDATED);
return 1;
}
return 0;
}
/* For follower. */
static int answer_HEARTBEAT (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t term;
term = ntohl(msg->ticket.term);
tk_log_debug("heartbeat from leader: %s, have %s; term %d vs %d",
site_string(leader), ticket_leader_string(tk),
term, tk->current_term);
if (term < tk->current_term) {
if (sender == tk->leader) {
tk_log_info("trusting leader %s with a lower term (%d vs %d)",
site_string(leader), term, tk->current_term);
} else if (is_owned(tk)) {
tk_log_warn("different leader %s with a lower term "
"(%d vs %d), sending reject",
site_string(leader), term, tk->current_term);
return send_reject(sender, tk, RLT_TERM_OUTDATED);
}
}
/* got heartbeat, no rejects expected anymore */
tk->expect_more_rejects = 0;
+ /* and certainly not in election */
+ tk->in_election = 0;
+
/* Needed? */
newer_term(tk, sender, leader, msg, 0);
become_follower(tk, msg);
/* Racy??? */
assert(sender == leader || !leader);
tk->leader = leader;
/* Ack the heartbeat (we comply). */
return send_msg(OP_ACK, tk, sender);
}
static int process_UPDATE (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
if (is_owned(tk) && sender != tk->leader) {
tk_log_warn("different leader %s wants to update "
"our ticket, sending reject",
site_string(leader));
return send_reject(sender, tk, RLT_TERM_OUTDATED);
}
tk_log_debug("leader %s wants to update our ticket",
site_string(leader));
copy_ticket_from_msg(tk, msg);
ticket_write(tk);
/* run ticket_cron if the ticket expires */
set_ticket_wakeup(tk);
return 0;
}
static int process_REVOKE (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int rv;
if (tk->state == ST_INIT && tk->leader == no_leader) {
/* assume that our ack got lost */
rv = send_msg(OP_ACK, tk, sender);
} else if (tk->leader != sender) {
tk_log_error("%s wants to revoke ticket, "
"but it is not granted there (ignoring)",
site_string(sender));
return 1;
} else if (tk->state != ST_FOLLOWER) {
tk_log_error("unexpected ticket revoke from %s "
"(in state %s) (ignoring)",
site_string(sender),
state_to_string(tk->state));
return 1;
} else {
tk_log_info("%s revokes ticket",
site_string(tk->leader));
reset_ticket(tk);
tk->leader = no_leader;
ticket_write(tk);
rv = send_msg(OP_ACK, tk, sender);
}
return rv;
}
/* For leader. */
static int process_ACK(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t term;
term = ntohl(msg->ticket.term);
if (newer_term(tk, sender, leader, msg, 0)) {
/* unexpected higher term */
tk_log_warn("got higher term from %s (%d vs. %d)",
site_string(sender),
term, tk->current_term);
return 0;
}
/* Don't send a reject. */
if (term < tk->current_term) {
/* Doesn't know what he's talking about - perhaps
* doesn't receive our packets? */
tk_log_warn("unexpected term "
"from %s (%d vs. %d) (ignoring)",
site_string(sender),
term, tk->current_term);
return 0;
}
/* if the ticket is to be revoked, further processing is not
* interesting (and dangerous) */
if (tk->next_state == ST_INIT || tk->state == ST_INIT)
return 0;
if (term == tk->current_term &&
leader == tk->leader) {
if (majority_of_bits(tk, tk->acks_received)) {
/* OK, at least half of the nodes are reachable;
* Update the ticket and send update messages out
*/
return leader_update_ticket(tk);
}
}
return 0;
}
static int process_VOTE_FOR(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
/* leader wants to step down? */
if (leader == no_leader && sender == tk->leader &&
(tk->state == ST_FOLLOWER || tk->state == ST_CANDIDATE)) {
tk_log_info("%s wants to give the ticket away",
site_string(tk->leader));
time(&tk->term_expires);
return new_round(tk, OR_STEPDOWN);
}
if (tk->state != ST_CANDIDATE) {
/* lost candidate status, somebody rejected our proposal */
tk_log_debug("candidate status lost, ignoring vote_for from %s",
site_string(sender));
return 0;
}
if (term_too_low(tk, sender, leader, msg))
return 0;
if (newer_term(tk, sender, leader, msg, 0)) {
clear_election(tk);
}
record_vote(tk, sender, leader);
/* only if all voted can we take the ticket now, otherwise
* wait for timeout in ticket_cron */
if (!tk->acks_expected) {
/* §5.2 */
elections_end(tk);
}
return 0;
}
static int process_REJECTED(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t rv;
rv = ntohl(msg->header.result);
if (tk->state == ST_CANDIDATE &&
leader == local) {
/* the sender has us as the leader (!)
* the elections will time out, then we can try again
*/
tk_log_warn("ticket was granted to us "
"(and we didn't know)");
tk->expect_more_rejects = 1;
return 0;
}
if (tk->state == ST_CANDIDATE &&
rv == RLT_TERM_OUTDATED) {
tk_log_warn("ticket outdated (term %d), granted to %s",
ntohl(msg->ticket.term),
site_string(leader)
);
tk->leader = leader;
tk->expect_more_rejects = 1;
become_follower(tk, msg);
return 0;
}
if (tk->state == ST_CANDIDATE &&
rv == RLT_TERM_STILL_VALID) {
if (tk->lost_leader == leader) {
if (tk->election_reason == OR_TKT_LOST) {
tk_log_warn("%s still has the ticket valid, "
"we'll backup a bit",
site_string(sender));
} else {
tk_log_warn("%s unexpecetedly rejects elections",
site_string(sender));
}
} else {
tk_log_warn("ticket was granted to %s "
"(and we didn't know)",
site_string(leader));
}
tk->leader = leader;
become_follower(tk, msg);
tk->expect_more_rejects = 1;
return 0;
}
if (tk->state == ST_CANDIDATE &&
rv == RLT_YOU_OUTDATED) {
tk->leader = leader;
tk->expect_more_rejects = 1;
if (leader && leader != no_leader) {
tk_log_warn("our ticket is outdated, granted to %s",
site_string(leader));
become_follower(tk, msg);
} else {
tk_log_warn("our ticket is outdated and revoked");
update_ticket_from_msg(tk, sender, msg);
tk->state = ST_INIT;
}
return 0;
}
if (!tk->expect_more_rejects) {
tk_log_warn("from %s: in state %s, got %s (unexpected reject)",
site_string(sender),
state_to_string(tk->state),
state_to_string(rv));
}
return 0;
}
static int ticket_seems_ok(struct ticket_config *tk)
{
int time_left;
time_left = term_time_left(tk);
if (!time_left)
return 0; /* quite sure */
if (tk->state == ST_CANDIDATE)
return 0; /* in state of flux */
if (tk->state == ST_LEADER)
return 1; /* quite sure */
if (tk->state == ST_FOLLOWER &&
time_left >= tk->term_duration/3)
return 1; /* almost quite sure */
return 0;
}
static int test_reason(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int reason;
reason = ntohl(msg->header.reason);
if (reason == OR_TKT_LOST) {
if (tk->state == ST_INIT &&
tk->leader == no_leader) {
tk_log_warn("%s claims that the ticket is lost, "
"but it's in %s state (reject sent)",
site_string(sender),
state_to_string(tk->state)
);
return RLT_YOU_OUTDATED;
}
if (ticket_seems_ok(tk)) {
tk_log_warn("%s claims that the ticket is lost, "
"but it is ok here (reject sent)",
site_string(sender));
return RLT_TERM_STILL_VALID;
}
}
return 0;
}
/* §5.2 */
static int answer_REQ_VOTE(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int valid;
struct boothc_ticket_msg omsg;
cmd_result_t inappr_reason;
inappr_reason = test_reason(tk, sender, leader, msg);
if (inappr_reason)
return send_reject(sender, tk, inappr_reason);
valid = term_time_left(tk);
/* allow the leader to start new elections on valid tickets */
if (sender != tk->leader && valid) {
tk_log_warn("election from %s rejected "
"(we have %s as ticket owner), ticket still valid for %ds",
site_string(sender), site_string(tk->leader), valid);
return send_reject(sender, tk, RLT_TERM_STILL_VALID);
}
if (term_too_low(tk, sender, leader, msg))
return 0;
+ /* set this, so that we know not to send status for the
+ * ticket */
+ tk->in_election = 1;
+
/* if it's a newer term or ... */
if (newer_term(tk, sender, leader, msg, 1)) {
clear_election(tk);
goto vote_for_sender;
}
/* ... we didn't vote yet, then vote for the sender */
/* §5.2, §5.4 */
if (!tk->voted_for) {
vote_for_sender:
tk->voted_for = sender;
record_vote(tk, sender, leader);
}
init_ticket_msg(&omsg, OP_VOTE_FOR, RLT_SUCCESS, 0, tk);
omsg.ticket.leader = htonl(get_node_id(tk->voted_for));
return booth_udp_send(sender, &omsg, sizeof(omsg));
}
int new_election(struct ticket_config *tk,
struct booth_site *preference, int update_term, cmd_reason_t reason)
{
struct booth_site *new_leader;
time_t now;
time(&now);
tk_log_debug("start new election?, now=%" PRIi64 ", end %" PRIi64,
(int64_t)now, (int64_t)(tk->election_end));
if (now <= tk->election_end)
return 0;
-
/* §5.2 */
/* If there was _no_ answer, don't keep incrementing the term number
* indefinitely. If there was no peer, there'll probably be no one
* listening now either. However, we don't know if we were
* invoked due to a timeout (caller does).
*/
- if (update_term)
+ if (update_term) {
+ /* save the previous term, we may need to send out the
+ * MY_INDEX message */
+ if (tk->state != ST_CANDIDATE) {
+ memcpy(tk->last_valid_tk, tk, sizeof(struct ticket_config));
+ }
tk->current_term++;
+ }
tk->term_expires = 0;
tk->election_end = now + tk->timeout;
tk_log_info("starting new election (term=%d)",
tk->current_term);
clear_election(tk);
if(preference)
new_leader = preference;
else
new_leader = (local->type == SITE) ? local : NULL;
record_vote(tk, local, new_leader);
tk->voted_for = new_leader;
tk->state = ST_CANDIDATE;
/* some callers may want just to repeat on timeout */
if (reason == OR_AGAIN) {
reason = tk->election_reason;
} else {
tk->election_reason = reason;
}
ticket_broadcast(tk, OP_REQ_VOTE, OP_VOTE_FOR, RLT_SUCCESS, reason);
ticket_activate_timeout(tk);
return 0;
}
int new_round(struct ticket_config *tk, cmd_reason_t reason)
{
int rv = 0;
struct timespec delay;
if (local->type == ARBITRATOR) {
/* we cannot really do anything, but keep the copy for
* somebody else who perhaps can */
return 0;
}
disown_ticket(tk);
ticket_write(tk);
/* New vote round; §5.2 */
/* delay the next election start for up to 200ms */
delay.tv_sec = 0;
delay.tv_nsec = 1000000L * (long)cl_rand_from_interval(0, 200);
nanosleep(&delay, NULL);
rv = new_election(tk, NULL, 1, reason);
return rv;
}
/* we were a leader and somebody says that they have a more up
* to date ticket
* there was probably connectivity loss
* tricky
*/
static int leader_handle_newer_ticket(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
update_term_from_msg(tk, msg);
if (leader != no_leader && leader && leader != local) {
/* eek, two leaders, split brain */
/* normally shouldn't happen; run election */
tk_log_error("from %s: ticket granted to %s! (revoking locally)",
site_string(sender),
site_string(leader)
);
} else if (term_time_left(tk)) {
/* eek, two leaders, split brain */
/* normally shouldn't happen; run election */
tk_log_error("from %s: ticket granted to %s! (revoking locally)",
site_string(sender),
site_string(leader)
);
}
tk->next_state = ST_LEADER;
return 0;
}
/* reply to STATUS */
static int process_MY_INDEX (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int i;
int expired;
expired = !msg->ticket.term_valid_for;
i = cmp_msg_ticket(tk, sender, leader, msg);
if (i > 0) {
/* let them know about our newer ticket */
- send_msg(OP_MY_INDEX, tk, sender);
+ /* but if we're voting in elections, our ticket is not
+ * valid yet, don't send it */
+ if (!tk->in_election)
+ send_msg(OP_MY_INDEX, tk, sender);
if (tk->state == ST_LEADER) {
tk_log_info("sending ticket update to %s",
site_string(sender));
return send_msg(OP_UPDATE, tk, sender);
}
}
/* we have a newer or equal ticket and theirs is expired,
* nothing more to do here */
if (i >= 0 && expired) {
return 0;
}
if (tk->state == ST_LEADER) {
/* we're the leader, thread carefully */
if (expired) {
/* if their ticket is expired,
* nothing more to do */
return 0;
}
if (i < 0) {
/* they have a newer ticket, trouble if we're already leader
* for it */
tk_log_warn("from %s: more up to date ticket at %s",
site_string(sender),
site_string(leader)
);
return leader_handle_newer_ticket(tk, sender, leader, msg);
} else {
/* we have the ticket and we don't care */
return 0;
}
}
/* their ticket is either newer or not expired, don't
* ignore it */
update_ticket_from_msg(tk, sender, msg);
tk->leader = leader;
update_ticket_state(tk, sender);
set_ticket_wakeup(tk);
return 0;
}
int raft_answer(
struct ticket_config *tk,
struct booth_site *from,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int cmd;
int rv;
rv = 0;
cmd = ntohl(msg->header.cmd);
tk_log_debug("got message %s from %s",
state_to_string(cmd),
site_string(from));
switch (cmd) {
case OP_REQ_VOTE:
rv = answer_REQ_VOTE(tk, from, leader, msg);
break;
case OP_VOTE_FOR:
rv = process_VOTE_FOR(tk, from, leader, msg);
break;
case OP_ACK:
if (tk->leader == local &&
tk->state == ST_LEADER)
rv = process_ACK(tk, from, leader, msg);
break;
case OP_HEARTBEAT:
if (tk->leader != local &&
(tk->state == ST_INIT ||tk->state == ST_FOLLOWER ||
tk->state == ST_CANDIDATE))
rv = answer_HEARTBEAT(tk, from, leader, msg);
else {
tk_log_warn("unexpected message %s, from %s",
state_to_string(cmd),
site_string(from));
rv = -EINVAL;
}
break;
case OP_UPDATE:
if (tk->leader != local && tk->leader == leader &&
tk->state == ST_FOLLOWER) {
rv = process_UPDATE(tk, from, leader, msg);
} else {
tk_log_warn("unexpected message %s, from %s",
state_to_string(cmd),
site_string(from));
rv = -EINVAL;
}
break;
case OP_REJECTED:
rv = process_REJECTED(tk, from, leader, msg);
break;
case OP_REVOKE:
rv = process_REVOKE(tk, from, leader, msg);
break;
case OP_MY_INDEX:
rv = process_MY_INDEX(tk, from, leader, msg);
break;
case OP_STATUS:
- rv = send_msg(OP_MY_INDEX, tk, from);
+ if (!tk->in_election)
+ rv = send_msg(OP_MY_INDEX, tk, from);
break;
default:
tk_log_error("unknown message %s, from %s",
state_to_string(cmd), site_string(from));
rv = -EINVAL;
}
return rv;
}
diff --git a/src/ticket.c b/src/ticket.c
index a972619..aaa18de 100644
--- a/src/ticket.c
+++ b/src/ticket.c
@@ -1,971 +1,985 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include <inttypes.h>
#include <stdio.h>
#include <assert.h>
#include <time.h>
#include "ticket.h"
#include "config.h"
#include "pacemaker.h"
#include "inline-fn.h"
#include "log.h"
#include "booth.h"
#include "raft.h"
#include "handler.h"
#define TK_LINE 256
/* Untrusted input, must fit (incl. \0) in a buffer of max chars. */
int check_max_len_valid(const char *s, int max)
{
int i;
for(i=0; i<max; i++)
if (s[i] == 0)
return 1;
return 0;
}
int find_ticket_by_name(const char *ticket, struct ticket_config **found)
{
int i;
if (found)
*found = NULL;
for (i = 0; i < booth_conf->ticket_count; i++) {
if (!strcmp(booth_conf->ticket[i].name, ticket)) {
if (found)
*found = booth_conf->ticket + i;
return 1;
}
}
return 0;
}
int check_ticket(char *ticket, struct ticket_config **found)
{
if (found)
*found = NULL;
if (!booth_conf)
return 0;
if (!check_max_len_valid(ticket, sizeof(booth_conf->ticket[0].name)))
return 0;
return find_ticket_by_name(ticket, found);
}
int check_site(char *site, int *is_local)
{
struct booth_site *node;
if (!check_max_len_valid(site, sizeof(node->addr_string)))
return 0;
if (find_site_by_name(site, &node, 0)) {
*is_local = node->local;
return 1;
}
return 0;
}
int ticket_write(struct ticket_config *tk)
{
if (local->type != SITE)
return -EINVAL;
if (tk->leader == local) {
pcmk_handler.grant_ticket(tk);
} else {
pcmk_handler.revoke_ticket(tk);
}
tk->update_cib = 0;
return 0;
}
/* Ask an external program whether getting the ticket
* makes sense.
* Eg. if the services have a failcount of INFINITY,
* we can't serve here anyway. */
int test_external_prog(struct ticket_config *tk,
int start_election)
{
int rv;
rv = run_handler(tk, tk->ext_verifier, 1);
if (rv) {
tk_log_warn("we are not allowed to acquire ticket");
/* Give it to somebody else.
* Just send a VOTE_FOR message, so the
* others can start elections. */
if (leader_and_valid(tk)) {
reset_ticket(tk);
ticket_write(tk);
if (start_election) {
ticket_broadcast(tk, OP_VOTE_FOR, 0, RLT_SUCCESS, OR_LOCAL_FAIL);
}
}
}
return rv;
}
/* Try to acquire a ticket
* Could be manual grant or after ticket loss
*/
int acquire_ticket(struct ticket_config *tk, cmd_reason_t reason)
{
if (test_external_prog(tk, 0))
return RLT_EXT_FAILED;
return new_election(tk, local, 1, reason);
}
/** Try to get the ticket for the local site.
* */
int do_grant_ticket(struct ticket_config *tk, int options)
{
int rv;
tk_log_info("granting ticket");
if (tk->leader == local)
return RLT_SUCCESS;
if (is_owned(tk))
return RLT_OVERGRANT;
tk->delay_commit = time(NULL) +
tk->term_duration + tk->acquire_after;
if (options & OPT_IMMEDIATE) {
tk_log_warn("granting ticket immediately! If there are "
"unreachable sites, _hope_ you are sure that they don't "
"have the ticket!");
tk->delay_commit = 0;
}
rv = acquire_ticket(tk, OR_ADMIN);
if (rv)
tk->delay_commit = 0;
return rv;
}
static int start_revoke_ticket(struct ticket_config *tk)
{
tk_log_info("revoking ticket");
reset_ticket(tk);
tk->leader = no_leader;
ticket_write(tk);
ticket_activate_timeout(tk);
return ticket_broadcast(tk, OP_REVOKE, OP_ACK, RLT_SUCCESS, OR_ADMIN);
}
/** Ticket revoke.
* Only to be started from the leader. */
int do_revoke_ticket(struct ticket_config *tk)
{
if (tk->acks_expected) {
tk_log_info("delay ticket revoke until the current operation finishes");
tk->next_state = ST_INIT;
return 0;
} else {
return start_revoke_ticket(tk);
}
}
int list_ticket(char **pdata, unsigned int *len)
{
struct ticket_config *tk;
char timeout_str[64];
char pending_str[64];
char *data, *cp;
int i, alloc;
*pdata = NULL;
*len = 0;
alloc = 256 +
booth_conf->ticket_count * (BOOTH_NAME_LEN * 2 + 128);
data = malloc(alloc);
if (!data)
return -ENOMEM;
cp = data;
foreach_ticket(i, tk) {
if (tk->term_expires != 0)
strftime(timeout_str, sizeof(timeout_str), "%F %T",
localtime(&tk->term_expires));
else
strcpy(timeout_str, "INF");
if (tk->leader == local && tk->delay_commit > time(NULL)) {
strcpy(pending_str, " (commit pending until ");
strftime(pending_str + strlen(" (commit pending until "),
sizeof(pending_str) - strlen(" (commit pending until ") - 1,
"%F %T", localtime(&tk->delay_commit));
strcat(pending_str, ")");
} else
*pending_str = '\0';
cp += snprintf(cp,
alloc - (cp - data),
"ticket: %s, leader: %s",
tk->name,
ticket_leader_string(tk));
if (is_owned(tk)) {
cp += snprintf(cp,
alloc - (cp - data),
", expires: %s, commit: %d%s\n",
timeout_str,
tk->commit_index,
pending_str);
} else {
cp += snprintf(cp, alloc - (cp - data), "\n");
}
if (alloc - (cp - data) <= 0)
return -ENOMEM;
}
*pdata = data;
*len = cp - data;
return 0;
}
void reset_ticket(struct ticket_config *tk)
{
disown_ticket(tk);
tk->state = ST_INIT;
tk->voted_for = NULL;
}
static void reacquire_ticket(struct ticket_config *tk)
{
int valid;
const char *where_granted = "\0";
char buff[64];
valid = (tk->term_expires >= time(NULL));
if (tk->leader == local) {
where_granted = "granted here";
} else {
snprintf(buff, sizeof(buff), "granted to %s",
site_string(tk->leader));
where_granted = buff;
}
if (!valid) {
tk_log_warn("%s, but not valid "
"anymore (will try to reacquire)", where_granted);
}
if (tk->is_granted && tk->leader != local) {
if (tk->leader && tk->leader != no_leader) {
tk_log_error("granted here, but also %s, "
"that's really too bad (will try to reacquire)",
where_granted);
} else {
tk_log_warn("granted here, but we're "
"not recorded as the grantee (will try to reacquire)");
}
}
/* try to acquire the
* ticket through new elections
*/
acquire_ticket(tk, OR_REACQUIRE);
}
void update_ticket_state(struct ticket_config *tk, struct booth_site *sender)
{
+ if (tk->state == ST_CANDIDATE) {
+ tk_log_info("learned from %s about "
+ "newer ticket, stopping elections",
+ site_string(sender));
+ }
+
if (tk->leader == local || tk->is_granted) {
/* message from a live leader with valid ticket? */
if (sender == tk->leader && term_time_left(tk)) {
if (tk->is_granted) {
tk_log_warn("ticket was granted here, "
"but it's live at %s (revoking here)",
site_string(sender));
} else {
tk_log_info("ticket live at %s",
site_string(sender));
}
disown_ticket(tk);
ticket_write(tk);
tk->state = ST_FOLLOWER;
tk->next_state = ST_FOLLOWER;
} else {
+ if (tk->state == ST_CANDIDATE) {
+ tk->state = ST_FOLLOWER;
+ }
tk->next_state = ST_LEADER;
}
} else {
if (!tk->leader || tk->leader == no_leader) {
if (sender)
tk_log_info("ticket is not granted");
else
tk_log_info("ticket is not granted (from CIB)");
tk->state = ST_INIT;
} else {
if (sender)
tk_log_info("ticket granted to %s (says %s)",
site_string(tk->leader),
site_string(sender));
else
tk_log_info("ticket granted to %s (from CIB)",
site_string(tk->leader));
tk->state = ST_FOLLOWER;
/* just make sure that we check the ticket soon */
tk->next_state = ST_FOLLOWER;
}
}
}
int setup_ticket(void)
{
struct ticket_config *tk;
int i;
foreach_ticket(i, tk) {
reset_ticket(tk);
if (local->type == SITE) {
if (!pcmk_handler.load_ticket(tk)) {
update_ticket_state(tk, NULL);
}
tk->update_cib = 1;
}
tk_log_info("broadcasting state query");
/* wait until all send their status (or the first
* timeout) */
tk->start_postpone = 1;
ticket_broadcast(tk, OP_STATUS, OP_MY_INDEX, RLT_SUCCESS, 0);
}
return 0;
}
int ticket_answer_list(int fd, struct boothc_ticket_msg *msg)
{
char *data;
int olen, rv;
struct boothc_header hdr;
rv = list_ticket(&data, &olen);
if (rv < 0)
return rv;
init_header(&hdr, CMR_LIST, 0, RLT_SUCCESS, 0, sizeof(hdr) + olen);
return send_header_plus(fd, &hdr, data, olen);
}
int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg)
{
int rv;
struct ticket_config *tk;
if (!check_ticket(msg->ticket.id, &tk)) {
log_warn("client asked to grant unknown ticket %s",
msg->ticket.id);
rv = RLT_INVALID_ARG;
goto reply;
}
if (is_owned(tk)) {
log_warn("client wants to grant an (already granted!) ticket %s",
msg->ticket.id);
rv = RLT_OVERGRANT;
goto reply;
}
rv = do_grant_ticket(tk, ntohl(msg->header.options));
reply:
init_header(&msg->header, CMR_GRANT, 0, rv ?: RLT_ASYNC, 0, sizeof(*msg));
return send_ticket_msg(fd, msg);
}
int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg)
{
int rv;
struct ticket_config *tk;
if (!check_ticket(msg->ticket.id, &tk)) {
log_warn("client wants to revoke an unknown ticket %s",
msg->ticket.id);
rv = RLT_INVALID_ARG;
goto reply;
}
if (!is_owned(tk)) {
log_info("client wants to revoke a free ticket %s",
msg->ticket.id);
rv = RLT_TICKET_IDLE;
goto reply;
}
if (tk->leader != local) {
log_info("the ticket %s is not granted here, "
"redirect to %s",
msg->ticket.id, ticket_leader_string(tk));
rv = RLT_REDIRECT;
goto reply;
}
rv = do_revoke_ticket(tk);
if (rv == 0)
rv = RLT_ASYNC;
reply:
init_ticket_msg(msg, CMR_REVOKE, rv, 0, tk);
return send_ticket_msg(fd, msg);
}
int ticket_broadcast(struct ticket_config *tk,
cmd_request_t cmd, cmd_request_t expected_reply,
cmd_result_t res, cmd_reason_t reason)
{
struct boothc_ticket_msg msg;
init_ticket_msg(&msg, cmd, res, reason, tk);
tk_log_debug("broadcasting '%s' (term=%d, valid=%d)",
state_to_string(cmd),
ntohl(msg.ticket.term),
ntohl(msg.ticket.term_valid_for));
tk->last_request = cmd;
if (expected_reply) {
expect_replies(tk, expected_reply);
}
return transport()->broadcast(&msg, sizeof(msg));
}
/* is it safe to commit the grant?
* if we didn't hear from all sites on the initial grant, we may
* need to delay the commit
*
* TODO: investigate possibility to devise from history whether a
* missing site could be holding a ticket or not
*/
static int ticket_dangerous(struct ticket_config *tk)
{
if (!tk->delay_commit)
return 0;
if (tk->delay_commit <= time(NULL) ||
all_sites_replied(tk)) {
tk_log_debug("ticket delay commit expired");
tk->delay_commit = 0;
return 0;
} else {
tk_log_debug("delay ticket commit for %ds",
(int)(tk->delay_commit - time(NULL)));
}
return 1;
}
/* update the ticket on the leader, write it to the CIB, and
send out the update message to others with the new expiry
time
*/
int leader_update_ticket(struct ticket_config *tk)
{
struct boothc_ticket_msg msg;
int rv = 0;
if (tk->ticket_updated >= 2)
return 0;
if (tk->ticket_updated < 1) {
tk->ticket_updated = 1;
tk->term_expires = time(NULL) + tk->term_duration;
tk_log_debug("broadcasting ticket update");
init_ticket_msg(&msg, OP_UPDATE, RLT_SUCCESS, 0, tk);
rv = transport()->broadcast(&msg, sizeof(msg));
}
if (tk->ticket_updated < 2) {
if (!ticket_dangerous(tk)) {
tk->ticket_updated = 2;
ticket_write(tk);
} else {
/* log just once, on the first retry */
if (tk->retry_number == 1)
tk_log_info("delaying ticket commit to CIB for %ds "
"(or all sites are reached)",
(int)(tk->delay_commit - time(NULL)));
}
}
return rv;
}
static void log_lost_servers(struct ticket_config *tk)
{
struct booth_site *n;
int i;
if (tk->retry_number > 1)
/* log those that we couldn't reach, but do
* that only on the first retry
*/
return;
for (i = 0; i < booth_conf->site_count; i++) {
n = booth_conf->site + i;
if (!(tk->acks_received & n->bitmask)) {
tk_log_warn("%s %s didn't acknowledge our request, "
"will retry %d times",
(n->type == ARBITRATOR ? "arbitrator" : "site"),
site_string(n),
tk->retries);
}
}
}
static void resend_msg(struct ticket_config *tk)
{
struct booth_site *n;
int i;
if (!(tk->acks_received ^ local->bitmask)) {
ticket_broadcast(tk, tk->last_request, tk->acks_expected, RLT_SUCCESS, 0);
} else {
for (i = 0; i < booth_conf->site_count; i++) {
n = booth_conf->site + i;
if (!(tk->acks_received & n->bitmask)) {
tk_log_debug("resending %s to %s",
state_to_string(tk->last_request),
site_string(n)
);
send_msg(tk->last_request, tk, n);
}
}
}
}
static void handle_resends(struct ticket_config *tk)
{
int ack_cnt;
if (++tk->retry_number > tk->retries) {
tk_log_debug("giving up on sending retries");
no_resends(tk);
set_ticket_wakeup(tk);
return;
}
if (!majority_of_bits(tk, tk->acks_received)) {
ack_cnt = count_bits(tk->acks_received) - 1;
if (!ack_cnt) {
tk_log_warn("no answers to our request (try #%d), "
"we are alone",
tk->retry_number);
} else {
tk_log_warn("not enough answers to our request (try #%d): "
"only got %d answers",
tk->retry_number,
ack_cnt);
}
} else {
log_lost_servers(tk);
if (is_owned(tk)) {
/* we have the majority, update the ticket, at
* least the local copy if we're still not
* allowed to commit
*/
leader_update_ticket(tk);
}
}
resend_msg(tk);
ticket_activate_timeout(tk);
}
int postpone_ticket_processing(struct ticket_config *tk)
{
extern time_t start_time;
return tk->start_postpone &&
((time(NULL) - start_time) < tk->timeout);
}
static void ticket_cron(struct ticket_config *tk)
{
time_t now;
now = time(NULL);
/* don't process the tickets too early */
if (postpone_ticket_processing(tk)) {
tk_log_debug("ticket processing postponed (start_postpone=%d)",
tk->start_postpone);
/* but run again soon */
ticket_activate_timeout(tk);
return;
}
if (tk->acks_expected == OP_MY_INDEX) {
no_resends(tk);
}
/* wanting to be follower is not much of an ambition */
if (tk->next_state && tk->next_state != ST_FOLLOWER) {
switch(tk->next_state) {
case ST_LEADER:
if (tk->state == ST_LEADER) {
new_round(tk, OR_SPLIT);
} else {
reacquire_ticket(tk);
}
break;
case ST_INIT:
no_resends(tk);
start_revoke_ticket(tk);
break;
default:
break;
}
tk->start_postpone = 0;
goto out;
}
/* Has an owner, has an expiry date, and expiry date in the past?
* Losing the ticket must happen in _every_ state. */
if (tk->term_expires &&
is_owned(tk) &&
now >= tk->term_expires) {
if (tk->leader != local) {
tk_log_warn("lost at %s", site_string(tk->leader));
} else {
tk_log_warn("lost majority (revoking locally)");
}
tk->lost_leader = tk->leader;
/* Couldn't renew in time - ticket lost. */
new_round(tk, OR_TKT_LOST);
goto out;
}
switch(tk->state) {
case ST_INIT:
/* init state, handle resends for ticket revoke */
if (tk->acks_expected) {
handle_resends(tk);
}
break;
case ST_FOLLOWER:
/* nothing here either, ticket loss is caught earlier
* */
break;
case ST_CANDIDATE:
/* §5.2 */
elections_end(tk);
break;
case ST_LEADER:
/* timeout or ticket renewal? */
if (tk->acks_expected) {
handle_resends(tk);
} else {
/* this is ticket renewal, run local test */
if (!test_external_prog(tk, 1)) {
ticket_broadcast(tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0);
ticket_activate_timeout(tk);
}
}
break;
default:
break;
}
out:
tk->next_state = 0;
if (tk->update_cib)
ticket_write(tk);
}
void process_tickets(void)
{
struct ticket_config *tk;
int i;
struct timeval now, last_cron;
float sec_until;
gettimeofday(&now, NULL);
foreach_ticket(i, tk) {
sec_until = timeval_to_float(tk->next_cron) - timeval_to_float(now);
if (0)
tk_log_debug("next cron %" PRIx64 ".%03d, "
"now %" PRIx64 "%03d, in %f",
(uint64_t)tk->next_cron.tv_sec, timeval_msec(tk->next_cron),
(uint64_t)now.tv_sec, timeval_msec(now),
sec_until);
if (sec_until > 0.0)
continue;
tk_log_debug("ticket cron");
last_cron = tk->next_cron;
ticket_cron(tk);
if (!timercmp(&last_cron, &tk->next_cron, !=)) {
tk_log_debug("nobody set ticket wakeup");
set_ticket_wakeup(tk);
}
}
}
void tickets_log_info(void)
{
struct ticket_config *tk;
int i;
foreach_ticket(i, tk) {
tk_log_info("state '%s' "
"term %d "
"commit index %d "
"leader %s "
"expires %-24.24s",
state_to_string(tk->state),
tk->current_term,
tk->commit_index,
ticket_leader_string(tk),
ctime(&tk->term_expires));
}
}
static void update_acks(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t cmd;
cmd = ntohl(msg->header.cmd);
if (tk->acks_expected != cmd &&
tk->acks_expected != OP_REJECTED)
return;
/* got an ack! */
tk->acks_received |= sender->bitmask;
if (cmd == OP_HEARTBEAT)
tk_log_debug("got ACK from %s, %d/%d agree.",
site_string(sender),
count_bits(tk->acks_received),
booth_conf->site_count);
if (tk->delay_commit && all_sites_replied(tk)) {
tk->delay_commit = 0;
}
if (all_replied(tk)) {
no_resends(tk);
tk->start_postpone = 0;
set_ticket_wakeup(tk);
}
}
/* UDP message receiver. */
int message_recv(struct boothc_ticket_msg *msg, int msglen)
{
uint32_t from;
struct booth_site *source;
struct ticket_config *tk;
struct booth_site *leader;
uint32_t leader_u;
if (check_boothc_header(&msg->header, sizeof(*msg)) < 0 ||
msglen != sizeof(*msg)) {
log_error("message receive error");
return -1;
}
from = ntohl(msg->header.from);
if (!find_site_by_id(from, &source) || !source) {
log_error("unknown sender: %08x", from);
return -1;
}
if (!check_ticket(msg->ticket.id, &tk)) {
log_warn("got invalid ticket name %s from %s",
msg->ticket.id, site_string(source));
return -EINVAL;
}
leader_u = ntohl(msg->ticket.leader);
if (!find_site_by_id(leader_u, &leader)) {
tk_log_error("message with unknown leader %u received", leader_u);
return -EINVAL;
}
update_acks(tk, source, leader, msg);
return raft_answer(tk, source, leader, msg);
}
void set_ticket_wakeup(struct ticket_config *tk)
{
struct timeval tv, now;
/* At least every hour, perhaps sooner. */
ticket_next_cron_in(tk, 3600);
gettimeofday(&now, NULL);
switch (tk->state) {
case ST_LEADER:
assert(tk->leader == local);
tv = now;
tv.tv_sec = next_vote_starts_at(tk);
/* If timestamp is in the past, wakeup at the expiry
* time. */
if (timeval_compare(tv, now) <= 0) {
tk_log_debug("next ts in the past (%f)",
timeval_to_float(tv) - timeval_to_float(now));
tv.tv_sec = tk->term_expires;
}
ticket_next_cron_at(tk, tv);
break;
case ST_CANDIDATE:
assert(tk->election_end);
ticket_next_cron_at_coarse(tk, tk->election_end);
break;
case ST_INIT:
case ST_FOLLOWER:
/* If there is (or should be) some owner, check on it later on.
* If no one is interested - don't care. */
if (is_owned(tk) &&
(local->type == SITE))
ticket_next_cron_at_coarse(tk,
tk->term_expires + tk->acquire_after);
break;
default:
tk_log_error("unknown ticket state: %d", tk->state);
}
if (tk->next_state) {
/* we need to do something soon here */
ticket_activate_timeout(tk);
}
if (ANYDEBUG) {
float sec_until;
gettimeofday(&now, NULL);
sec_until = timeval_to_float(tk->next_cron) - timeval_to_float(now);
tk_log_debug("set ticket wakeup in %f", sec_until);
}
}
/* Given a state (in host byte order), return a human-readable (char*).
* An array is used so that multiple states can be printed in a single printf(). */
char *state_to_string(uint32_t state_ho)
{
union mu { cmd_request_t s; char c[5]; };
static union mu cache[6] = { { 0 } }, *cur;
static int current = 0;
current ++;
if (current >= sizeof(cache)/sizeof(cache[0]))
current = 0;
cur = cache + current;
cur->s = htonl(state_ho);
/* Shouldn't be necessary, union array is initialized with zeroes, and
* these bytes never get written. */
cur->c[4] = 0;
return cur->c;
}
int send_reject(struct booth_site *dest, struct ticket_config *tk, cmd_result_t code)
{
struct boothc_ticket_msg msg;
init_ticket_msg(&msg, OP_REJECTED, code, 0, tk);
return booth_udp_send(dest, &msg, sizeof(msg));
}
int send_msg (
int cmd,
- struct ticket_config *tk,
+ struct ticket_config *current_tk,
struct booth_site *dest
)
{
+ struct ticket_config *tk = current_tk;
struct boothc_ticket_msg msg;
if (cmd == OP_MY_INDEX) {
+ if (current_tk->state == ST_CANDIDATE &&
+ current_tk->last_valid_tk->current_term) {
+ tk = current_tk->last_valid_tk;
+ }
tk_log_info("sending status to %s",
site_string(dest));
}
init_ticket_msg(&msg, cmd, RLT_SUCCESS, 0, tk);
return booth_udp_send(dest, &msg, sizeof(msg));
}

File Metadata

Mime Type
text/x-diff
Expires
Tue, Feb 25, 4:02 AM (1 d, 9 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1464665
Default Alt Text
(75 KB)

Event Timeline