Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F3152457
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
37 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/src/config.h b/src/config.h
index a66a988..2d659ff 100644
--- a/src/config.h
+++ b/src/config.h
@@ -1,176 +1,179 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _CONFIG_H
#define _CONFIG_H
#include <stdint.h>
#include "booth.h"
#include "raft.h"
#include "transport.h"
/** @{ */
/** Definitions for in-RAM data. */
#define MAX_NODES 16
#define TICKET_ALLOC 16
/** In-RAM configuration and runtime (Raft) state of a single ticket. */
struct ticket_config {
/** \name Configuration items.
* @{ */
/** Name of ticket. */
boothc_ticket name;
/** How many seconds a term lasts (if not refreshed). */
int term_duration;
/** Network related timeouts. */
int timeout;
/** Retries before giving up. */
int retries;
/** If >0, time to wait for a site to get fenced.
* The ticket may be acquired after that timespan by
* another site. */
int acquire_after; /* TODO: needed? */
/* Program to ask whether it makes sense to
* acquire the ticket */
char *ext_verifier;
/** Node weights. */
int weight[MAX_NODES];
/** @} */
/** \name Runtime values.
* @{ */
/** Current state. */
server_state_e state;
/** When something has to be done */
struct timeval next_cron;
/** Current leader. This is effectively the log[] in Raft. */
struct booth_site *leader;
/** Timestamp of leadership expiration */
time_t term_expires;
/** End of election period */
time_t election_end;
/** Site this node voted for in the current election; NULL if none. */
struct booth_site *voted_for;
/** Who the various sites vote for.
* NO_OWNER = no vote yet. */
struct booth_site *votes_for[MAX_NODES];
/* bitmap */
uint64_t votes_received;
/** Last voting round that was seen. */
uint32_t current_term;
/** @} */
/** Sent to peers as "leader_commit" in ticket messages. */
uint32_t commit_index;
/** Sent to peers as "prev_log_index" in ticket messages. */
uint32_t last_applied;
uint32_t next_index[MAX_NODES];
uint32_t match_index[MAX_NODES];
/* Bitmap of sites that answered the most recent heartbeat;
 * reset to the local site's bit whenever a heartbeat is sent. */
+ uint64_t hb_received;
/* Time at which the most recent heartbeat was broadcast. */
+ time_t hb_sent_at;
+
/** \name Needed while proposals are being done.
* @{ */
/** Whom to vote for the next time.
* Needed to push a ticket to someone else. */
#if 0
/** Bitmap of sites that acknowledge that state. */
uint64_t proposal_acknowledges;
/** When an incompletely acknowledged proposal gets done.
* If all peers agree, that happens sooner.
* See switch_state_to(). */
struct timeval proposal_switch;
/** Timestamp of proposal expiration. */
time_t proposal_expires;
#endif
/** Number of heartbeat rounds that timed out in a row.
* Starts at 0, counts up; reset to 0 once a majority
* has acknowledged a heartbeat. */
int retry_number;
/** @} */
};
/** Global daemon configuration: transport, identities, sites and tickets. */
struct booth_config {
char name[BOOTH_NAME_LEN];
/** Selected transport; used as index into booth_transport[]. */
transport_layer_t proto;
uint16_t port;
/** Stores the OR of the individual host bitmasks. */
uint64_t site_bits;
char site_user[BOOTH_NAME_LEN];
char site_group[BOOTH_NAME_LEN];
char arb_user[BOOTH_NAME_LEN];
char arb_group[BOOTH_NAME_LEN];
uid_t uid;
gid_t gid;
/** Number of valid entries in site[]. */
int site_count;
struct booth_site site[MAX_NODES];
/** Number of valid entries in ticket[]. */
int ticket_count;
/* NOTE(review): presumably the number of slots allocated for ticket[]
 * (cf. TICKET_ALLOC) - confirm against the config parser. */
int ticket_allocated;
struct ticket_config *ticket;
};
/** The active configuration; NULL-checked by callers before use. */
extern struct booth_config *booth_conf;
/* NOTE(review): implementations are not part of this header; the
 * descriptions below are derived from the call sites only. */
int read_config(const char *path);
int check_config(int type);
/** Looks up a site by name; callers treat a non-zero return as "found"
 * and then read the site through *node. */
int find_site_by_name(unsigned char *site, struct booth_site **node, int any_type);
/** Looks up a site by numeric ID; callers treat a zero return as
 * "unknown ID" and also check *node for NULL afterwards. */
int find_site_by_id(uint32_t site_id, struct booth_site **node);
const char *type_to_string(int type);
#include <stdio.h>
/* Debug trace: prints source location plus state, term, commit index and
 * leader of ticket tk_ to stdout. tk_ is expanded several times, so pass
 * a plain pointer variable without side effects. */
#define R(tk_) printf("## %12s:%3d state %s, term %d, index %d, leader %s\n", __FILE__, __LINE__, state_to_string(tk_->state), tk_->current_term, tk_->commit_index, site_string(tk_->leader))
#endif /* _CONFIG_H */
diff --git a/src/inline-fn.h b/src/inline-fn.h
index f28020c..2944e63 100644
--- a/src/inline-fn.h
+++ b/src/inline-fn.h
@@ -1,288 +1,305 @@
/*
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _INLINE_FN_H
#define _INLINE_FN_H
#include <time.h>
#include <sys/time.h>
#include <assert.h>
#include <string.h>
#include "config.h"
#include "ticket.h"
#include "transport.h"
/** Site ID of the local node; -1 (converted to uint32_t) when
 * no local site is known. */
inline static uint32_t get_local_id(void)
{
	if (!local)
		return -1;

	return local->site_id;
}
/** Site ID of the given node, or NO_ONE for a NULL node. */
inline static uint32_t get_node_id(struct booth_site *node)
{
	if (!node)
		return NO_ONE;

	return node->site_id;
}
/** Seconds until the ticket's term expires; 0 if it already has. */
inline static int term_valid_for(const struct ticket_config *tk)
{
	int remaining = tk->term_expires - time(NULL);

	if (remaining < 0)
		return 0;

	return remaining;
}
/** If the local site is the leader, returns the number of seconds
 * the term is still valid for (possibly 0); returns 0 otherwise. */
inline static int leader_and_valid(const struct ticket_config *tk)
{
	return (tk->leader == local) ? term_valid_for(tk) : 0;
}
/** Fills the fields common to every message: magic, version, the local
 * site as sender, and zeroed iv/auth fields. All values are stored in
 * network byte order. Length, command and result are left untouched. */
static inline void init_header_bare(struct boothc_header *h)
{
	/* Identification. */
	h->magic   = htonl(BOOTHC_MAGIC);
	h->version = htonl(BOOTHC_VERSION);
	h->from    = htonl(local->site_id);

	/* Not used yet. */
	h->iv    = htonl(0);
	h->auth1 = htonl(0);
	h->auth2 = htonl(0);
}
/** Initializes a complete message header.
 *
 * @param h        header to fill
 * @param cmd      command code (host byte order)
 * @param result   result code (host byte order)
 * @param data_len value for the length field (host byte order)
 */
static inline void init_header(struct boothc_header *h, int cmd,
		int result, int data_len)
{
	init_header_bare(h);

	h->cmd    = htonl(cmd);
	h->result = htonl(result);
	h->length = htonl(data_len);
}
/** Initializes only the header of a ticket message, with result 0
 * and the length of a full ticket message; the ticket payload is
 * not touched. */
static inline void init_ticket_site_header(struct boothc_ticket_msg *msg, int cmd)
{
	struct boothc_header *hdr = &msg->header;

	init_header(hdr, cmd, 0, sizeof(*msg));
}
/** Builds a ticket message (header plus payload) for sending.
 *
 * @param msg message to fill
 * @param cmd command code for the header
 * @param rv  result code for the header
 * @param tk  ticket to describe; if NULL the ticket payload is zeroed
 *
 * The "leader" field carries the current leader or, if there is none,
 * the site this node voted for.
 */
static inline void init_ticket_msg(struct boothc_ticket_msg *msg,
		int cmd, int rv,
		struct ticket_config *tk)
{
	struct booth_site *named;

	assert(sizeof(msg->ticket.id) == sizeof(tk->name));
	init_header(&msg->header, cmd, rv, sizeof(*msg));

	if (!tk) {
		memset(&msg->ticket, 0, sizeof(msg->ticket));
		return;
	}

	named = tk->leader ? tk->leader : tk->voted_for;

	memcpy(msg->ticket.id, tk->name, sizeof(msg->ticket.id));
	msg->ticket.leader         = htonl(get_node_id(named));
	msg->ticket.term           = htonl(tk->current_term);
	msg->ticket.term_valid_for = htonl(term_valid_for(tk));
	msg->ticket.prev_log_index = htonl(tk->last_applied);
	msg->ticket.leader_commit  = htonl(tk->commit_index);
}
/** Returns the transport descriptor selected by the configuration. */
static inline struct booth_transport const *transport(void)
{
	return &booth_transport[booth_conf->proto];
}
/** Printable address of a site; "NONE" for a NULL site. */
static inline const char *site_string(struct booth_site *site)
{
	if (!site)
		return "NONE";

	return site->addr_string;
}
/** Printable address of the ticket's current leader ("NONE" if unset). */
static inline const char *ticket_leader_string(struct ticket_config *tk)
{
	struct booth_site *leader = tk->leader;

	return site_string(leader);
}
/** Clears the leader and sets the term expiry to the current time. */
static inline void disown_ticket(struct ticket_config *tk)
{
	tk->leader = NULL;
	tk->term_expires = time(NULL);
}
/** Disowns the ticket if its term has run out or it has no leader.
 *
 * @return 1 if the ticket was disowned, 0 if it is still held.
 */
static inline int disown_if_expired(struct ticket_config *tk)
{
	int expired = time(NULL) >= tk->term_expires;

	if (!expired && tk->leader)
		return 0;

	disown_ticket(tk);
	return 1;
}
/* We allow half of the uint32_t to be used;
* half of that below, half of that above the current known "good" value.
* 0 UINT32_MAX
* |--------------------------+----------------+------------|
* | | |
* |--------+-------| allowed range
* |
* current commit index
*
* So, on overflow it looks like that:
* UINT32_MAX 0
* |--------------------------+-----------||---+------------|
* | | |
* |--------+-------| allowed range
* |
* current commit index
*
* This should be possible by using the same datatype and relying
* on the under/overflow semantics.
*
*
* Having 30 bits available, and assuming an expire time of
* one minute and a (high) commit index step of 64 == 2^6 (because
* of weights), we get 2^24 minutes of range - which is ~750
* years. "Should be enough for everybody."
*/
/** Wrap-around aware comparison of two commit indices.
 *
 * @param c_high candidate for the higher index
 * @param c_low  candidate for the lower index
 * @return 1 if c_high is ahead of c_low by less than UINT32_MAX/4,
 *         0 if both are equal or c_high is behind by less than UINT32_MAX/4.
 *
 * Indices further apart than a quarter of the value range cannot be
 * ordered; that trips an assertion. With assertions compiled out
 * (NDEBUG) such a pair is reported as "not higher" (0) instead of
 * running off the end of the function.
 */
static inline int index_is_higher_than(uint32_t c_high, uint32_t c_low)
{
	uint32_t diff;

	if (c_high == c_low)
		return 0;

	/* Unsigned subtraction wraps, which is exactly what is wanted here. */
	diff = c_high - c_low;
	if (diff < UINT32_MAX/4)
		return 1;

	diff = c_low - c_high;
	if (diff < UINT32_MAX/4)
		return 0;

	assert(!"commit index out of range - invalid");
	return 0;
}
/** Returns whichever of the two indices is ahead (wrap-around aware);
 * b if neither is ahead of the other. */
static inline uint32_t index_max2(uint32_t a, uint32_t b)
{
	if (index_is_higher_than(a, b))
		return a;

	return b;
}
/** Wrap-around aware maximum of three indices. */
static inline uint32_t index_max3(uint32_t a, uint32_t b, uint32_t c)
{
	uint32_t best_ab = index_max2(a, b);

	return index_max2(best_ab, c);
}
/** Converts a timeval to seconds as a double (with fractional part). */
static inline double timeval_to_float(struct timeval tv)
{
	double fraction = tv.tv_usec*(double)1.0e-6;

	return tv.tv_sec + fraction;
}
/** Millisecond part of a timeval, clamped to at most 999. */
static inline int timeval_msec(struct timeval tv)
{
	int msec = tv.tv_usec / 1000;

	return (msec >= 1000) ? 999 : msec;
}
/** Three-way comparison of two timevals.
 *
 * @return -1 if tv1 is earlier than tv2, +1 if later, 0 if equal.
 */
static inline int timeval_compare(struct timeval tv1, struct timeval tv2)
{
	/* Seconds decide first; microseconds only break ties. */
	if (tv1.tv_sec != tv2.tv_sec)
		return (tv1.tv_sec < tv2.tv_sec) ? -1 : +1;

	if (tv1.tv_usec != tv2.tv_usec)
		return (tv1.tv_usec < tv2.tv_usec) ? -1 : +1;

	return 0;
}
/** Returns non-zero if the given point in time is strictly before
 * the current time as reported by gettimeofday(). */
static inline int timeval_in_past(struct timeval which)
{
	struct timeval now;

	gettimeofday(&now, NULL);

	return timeval_compare(now, which) > 0;
}
/** Computes when the local leader should start renewing the ticket.
 *
 * @return 0 if the local site is not the leader; otherwise the earlier
 *         of "half a term before expiry" and "timeout * retries / 2
 *         seconds before expiry".
 */
static inline time_t next_vote_starts_at(struct ticket_config *tk)
{
	time_t by_half_term, by_retries;

	/* Only the owner renews. */
	if (tk->leader != local)
		return 0;

	/* Renewal is normally attempted half a term before expiry ... */
	by_half_term = tk->term_expires - tk->term_duration/2;

	/* ... but must start early enough to leave room for a few
	 * message retransmissions within the allotted expiry time. */
	by_retries = tk->term_expires - tk->timeout * tk->retries/2;

	if (by_half_term < by_retries)
		return by_half_term;

	return by_retries;
}
/** Returns non-zero if the local site is leader and the renewal time
 * computed by next_vote_starts_at() has been reached. */
static inline int should_start_renewal(struct ticket_config *tk)
{
	time_t when = next_vote_starts_at(tk);

	if (!when)
		return 0;

	return when <= time(NULL);
}
/** Broadcasts an OP_HEARTBEAT for the ticket and starts a new
 * acknowledgement round: the ACK bitmap is reset to contain only the
 * local site and the send time is recorded.
 * Returns the result of ticket_broadcast(). */
static inline int send_heartbeat(struct ticket_config *tk)
{
/* The local site counts as having acknowledged its own heartbeat. */
+ tk->hb_received = local->bitmask;
+ tk->hb_sent_at = time(NULL);
+
return ticket_broadcast(tk, OP_HEARTBEAT, RLT_SUCCESS);
}
/** Returns the site the local node voted for on this ticket
 * (the local node's slot in votes_for[]). */
static inline struct booth_site *my_vote(struct ticket_config *tk)
{
	int own_slot = local->index;

	return tk->votes_for[own_slot];
}
/** Returns the number of set bits in a 64-bit bitmap.
 *
 * Uses the "long long" popcount builtin: the plain __builtin_popcount()
 * takes an unsigned int and would silently drop the upper 32 bits.
 */
static inline int count_bits(uint64_t val)
{
	return __builtin_popcountll(val);
}
+
/* Returns non-zero if the number of bits set in val is strictly more
 * than half of the configured site count (booth_conf->site_count).
 * The tk argument is currently not used. */
+static inline int majority_of_bits(struct ticket_config *tk, uint64_t val)
+{
+ /* Use ">" to get majority decision, even for an even number
+ * of participants. */
+ return count_bits(val) * 2 >
+ booth_conf->site_count;
+}
+
+
+
#endif
diff --git a/src/raft.c b/src/raft.c
index 51aaf70..fac3fb1 100644
--- a/src/raft.c
+++ b/src/raft.c
@@ -1,405 +1,472 @@
/*
* Copyright (C) 2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <inttypes.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include "booth.h"
#include "transport.h"
#include "inline-fn.h"
#include "config.h"
#include "raft.h"
#include "ticket.h"
#include "log.h"
inline static void clear_election(struct ticket_config *tk)
{
int i;
struct booth_site *site;
log_info("clear election");
tk->votes_received = 0;
foreach_node(i, site)
tk->votes_for[site->index] = NULL;
}
/** Records that site "who" voted for site "vote" on this ticket.
 *
 * @param tk   ticket the vote belongs to
 * @param who  voting site (must not be NULL)
 * @param vote site being voted for; may be NULL (new_election() passes
 *             NULL when the local node is not a SITE)
 *
 * Only the first vote of a site is stored. A later, different vote is
 * logged as an error and otherwise ignored.
 */
inline static void site_voted_for(struct ticket_config *tk,
		struct booth_site *who,
		struct booth_site *vote)
{
	/* site_string() copes with a NULL vote; dereferencing
	 * vote->addr_string directly would crash in that case. */
	log_info("site \"%s\" votes for \"%s\"",
			site_string(who),
			site_string(vote));

	if (!tk->votes_for[who->index]) {
		tk->votes_for[who->index] = vote;
		tk->votes_received |= who->bitmask;
	} else {
		if (tk->votes_for[who->index] != vote)
			log_error("voted previously (but in same term!) for \"%s\"...",
					tk->votes_for[who->index]->addr_string);
	}
}
/** Switches the ticket to ST_FOLLOWER and restarts its term timer.
 *
 * @param tk  ticket to update
 * @param msg message that caused the switch, or NULL. If given, the
 *            term is limited to the validity announced in the message,
 *            and the local term and commit index are raised to the
 *            announced values if those are larger.
 *
 * Finishes by calling ticket_write().
 */
static void become_follower(struct ticket_config *tk,
		struct boothc_ticket_msg *msg)
{
	uint32_t announced;
	int duration = tk->term_duration;

	tk->state = ST_FOLLOWER;

	if (msg) {
		duration = min(duration, ntohl(msg->ticket.term_valid_for));

		announced = ntohl(msg->ticket.term);
		tk->current_term = max(announced, tk->current_term);

		/* § 5.3 */
		announced = ntohl(msg->ticket.leader_commit);
		tk->commit_index = max(announced, tk->commit_index);
	}

	tk->term_expires = time(NULL) + duration;

	ticket_write(tk);
}
/** Tallies the recorded votes of all configured sites.
 *
 * @return the first site found to have strictly more than half of
 *         site_count votes, or NULL if no site has a majority (yet).
 */
static struct booth_site *majority_votes(struct ticket_config *tk)
{
	int tally[MAX_NODES] = { 0, };
	int voter;

	for (voter = 0; voter < booth_conf->site_count; voter++) {
		struct booth_site *choice = tk->votes_for[voter];
		int slot;

		/* This site has not voted. */
		if (choice == NULL)
			continue;

		slot = choice->index;
		tally[slot]++;
		log_info("Majority: %d \"%s\" wants %d \"%s\" => %d",
				voter, booth_conf->site[voter].addr_string,
				slot, choice->addr_string,
				tally[slot]);

		if (tally[slot]*2 > booth_conf->site_count) {
			log_info("Majority reached: %d of %d for \"%s\"",
					tally[slot], booth_conf->site_count,
					choice->addr_string);
			return choice;
		}
	}

	return NULL;
}
+
+
+/* For follower. */
/* Handles an OP_HEARTBEAT received while not being the leader:
 * heartbeats with a term lower than the local one are silently dropped;
 * otherwise the node (re)becomes follower of the announced leader and
 * acknowledges by sending an OP_HEARTBEAT back to the sender.
 * Returns 0 for a dropped heartbeat, else the result of booth_udp_send(). */
static int answer_HEARTBEAT (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t term;
+ struct boothc_ticket_msg omsg;
+
+
term = ntohl(msg->ticket.term);
log_debug("leader: %s, have %s; term %d vs %d",
site_string(leader), ticket_leader_string(tk),
term, tk->current_term);
if (term < tk->current_term)
return 0; //send_reject(sender, tk, RLT_TERM_OUTDATED);
become_follower(tk, msg);
/* NOTE(review): sender and leader both come from a received packet;
 * this assertion aborts the process if they differ. */
assert(sender == leader);
tk->leader = leader;
- return 0;
+
+ /* Yeth, mathter. */
+ init_ticket_msg(&omsg, OP_HEARTBEAT, RLT_SUCCESS, tk);
+ return booth_udp_send(sender, &omsg, sizeof(omsg));
+}
+
+
+/* For leader. */
+static int process_HEARTBEAT(
+ struct ticket_config *tk,
+ struct booth_site *sender,
+ struct booth_site *leader,
+ struct boothc_ticket_msg *msg
+ )
+{
+ uint32_t term;
+
+ term = ntohl(msg->ticket.term);
+ if (term == tk->current_term &&
+ leader == tk->leader) {
+ /* Hooray, an ACK! */
+ log_debug("Got heartbeat ACK from \"%s\".",
+ site_string(sender));
+
+ /* So at least _someone_ is listening. */
+ tk->hb_received |= sender->bitmask;
+
+ if (majority_of_bits(tk, tk->hb_received)) {
+ /* OK, at least half of the nodes are reachable;
+ * no need to do anything until
+ * the next heartbeat should be sent. */
+ set_ticket_wakeup(tk);
+ tk->retry_number = 0;
+ } else {
+ /* Not enough answers yet;
+ * wait until timeout expires. */
+ ticket_activate_timeout(tk);
+ }
+ return 0;
+ }
+
+ if (term < tk->current_term) {
+ /* Doesn't know what he's talking about - perhaps
+ * doesn't receive our packets? */
+ log_error("Stale/wrong heartbeat from \"%s\": "
+ "term %d instead of %d",
+ site_string(sender),
+ term, tk->current_term);
+ return 0;
+ }
+
+ /* Uh oh. Higher term?? Should we simply believe that? */
+ /* TODO */
+ log_error("Got higher term number from");
+ assert(0);
}
/* Handles an OP_VOTE_FOR: records that "sender" votes for "leader" and,
 * once some site has a majority, installs it as leader.
 * Votes with an outdated term are answered with a reject; votes for the
 * current term that arrive after the election ended are ignored.
 * Returns 0, or the result of send_reject() for outdated terms. */
static int process_VOTE_FOR(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t term;
struct booth_site *new_leader;
term = ntohl(msg->ticket.term);
if (term < tk->current_term)
return send_reject(sender, tk, RLT_TERM_OUTDATED);
if (term == tk->current_term &&
tk->election_end < time(NULL)) {
/* Election already ended - either by time or majority.
* Ignore. */
return 0;
}
/* A newer term starts a fresh tally. */
if (term > tk->current_term)
clear_election(tk);
site_voted_for(tk, sender, leader);
/* §5.2 */
new_leader = majority_votes(tk);
if (new_leader) {
tk->leader = new_leader;
tk->term_expires = time(NULL) + tk->term_duration;
/* election_end == 0 marks the election as finished. */
tk->election_end = 0;
tk->voted_for = NULL;
if ( new_leader == local) {
tk->commit_index++; // ??
tk->state = ST_LEADER;
send_heartbeat(tk);
ticket_write(tk);
}
else
become_follower(tk, NULL);
}
set_ticket_wakeup(tk);
return 0;
}
/** Handles an OP_REJECTED answer.
 *
 * Whatever the reason, the node adopts the leader named in the message
 * and becomes follower; only the log output differs between the known
 * rejection reasons (as candidate: term outdated / term still valid)
 * and everything else. Always returns 0.
 */
static int process_REJECTED(
		struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg
		)
{
	uint32_t rv = ntohl(msg->header.result);
	int am_candidate = (tk->state == ST_CANDIDATE);

	if (am_candidate && rv == RLT_TERM_OUTDATED) {
		log_info("Am out of date, become follower.");
	} else if (am_candidate && rv == RLT_TERM_STILL_VALID) {
		log_error("There's a leader that I don't see: \"%s\"",
				site_string(leader));
	} else {
		log_error("unhandled reject: in state %s, got %s.",
				state_to_string(tk->state),
				state_to_string(rv));
	}

	tk->leader = leader;
	become_follower(tk, msg);
	return 0;
}
/* §5.2 */
/* Handles an OP_REQ_VOTE (someone starts an election).
 * Rejects requests with an outdated term or while the current term is
 * still valid, ignores duplicates from the current leader, and otherwise
 * broadcasts an OP_VOTE_FOR naming the site this node voted for.
 * Returns 0 for ignored duplicates, else the result of the send. */
static int answer_REQ_VOTE(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t term;
int valid;
struct boothc_ticket_msg omsg;
term = ntohl(msg->ticket.term);
/* §5.1 */
if (term < tk->current_term)
{
log_info("sending REJECT, term too low.");
return send_reject(sender, tk, RLT_TERM_OUTDATED);
}
/* This if() would trigger more or less always, as
* OP_REQ_VOTE *starts* an election.
* if (tk->election_end < time(NULL))
*/
/* If the received term was _higher_ than the locally
* known one, we've already converted to ST_FOLLOWER.
* So the term is equal now. */
/* Important: Ignore duplicated packets! */
valid = term_valid_for(tk);
if (valid &&
term == tk->current_term &&
sender == tk->leader) {
/* NOTE(review): the message names OP_VOTE_FOR although this
 * handler processes OP_REQ_VOTE. */
log_debug("Duplicate OP_VOTE_FOR ignored.");
return 0;
}
if (valid) {
log_debug("no election allowed, term valid for %d??", valid);
return send_reject(sender, tk, RLT_TERM_STILL_VALID);
}
/* §5.2, §5.4 */
if (!tk->voted_for &&
ntohl(msg->ticket.last_log_index) >= tk->last_applied) {
tk->voted_for = sender;
site_voted_for(tk, sender, leader);
/* The goto is redundant: the label follows immediately. */
goto yes_you_can;
}
/* NOTE(review): this point is also reached when no new vote was
 * granted above; the previously stored voted_for (possibly none)
 * is broadcast in that case. */
yes_you_can:
init_ticket_msg(&omsg, OP_VOTE_FOR, RLT_SUCCESS, tk);
omsg.ticket.leader = htonl(get_node_id(tk->voted_for));
return transport()->broadcast(&omsg, sizeof(omsg));
}
/** Starts a new election round for the ticket (§5.2), unless one is
 * still running.
 *
 * @param tk         ticket to elect a leader for
 * @param preference site to vote for; if NULL the local node votes for
 *                   itself when it is a SITE, and for nobody otherwise
 * @return always 0
 *
 * Increments the term, clears the term expiry, sets the election end to
 * one term duration from now, records the local vote, switches to
 * ST_CANDIDATE and broadcasts OP_REQ_VOTE.
 */
int new_election(struct ticket_config *tk, struct booth_site *preference)
{
	struct booth_site *new_leader;
	time_t now;

	time(&now);
	/* time_t is not guaranteed to be int64_t; cast so that the
	 * arguments match the PRIi64 conversion. */
	log_debug("start new election?, now=%" PRIi64 ", end %" PRIi64,
			(int64_t)now, (int64_t)tk->election_end);
	if (now <= tk->election_end)
		return 0;

	/* §5.2 */
	tk->current_term++;
	tk->term_expires = 0;
	tk->election_end = now + tk->term_duration;

	log_debug("start new election! term=%d, until %" PRIi64,
			tk->current_term, (int64_t)tk->election_end);
	clear_election(tk);

	if (preference)
		new_leader = preference;
	else
		new_leader = (local->type == SITE) ? local : NULL;
	site_voted_for(tk, local, new_leader);
	tk->voted_for = new_leader;

	tk->state = ST_CANDIDATE;
	ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS);
	return 0;
}
int raft_answer(
struct ticket_config *tk,
struct booth_site *from,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int cmd;
uint32_t term;
int rv;
cmd = ntohl(msg->header.cmd);
term = ntohl(msg->ticket.term);
R(tk);
log_debug("got message %s from \"%s\", term %d vs. %d",
state_to_string(cmd),
from->addr_string,
term, tk->current_term);
if (cmd == OP_REJECTED) {
R(tk);
rv = process_REJECTED(tk, from, leader, msg);
R(tk);
return (rv);
}
/* §5.1 */
if (term > tk->current_term) {
tk->state = ST_FOLLOWER;
tk->current_term = term;
tk->leader = leader;
log_info("higher term %d vs. %d, following \"%s\"",
term, tk->current_term,
ticket_leader_string(tk));
/* TODO: note that we've already switched state?
* Or make that test in every single function? */
}
R(tk);
switch (cmd) {
case OP_REQ_VOTE:
- rv = answer_REQ_VOTE (tk, from, leader, msg);
+ rv = answer_REQ_VOTE(tk, from, leader, msg);
break;
case OP_VOTE_FOR:
rv = process_VOTE_FOR(tk, from, leader, msg);
break;
case OP_HEARTBEAT:
- rv = answer_HEARTBEAT(tk, from, leader, msg);
+ if (tk->leader == local &&
+ tk->state == ST_LEADER)
+ rv = process_HEARTBEAT(tk, from, leader, msg);
+ else if (tk->leader != local &&
+ tk->state == ST_FOLLOWER)
+ rv = answer_HEARTBEAT(tk, from, leader, msg);
+ else
+ assert("invalid combination - leader, follower");
break;
case OP_REJECTED:
assert(!"here");
break;
default:
log_error("unprocessed message, cmd %x", cmd);
rv = -EINVAL;
}
R(tk);
return rv;
}
diff --git a/src/ticket.c b/src/ticket.c
index e596183..e951566 100644
--- a/src/ticket.c
+++ b/src/ticket.c
@@ -1,668 +1,680 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include <inttypes.h>
#include <stdio.h>
#include <assert.h>
#include <time.h>
#include "ticket.h"
#include "config.h"
#include "pacemaker.h"
#include "inline-fn.h"
#include "log.h"
#include "booth.h"
#include "raft.h"
#include "handler.h"
#define TK_LINE 256
/** Checks that an untrusted string is NUL-terminated within a buffer
 * of "max" bytes (terminator included).
 *
 * @param s   start of the buffer
 * @param max buffer size in bytes
 * @return 1 if a NUL byte is found within the first max bytes, else 0.
 */
int check_max_len_valid(const char *s, int max)
{
	int pos = 0;

	while (pos < max) {
		if (s[pos] == '\0')
			return 1;
		pos++;
	}

	return 0;
}
int find_ticket_by_name(const char *ticket, struct ticket_config **found)
{
int i;
if (found)
*found = NULL;
for (i = 0; i < booth_conf->ticket_count; i++) {
if (!strcmp(booth_conf->ticket[i].name, ticket)) {
if (found)
*found = booth_conf->ticket + i;
return 1;
}
}
return 0;
}
/** Validates an untrusted ticket name and looks the ticket up.
 *
 * @param ticket buffer holding the name; must be NUL-terminated within
 *               the size of a ticket name field
 * @param found  if not NULL, receives the ticket, or NULL on failure
 * @return 1 if the ticket exists; 0 if there is no configuration, the
 *         name is not terminated, or the ticket is unknown.
 */
int check_ticket(char *ticket, struct ticket_config **found)
{
	int name_ok;

	if (found)
		*found = NULL;

	if (!booth_conf)
		return 0;

	name_ok = check_max_len_valid(ticket, sizeof(booth_conf->ticket[0].name));
	if (!name_ok)
		return 0;

	return find_ticket_by_name(ticket, found);
}
/** Validates an untrusted site name and looks the site up.
 *
 * @param site     buffer holding the name; must be NUL-terminated
 *                 within the size of a site address string
 * @param is_local receives the site's "local" flag when it is found
 * @return 1 if the site is known, 0 otherwise (is_local untouched).
 */
int check_site(char *site, int *is_local)
{
	struct booth_site *node;

	if (!check_max_len_valid(site, sizeof(node->addr_string)))
		return 0;

	if (!find_site_by_name(site, &node, 0))
		return 0;

	*is_local = node->local;
	return 1;
}
#if 0
/** Find out what others think about this ticket.
*
* If we're a SITE, we can ask (and have to tell) Pacemaker.
* An ARBITRATOR can only ask others. */
static int ticket_send_catchup(struct ticket_config *tk)
{
int i, rv = 0;
struct booth_site *site;
struct boothc_ticket_msg msg;
foreach_node(i, site) {
if (!site->local) {
init_ticket_msg(&msg, CMD_CATCHUP, RLT_SUCCESS, tk);
log_debug("attempting catchup from %s", site->addr_string);
rv = booth_udp_send(site, &msg, sizeof(msg));
}
}
ticket_activate_timeout(tk);
return rv;
}
#endif
/** Pushes the ticket's ownership to the cluster resource manager.
 *
 * After disowning an expired ticket, grants it locally if the local
 * site is the leader and revokes it otherwise.
 *
 * @return 0 on a SITE, -EINVAL on any other node type (nothing done).
 */
int ticket_write(struct ticket_config *tk)
{
	if (local->type != SITE)
		return -EINVAL;

	disown_if_expired(tk);

	if (tk->leader != local)
		pcmk_handler.revoke_ticket(tk);
	else
		pcmk_handler.grant_ticket(tk);

	return 0;
}
/* Ask an external program whether getting the ticket
* makes sense.
* Eg. if the services have a failcount of INFINITY,
* we can't serve here anyway.
*
* Without a verifier, or if the verifier returns 0, the ticket is kept
* (heartbeat is sent) when the local site is the valid leader, and an
* election in favour of the local site is started otherwise.
* If the verifier returns non-zero, its value is returned; a ticket
* held locally is disowned and OP_VOTE_FOR is broadcast first. */
int get_ticket_locally_if_allowed(struct ticket_config *tk)
{
int rv;
if (!tk->ext_verifier)
goto get_it;
rv = run_handler(tk, tk->ext_verifier, 1);
if (rv) {
log_error("May not acquire ticket.");
/* Give it to somebody else.
* Just send a commit message, as the
* others couldn't help anyway. */
if (leader_and_valid(tk)) {
disown_ticket(tk);
#if 0
tk->proposed_owner = NULL;
/* Just go one further - others may easily override. */
tk->new_ballot++;
ticket_broadcast_proposed_state(tk, OP_COMMITTED);
tk->state = ST_STABLE;
#endif
ticket_broadcast(tk, OP_VOTE_FOR, RLT_SUCCESS);
}
return rv;
} else {
log_info("May keep ticket.");
}
get_it:
if (leader_and_valid(tk)) {
return send_heartbeat(tk);
}
else {
/* NOTE(review): new_election() already broadcasts OP_REQ_VOTE when it
 * starts a round, so this may send the request a second time. */
new_election(tk, local);
return ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS);
}
#if 0
return paxos_start_round(tk, local);
#endif
}
/** Tries to get the ticket for the local site.
 *
 * @return RLT_SUCCESS if the local site already is the leader,
 *         RLT_OVERGRANT if another site is, otherwise the result of
 *         get_ticket_locally_if_allowed().
 */
int do_grant_ticket(struct ticket_config *tk)
{
	if (tk->leader == local)
		return RLT_SUCCESS;

	if (tk->leader != NULL)
		return RLT_OVERGRANT;

	return get_ticket_locally_if_allowed(tk);
}
/** Revokes the ticket: disowns it locally, writes the new state and
 * broadcasts an OP_REQ_VOTE to the other sites.
 * That can be started from any site.
 *
 * @return RLT_SUCCESS if the ticket has no leader anyway, otherwise
 *         the result of ticket_broadcast().
 */
int do_revoke_ticket(struct ticket_config *tk)
{
	if (!tk->leader)
		return RLT_SUCCESS;

	disown_ticket(tk);
	ticket_write(tk);

	return ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS);
}
/** Renders a human-readable list of all tickets into a new buffer.
 *
 * @param pdata receives the malloc()ed, NUL-terminated text; the caller
 *              owns it and has to free() it. NULL on error.
 * @param len   receives the text length without the terminator; 0 on
 *              error.
 * @return 0 on success, -ENOMEM if the buffer cannot be allocated or
 *         the text does not fit into it.
 */
int list_ticket(char **pdata, unsigned int *len)
{
	struct ticket_config *tk;
	struct tm *tm;
	char timeout_str[64];
	char *data, *cp;
	size_t alloc, left;
	int i, n;

	*pdata = NULL;
	*len = 0;

	alloc = 256 +
		(size_t)booth_conf->ticket_count * (BOOTH_NAME_LEN * 2 + 128);
	data = malloc(alloc);
	if (!data)
		return -ENOMEM;

	/* Valid (empty) string even if there are no tickets. */
	data[0] = '\0';
	cp = data;

	foreach_ticket(i, tk) {
		if (tk->term_expires != 0) {
			/* localtime() may fail, and strftime() leaves the
			 * buffer undefined when it returns 0. */
			tm = localtime(&tk->term_expires);
			if (!tm || !strftime(timeout_str, sizeof(timeout_str),
						"%F %T", tm))
				strcpy(timeout_str, "unknown");
		} else {
			strcpy(timeout_str, "INF");
		}

		/* Bounded write: never run past the allocation. */
		left = alloc - (size_t)(cp - data);
		n = snprintf(cp, left,
				"ticket: %s, leader: %s, expires: %s, commit: %" PRIu32 "\n",
				tk->name,
				ticket_leader_string(tk),
				timeout_str,
				tk->commit_index);
		if (n < 0 || (size_t)n >= left) {
			free(data);
			return -ENOMEM;
		}
		cp += n;
	}

	*len = cp - data;
	*pdata = data;

	return 0;
}
/** Initializes the runtime state of all configured tickets at startup.
 *
 * Each ticket starts without leader; on a SITE the stored state is
 * loaded through the pacemaker handler. Afterwards every ticket becomes
 * ST_FOLLOWER with a full term to wait for a possible leader.
 * Always returns 0.
 */
int setup_ticket(void)
{
	struct ticket_config *tk;
	int idx;

	/* TODO */
	foreach_ticket(idx, tk) {
		tk->leader = NULL;
		tk->term_expires = 0;
		// abort_proposal(tk);

		if (local->type == SITE)
			pcmk_handler.load_ticket(tk);

		/* Give an existing leader one term to announce itself. */
		tk->term_expires = time(NULL) + tk->term_duration;
		tk->state = ST_FOLLOWER;
	}

	return 0;
}
/** Answers a client's "list" request with the ticket list text.
 *
 * @param fd  client connection
 * @param msg request message (not used)
 * @return negative error code from list_ticket(), otherwise the result
 *         of send_header_plus().
 */
int ticket_answer_list(int fd, struct boothc_ticket_msg *msg)
{
	char *data;
	/* Must match list_ticket()'s "unsigned int *len" parameter. */
	unsigned int olen;
	int rv;
	struct boothc_header hdr;

	rv = list_ticket(&data, &olen);
	if (rv < 0)
		return rv;

	init_header(&hdr, CMR_LIST, RLT_SUCCESS, sizeof(hdr) + olen);

	/* NOTE(review): "data" is heap memory from list_ticket() and is not
	 * freed here - confirm whether send_header_plus() takes ownership;
	 * otherwise this leaks one buffer per request. */
	return send_header_plus(fd, &hdr, data, olen);
}
/** Answers a client's "grant" request.
 *
 * Replies RLT_INVALID_ARG for an unknown ticket, RLT_OVERGRANT if the
 * ticket already has a leader, and otherwise the result of
 * do_grant_ticket() - with a zero result reported as RLT_ASYNC.
 *
 * @return result of send_ticket_msg().
 */
int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg)
{
	struct ticket_config *tk;
	int rv;

	if (!check_ticket(msg->ticket.id, &tk)) {
		log_error("Client asked to grant unknown ticket");
		rv = RLT_INVALID_ARG;
	} else if (tk->leader) {
		log_error("client wants to get an (already granted!) ticket \"%s\"",
				msg->ticket.id);
		rv = RLT_OVERGRANT;
	} else {
		rv = do_grant_ticket(tk);
	}

	init_header(&msg->header, CMR_GRANT, rv ? rv : RLT_ASYNC, sizeof(*msg));
	return send_ticket_msg(fd, msg);
}
/** Answers a client's "revoke" request.
 *
 * Replies RLT_INVALID_ARG for an unknown ticket and RLT_SUCCESS for a
 * ticket without leader; otherwise revokes the ticket and replies with
 * the result of do_revoke_ticket(), a zero result being reported as
 * RLT_ASYNC.
 *
 * @return result of send_ticket_msg().
 */
int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg)
{
	int rv;
	struct ticket_config *tk;

	if (!check_ticket(msg->ticket.id, &tk)) {
		/* Was a copy of the "grant" message. */
		log_error("Client asked to revoke unknown ticket");
		rv = RLT_INVALID_ARG;
		goto reply;
	}

	if (!tk->leader) {
		log_info("client wants to revoke a free ticket \"%s\"",
				msg->ticket.id);
		/* Return a different result code? */
		rv = RLT_SUCCESS;
		goto reply;
	}

	rv = do_revoke_ticket(tk);
	if (rv == 0)
		rv = RLT_ASYNC;

reply:
	/* tk is NULL for an unknown ticket; init_ticket_msg() then
	 * sends a zeroed ticket payload. */
	init_ticket_msg(msg, CMR_REVOKE, rv, tk);
	return send_ticket_msg(fd, msg);
}
/** Builds a ticket message from the ticket's current state and
 * broadcasts it through the configured transport.
 *
 * @param tk  ticket to describe
 * @param cmd command code to send
 * @param res result code to send
 * @return result of the transport's broadcast function.
 */
int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd, cmd_result_t res)
{
	struct boothc_ticket_msg out;

	init_ticket_msg(&out, cmd, res, tk);

	log_debug("broadcasting '%s' for ticket \"%s\"",
			state_to_string(cmd), tk->name);

	return transport()->broadcast(&out, sizeof(out));
}
#if 0
/** Send new state request to all sites.
* Perhaps this should take a flag for ACCEPTOR etc.?
* No need currently, as all nodes are more or less identical. */
int ticket_broadcast_proposed_state(struct ticket_config *tk, cmd_request_t state)
{
struct boothc_ticket_msg msg;
tk->state = state;
init_ticket_msg(&msg, state, RLT_SUCCESS, tk);
msg.ticket.leader = htonl(get_node_id(tk->proposed_owner));
log_debug("broadcasting '%s' for ticket \"%s\"",
state_to_string(state), tk->name);
/* Switch state after one second, if the majority says ok. */
gettimeofday(&tk->proposal_switch, NULL);
tk->proposal_switch.tv_sec++;
return transport()->broadcast(&msg, sizeof(msg));
}
#endif
/** Periodic per-ticket work, run when the ticket's cron time is due.
 *
 * First handles loss of an expired ticket (in every state), then does
 * the state specific work: followers and candidates start a new
 * election when their deadline has passed; the leader accounts for
 * unanswered heartbeats, extends its term and sends the next heartbeat.
 */
static void ticket_cron(struct ticket_config *tk)
{
	time_t now;

	now = time(NULL);

	R(tk);
	/* Has an owner, has an expiry date, and expiry date in the past?
	 * Losing the ticket must happen in _every_ state. */
	if (tk->term_expires &&
			tk->leader &&
			now > tk->term_expires) {
		log_info("LOST ticket: \"%s\" no longer at %s",
				tk->name,
				ticket_leader_string(tk));

		/* Couldn't renew in time - ticket lost. */
		disown_ticket(tk);

		/* New vote round; §5.2 */
		if (local->type == SITE)
			new_election(tk, NULL);

		/* should be "always" that way
		   else
		   tk->state = ST_FOLLOWER;
		   */

		// abort_proposal(tk); TODO
		ticket_write(tk);

		ticket_activate_timeout(tk);
		/* May not try to re-acquire now, need to find out
		 * what others think. */
		return;
	}
	R(tk);

	switch(tk->state) {
	case ST_INIT:
		/* Unknown state, ask others. */
		// ticket_send_catchup(tk);
		break;

	case ST_FOLLOWER:
		if (tk->term_expires &&
				now > tk->term_expires) {
			new_election(tk, NULL);
		}
		break;

	case ST_CANDIDATE:
		/* §5.2 */
		if (now > tk->election_end)
			new_election(tk, NULL);
		break;

	case ST_LEADER:
		/* The previous heartbeat counts as failed only if its
		 * timeout has really passed (the comparison used to be
		 * inverted) and no majority acknowledged it. */
		if (now >= tk->hb_sent_at + tk->timeout &&
				!majority_of_bits(tk, tk->hb_received)) {
			/* Heartbeat timeout reached. Oops ... */
			tk->retry_number ++;
			log_error("Not enough answers to heartbeat on try #%d: "
					"only got %d answers (mask 0x%" PRIx64 ")!",
					tk->retry_number,
					count_bits(tk->hb_received),
					tk->hb_received);

			/* Don't give up, though - there's still some time until leadership is lost. */
		}

		tk->term_expires = now + tk->term_duration;
		send_heartbeat(tk);
		ticket_write(tk);
		/* Look again once the heartbeat timeout has passed. */
		ticket_activate_timeout(tk);
		break;

	default:
		break;
	}
	R(tk);
}
/** Runs ticket_cron() for every ticket whose next_cron time is due.
 *
 * Before the handler runs, the next wakeup is set via
 * set_ticket_wakeup(); the handler may override it.
 */
void process_tickets(void)
{
	struct timeval now;
	struct ticket_config *tk;
	float sec_until;
	int idx;

	gettimeofday(&now, NULL);

	foreach_ticket(idx, tk) {
		sec_until = timeval_to_float(tk->next_cron) - timeval_to_float(now);

		/* Disabled debugging aid. */
		if (0)
			log_debug("ticket %s next cron %" PRIx64 ".%03d, "
					"now %" PRIx64 "%03d, in %f",
					tk->name,
					(uint64_t)tk->next_cron.tv_sec, timeval_msec(tk->next_cron),
					(uint64_t)now.tv_sec, timeval_msec(now),
					sec_until);

		if (!(sec_until > 0.0)) {
			log_debug("ticket cron: doing %s", tk->name);

			/* Default for the next run. The state logic should
			 * take care of it already; the renew repetition is
			 * duplicated here to be on the safe side. */
			set_ticket_wakeup(tk);
			ticket_cron(tk);
		}
	}
}
/** Logs one info line per ticket: name, state, commit index, leader
 * and expiry time (in ctime() format). */
void tickets_log_info(void)
{
	struct ticket_config *tk;
	int idx;

	foreach_ticket(idx, tk) {
		log_info("Ticket %s: state '%s' "
				"commit index %d "
				"leader \"%s\" "
				"expires %-24.24s",
				tk->name,
				state_to_string(tk->state),
				tk->commit_index,
				ticket_leader_string(tk),
				ctime(&tk->term_expires));
	}
}
/* UDP message receiver. */
/** Validates a received ticket message and hands it to raft_answer().
 *
 * @param msg    received message
 * @param msglen number of bytes received
 * @return -1 for a malformed message or unknown sender, -EINVAL for an
 *         unknown ticket or leader ID, otherwise the result of
 *         raft_answer().
 */
int message_recv(struct boothc_ticket_msg *msg, int msglen)
{
	int rv;
	uint32_t from;
	struct booth_site *source;
	struct ticket_config *tk;
	struct booth_site *leader;
	uint32_t leader_u;

	if (check_boothc_header(&msg->header, sizeof(*msg)) < 0 ||
			msglen != sizeof(*msg)) {
		log_error("message receive error");
		return -1;
	}

	from = ntohl(msg->header.from);
	if (!find_site_by_id(from, &source) || !source) {
		log_error("unknown sender: %08x", from);
		return -1;
	}

	if (!check_ticket(msg->ticket.id, &tk)) {
		/* check_ticket() also fails for a name without NUL
		 * terminator, so the output has to be bounded. */
		log_error("got invalid ticket name \"%.*s\" from %s",
				(int)sizeof(msg->ticket.id), msg->ticket.id,
				source->addr_string);
		return -EINVAL;
	}

	leader_u = ntohl(msg->ticket.leader);
	if (!find_site_by_id(leader_u, &leader)) {
		log_error("Message with unknown owner %x received", leader_u);
		return -EINVAL;
	}

	rv = raft_answer(tk, source, leader, msg);
#if 0
	cmd = ntohl(msg->header.cmd);
	switch (cmd) {
	case CMD_CATCHUP:
		return ticket_answer_catchup(tk, source, msg, ballot, new_owner_p);
	case CMR_CATCHUP:
		return ticket_process_catchup(tk, source, msg, ballot, new_owner_p);
	default:
		/* only used in catchup, and not even really there ?? */
		assert(ntohl(msg->header.result) == 0);
		rv = raft_answer(tk, source, msg);
		// TODO assert((tk->proposal_acknowledges & ~booth_conf->site_bits) == 0);
		return rv;
	}
#endif
	return rv;
}
/* Schedules the next cron run for the ticket, depending on its state:
 * an hourly default first, then for a leader the renewal time (at the
 * earliest one second from now), for a candidate the end of the
 * election, and for followers on a SITE the term expiry plus
 * acquire_after if there is (or should be) an owner. */
void set_ticket_wakeup(struct ticket_config *tk)
{
struct timeval tv, now;
/* At least every hour, perhaps sooner. */
ticket_next_cron_in(tk, 3600);
switch (tk->state) {
case ST_LEADER:
assert(tk->leader == local);
gettimeofday(&now, NULL);
/* Keep the microseconds of "now"; only the seconds are replaced. */
tv = now;
tv.tv_sec = next_vote_starts_at(tk);
/* If timestamp is in the past, look again in one second. */
if (timeval_compare(tv, now) <= 0)
tv.tv_sec = now.tv_sec + 1;
ticket_next_cron_at(tk, tv);
break;
case ST_CANDIDATE:
assert(tk->election_end);
ticket_next_cron_at_coarse(tk, tk->election_end);
break;
case ST_INIT:
case ST_FOLLOWER:
/* If there is (or should be) some owner, check on her later on.
* If no one is interested - don't care. */
if ((tk->leader || tk->acquire_after) &&
(local->type == SITE))
ticket_next_cron_at_coarse(tk,
tk->term_expires + tk->acquire_after);
break;
default:
log_error("why here?");
}
}
/* Given a state (in host byte order), return a human-readable (char*).
* An array is used so that multiple states can be printed in a single printf().
*
* The value is converted with htonl() and its bytes are returned as a
* string from a rotating set of 6 static buffers; a result is therefore
* overwritten after 6 further calls. */
char *state_to_string(uint32_t state_ho)
{
/* c[] overlays the bytes of s; c[4] is the terminator. */
union mu { cmd_request_t s; char c[5]; };
static union mu cache[6] = { { 0 } }, *cur;
static int current = 0;
/* Advance to the next buffer, wrapping around. */
current ++;
if (current >= sizeof(cache)/sizeof(cache[0]))
current = 0;
cur = cache + current;
cur->s = htonl(state_ho);
/* Shouldn't be necessary, union array is initialized with zeroes, and
* these bytes never get written. */
cur->c[4] = 0;
return cur->c;
}
/** Sends an OP_REJECTED message with the given result code, describing
 * the ticket's current state, to a single site.
 *
 * @return result of booth_udp_send().
 */
int send_reject(struct booth_site *dest, struct ticket_config *tk, cmd_result_t code)
{
	struct boothc_ticket_msg reject;

	init_ticket_msg(&reject, OP_REJECTED, code, tk);

	return booth_udp_send(dest, &reject, sizeof(reject));
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Mon, Feb 24, 10:46 PM (14 h, 29 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1462415
Default Alt Text
(37 KB)
Attached To
Mode
rB Booth
Attached
Detach File
Event Timeline
Log In to Comment