Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F2823042
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
29 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/src/raft.c b/src/raft.c
index f9bd2fb..7179832 100644
--- a/src/raft.c
+++ b/src/raft.c
@@ -1,512 +1,514 @@
/*
* Copyright (C) 2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <inttypes.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include "booth.h"
#include "transport.h"
#include "inline-fn.h"
#include "config.h"
#include "raft.h"
#include "ticket.h"
#include "log.h"
inline static void clear_election(struct ticket_config *tk)
{
int i;
struct booth_site *site;
log_info("clear election");
tk->votes_received = 0;
foreach_node(i, site)
tk->votes_for[site->index] = NULL;
}
/* Record that site `who` cast its vote for `vote`.
 * Only the first vote of a site is stored (and its bit set in
 * votes_received); a later, different vote is merely reported as an error. */
inline static void site_voted_for(struct ticket_config *tk,
		struct booth_site *who,
		struct booth_site *vote)
{
	struct booth_site *previous;

	log_info("site \"%s\" votes for \"%s\"",
			site_string(who),
			site_string(vote));

	previous = tk->votes_for[who->index];
	if (previous == NULL) {
		/* First vote seen from this site. */
		tk->votes_for[who->index] = vote;
		tk->votes_received |= who->bitmask;
		return;
	}

	if (previous != vote)
		log_error("voted previously (but in same term!) for \"%s\"...",
				previous->addr_string);
}
/* Switch the ticket into follower state.
 * The term expiry is set to now plus the local term duration, capped by the
 * validity announced in `msg` when a message is given. With a message, the
 * local term and commit index are also raised to the values it carries (they
 * are never lowered). Finally the ticket state is written out via
 * ticket_write(). `msg` may be NULL. */
static void become_follower(struct ticket_config *tk,
		struct boothc_ticket_msg *msg)
{
	int duration = tk->term_duration;

	tk->state = ST_FOLLOWER;

	if (msg) {
		uint32_t remote;

		duration = min(duration, ntohl(msg->ticket.term_valid_for));

		remote = ntohl(msg->ticket.term);
		tk->current_term = max(remote, tk->current_term);

		/* § 5.3 */
		remote = ntohl(msg->ticket.leader_commit);
		tk->commit_index = max(remote, tk->commit_index);
	}

	tk->term_expires = time(NULL) + duration;

	ticket_write(tk);
}
/* Tally the votes stored in tk->votes_for[].
 * Returns the first site whose vote count exceeds half of the configured
 * sites, or NULL when nobody has a majority (yet). */
static struct booth_site *majority_votes(struct ticket_config *tk)
{
	int tally[MAX_NODES] = { 0, };
	int voter;

	for (voter = 0; voter < booth_conf->site_count; voter++) {
		struct booth_site *choice = tk->votes_for[voter];
		int idx;

		if (choice == NULL)
			continue;

		idx = choice->index;
		tally[idx]++;

		log_info("Majority: %d \"%s\" wants %d \"%s\" => %d",
				voter, booth_conf->site[voter].addr_string,
				idx, choice->addr_string,
				tally[idx]);

		/* Strictly more than half of all sites are needed. */
		if (tally[idx] * 2 > booth_conf->site_count) {
			log_info("Majority reached: %d of %d for \"%s\"",
					tally[idx], booth_conf->site_count,
					choice->addr_string);
			return choice;
		}
	}

	return NULL;
}
/* Check whether `msg` carries a term higher than the local one.
 * If so, adopt that term, fall back to follower state with `leader` as the
 * ticket's leader, and return 1. Otherwise leave the ticket untouched and
 * return 0. `sender` is not used here. */
static int newer_term(struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg)
{
	uint32_t term = ntohl(msg->ticket.term);

	/* §5.1 */
	if (term <= tk->current_term)
		return 0;

	tk->state = ST_FOLLOWER;
	tk->leader = leader;
	/* Logged before the update so both the new and the old term show up. */
	log_info("higher term %d vs. %d, following \"%s\"",
			term, tk->current_term,
			ticket_leader_string(tk));
	tk->current_term = term;

	return 1;
}
/* Reject messages from an outdated term.
 * When the term in `msg` is lower than the local one, an
 * OP_REJECTED/RLT_TERM_OUTDATED reply is sent to `sender` and 1 is returned;
 * otherwise nothing happens and 0 is returned. `leader` is not used here. */
static int term_too_low(struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg)
{
	uint32_t term = ntohl(msg->ticket.term);

	/* §5.1 */
	if (term >= tk->current_term)
		return 0;

	log_info("sending REJECT, term too low.");
	send_reject(sender, tk, RLT_TERM_OUTDATED);
	return 1;
}
/* For follower. */
/* Handle an OP_HEARTBEAT; raft_answer() dispatches here when the local node
 * is not the leader and the ticket is in ST_FOLLOWER.
 * A heartbeat with a term lower than ours is dropped without a reject.
 * Otherwise the follower state is refreshed from the message, the announced
 * leader is recorded, and an OP_HEARTBEAT/RLT_SUCCESS reply is sent back to
 * the sender.
 * Returns 0 when the message is ignored, else the booth_udp_send() result. */
static int answer_HEARTBEAT (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t term;
struct boothc_ticket_msg omsg;
-
term = ntohl(msg->ticket.term);
log_debug("leader: %s, have %s; term %d vs %d",
site_string(leader), ticket_leader_string(tk),
term, tk->current_term);
/* No reject. (?) */
- if (term < tk->current_term)
+ if (term < tk->current_term) {
+ log_info("ignoring lower term %d vs. %d, from \"%s\"",
+ term, tk->current_term,
+ ticket_leader_string(tk));
return 0;
+ }
/* Needed? */
/* newer_term() adopts a higher term; become_follower() refreshes the
 * term expiry from the message and writes the ticket. */
newer_term(tk, sender, leader, msg);
become_follower(tk, msg);
/* Racy??? */
assert(sender == leader || !leader);
tk->leader = leader;
+ /* why do we need this? */
ticket_activate_timeout(tk);
- /* Yeth, mathter. */
+ /* Ack the heartbeat (we comply). */
init_ticket_msg(&omsg, OP_HEARTBEAT, RLT_SUCCESS, tk);
return booth_udp_send(sender, &omsg, sizeof(omsg));
}
/* For leader. */
/* Handle an OP_HEARTBEAT; raft_answer() dispatches here when the local node
 * is the leader and the ticket is in ST_LEADER.
 * Messages with a higher or a lower term are only logged. A message with our
 * own term and our own leader counts as an acknowledgement: the sender's bit
 * is recorded in hb_received, and once a majority has answered the next
 * wakeup is scheduled and the retry counter reset.
 * Always returns 0. */
static int process_HEARTBEAT(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t term;
/* Note: newer_term() has already switched tk to ST_FOLLOWER and adopted the
 * higher term by the time it returns non-zero. */
if (newer_term(tk, sender, leader, msg)) {
/* Uh oh. Higher term?? Should we simply believe that? */
log_error("Got higher term number from");
return 0;
}
term = ntohl(msg->ticket.term);
/* Don't send a reject. */
if (term < tk->current_term) {
/* Doesn't know what he's talking about - perhaps
* doesn't receive our packets? */
log_error("Stale/wrong heartbeat from \"%s\": "
"term %d instead of %d",
site_string(sender),
term, tk->current_term);
return 0;
}
if (term == tk->current_term &&
leader == tk->leader) {
/* Hooray, an ACK! */
/* So at least _someone_ is listening. */
tk->hb_received |= sender->bitmask;
log_debug("Got heartbeat ACK from \"%s\", %d/%d agree.",
site_string(sender),
count_bits(tk->hb_received),
booth_conf->site_count);
if (majority_of_bits(tk, tk->hb_received)) {
/* OK, at least half of the nodes are reachable;
* no need to do anything until
* the next heartbeat should be sent. */
set_ticket_wakeup(tk);
tk->retry_number = 0;
/* Only the first majority of a heartbeat round extends the term and
 * writes the ticket. */
if( !tk->majority_acks_received ) {
tk->majority_acks_received = 1;
+ tk->term_expires = time(NULL) + tk->term_duration;
ticket_write(tk);
}
- } else {
- /* Not enough answers yet;
- * wait until timeout expires. */
- ticket_activate_timeout(tk);
}
}
return 0;
}
/* Handle an OP_VOTE_FOR message: `sender` announces a vote for `leader`.
 * Votes from an outdated term are rejected via term_too_low(); votes for the
 * current term that arrive after the election end are ignored. A higher term
 * resets the local vote table first. Once some site holds a majority it
 * becomes the ticket's leader - the local node either starts sending
 * heartbeats or turns follower.
 * Always returns 0. */
static int process_VOTE_FOR(
		struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg
		)
{
	struct booth_site *winner;
	uint32_t term;

	term = ntohl(msg->ticket.term);

	if (term_too_low(tk, sender, leader, msg))
		return 0;

	/* Election already ended - either by time or majority. Ignore. */
	if (term == tk->current_term &&
			tk->election_end < time(NULL))
		return 0;

	if (newer_term(tk, sender, leader, msg))
		clear_election(tk);

	site_voted_for(tk, sender, leader);

	/* §5.2 */
	winner = majority_votes(tk);
	if (winner != NULL) {
		tk->leader = winner;
		tk->term_expires = time(NULL) + tk->term_duration;
		tk->election_end = 0;
		tk->voted_for = NULL;

		if (winner != local) {
			become_follower(tk, NULL);
		} else {
			tk->commit_index++; // ??
			tk->state = ST_LEADER;
			send_heartbeat(tk);
		}
	}

	set_ticket_wakeup(tk);
	return 0;
}
/* Handle an OP_REJECTED message.
 * Whatever the reason, the local node ends up following `leader`; only the
 * log line differs between "term outdated", "term still valid" (both while
 * candidate) and any other combination.
 * Always returns 0. */
static int process_REJECTED(
		struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg
		)
{
	uint32_t reason = ntohl(msg->header.result);
	int candidate = (tk->state == ST_CANDIDATE);

	if (candidate && reason == RLT_TERM_OUTDATED) {
		log_info("Am out of date, become follower.");
	} else if (candidate && reason == RLT_TERM_STILL_VALID) {
		log_error("There's a leader that I don't see: \"%s\"",
				site_string(leader));
	} else {
		log_error("unhandled reject: in state %s, got %s.",
				state_to_string(tk->state),
				state_to_string(reason));
	}

	tk->leader = leader;
	become_follower(tk, msg);
	return 0;
}
/* §5.2 */
/* Handle an OP_REQ_VOTE message from `sender`.
 * - An outdated term is rejected via term_too_low().
 * - A higher term is adopted and the vote goes to the sender.
 * - Otherwise, while the current term is still valid, a repeated request
 *   from the current leader is dropped and anything else is rejected with
 *   RLT_TERM_STILL_VALID.
 * - With no valid term the vote goes to the sender unless a vote was
 *   already cast.
 * In the voting cases the (possibly earlier) choice in tk->voted_for is
 * broadcast as OP_VOTE_FOR, and the broadcast result is returned. */
static int answer_REQ_VOTE(
		struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg
		)
{
	struct boothc_ticket_msg omsg;
	int grant_vote;

	if (term_too_low(tk, sender, leader, msg))
		return 0;

	grant_vote = newer_term(tk, sender, leader, msg);
	if (!grant_vote) {
		uint32_t term = ntohl(msg->ticket.term);
		int valid = term_valid_for(tk);

		if (valid) {
			/* Important: Ignore duplicated packets! */
			if (term == tk->current_term &&
					sender == tk->leader) {
				log_debug("Duplicate OP_VOTE_FOR ignored.");
				return 0;
			}

			log_debug("no election allowed, term valid for %d??", valid);
			return send_reject(sender, tk, RLT_TERM_STILL_VALID);
		}

		/* §5.2, §5.4 */
		grant_vote = !tk->voted_for;
	}

	if (grant_vote) {
		tk->voted_for = sender;
		site_voted_for(tk, sender, leader);
	}

	init_ticket_msg(&omsg, OP_VOTE_FOR, RLT_SUCCESS, tk);
	omsg.ticket.leader = htonl(get_node_id(tk->voted_for));
	return transport()->broadcast(&omsg, sizeof(omsg));
}
/* Start a new election round for the ticket, unless the previous round is
 * still running (now <= election_end), in which case nothing is done.
 * The local vote goes to `preference` if given, else to the local node when
 * it is a SITE, else to nobody (NULL). The ticket becomes ST_CANDIDATE and
 * an OP_REQ_VOTE is broadcast.
 * Always returns 0. */
int new_election(struct ticket_config *tk, struct booth_site *preference)
{
struct booth_site *new_leader;
time_t now;
time(&now);
log_debug("start new election?, now=%" PRIi64 ", end %" PRIi64,
(int64_t)now, (int64_t)(tk->election_end));
if (now <= tk->election_end)
return 0;
/* §5.2 */
/* If there was _no_ answer, don't keep incrementing the term number
* indefinitely. If there was no peer, there'll probably be no one
* listening now either.
* Own vote can be disregarded.
* Not entirely correct? After startup the term should be incremented
* once, to speed up becoming a leader?
* Perhaps only increment once, and then try to rebuild with the same
* term number? With 5 nodes the 2 node partition would still increment
* endlessly. */
if (count_bits(tk->votes_received) > 1)
tk->current_term++;
/* The election window lasts one term duration from now. */
tk->term_expires = 0;
tk->election_end = now + tk->term_duration;
log_debug("start new election! term=%d, until %" PRIi64,
tk->current_term, (int64_t)tk->election_end);
clear_election(tk);
if(preference)
new_leader = preference;
else
new_leader = (local->type == SITE) ? local : NULL;
/* Record our own vote before asking the others. */
site_voted_for(tk, local, new_leader);
tk->voted_for = new_leader;
tk->state = ST_CANDIDATE;
ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS);
+ ticket_activate_timeout(tk);
return 0;
}
/* Dispatch an incoming ticket message to the handler for its command.
 *
 * tk      ticket the message refers to
 * from    site that sent the message
 * leader  site named as leader/owner in the message (may be NULL)
 * msg     the message itself, fields in network byte order
 *
 * Returns the handler's result, 0 for a heartbeat that arrives in a state
 * where it cannot be handled, or -EINVAL for an unknown command. */
int raft_answer(
		struct ticket_config *tk,
		struct booth_site *from,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg
	       )
{
	int cmd;
	int rv;

	rv = 0;
	cmd = ntohl(msg->header.cmd);

	R(tk);

	log_debug("got message %s from \"%s\"",
			state_to_string(cmd),
			from->addr_string);

	switch (cmd) {
	case OP_REQ_VOTE:
		rv = answer_REQ_VOTE(tk, from, leader, msg);
		break;
	case OP_VOTE_FOR:
		rv = process_VOTE_FOR(tk, from, leader, msg);
		break;
	case OP_HEARTBEAT:
		if (tk->leader == local &&
				tk->state == ST_LEADER)
			rv = process_HEARTBEAT(tk, from, leader, msg);
		else if (tk->leader != local &&
				tk->state == ST_FOLLOWER)
			rv = answer_HEARTBEAT(tk, from, leader, msg);
		else
			/* The previous assert("...") tested a non-NULL string
			 * literal and therefore could never fire. The message
			 * is still dropped (rv stays 0), but the situation is
			 * now visible in the log; aborting on network input
			 * would be worse. */
			log_error("invalid combination - leader, follower: "
					"heartbeat from \"%s\" in state %s, leader \"%s\"",
					from->addr_string,
					state_to_string(tk->state),
					ticket_leader_string(tk));
		break;
	case OP_REJECTED:
		rv = process_REJECTED(tk, from, leader, msg);
		break;
	default:
		log_error("unprocessed message, cmd %x", cmd);
		rv = -EINVAL;
		break;
	}

	R(tk);
	return rv;
}
diff --git a/src/ticket.c b/src/ticket.c
index 6b8054f..c66efa2 100644
--- a/src/ticket.c
+++ b/src/ticket.c
@@ -1,646 +1,654 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include <inttypes.h>
#include <stdio.h>
#include <assert.h>
#include <time.h>
#include "ticket.h"
#include "config.h"
#include "pacemaker.h"
#include "inline-fn.h"
#include "log.h"
#include "booth.h"
#include "raft.h"
#include "handler.h"
#define TK_LINE 256
/* Untrusted input, must fit (incl. \0) in a buffer of max chars.
 *
 * Returns 1 when a terminating NUL is found within the first `max` bytes of
 * `s`, otherwise 0. A NULL pointer or a non-positive `max` is treated as
 * invalid input (returns 0) instead of being dereferenced. */
int check_max_len_valid(const char *s, int max)
{
	int i;

	if (s == NULL || max <= 0)
		return 0;

	/* Never look past the first NUL, nor past `max` bytes. */
	for (i = 0; i < max; i++) {
		if (s[i] == '\0')
			return 1;
	}

	return 0;
}
/* Look up a configured ticket by its exact name.
 * Returns 1 if a ticket of that name exists, else 0. When `found` is not
 * NULL it receives the matching ticket, or NULL if there is none. */
int find_ticket_by_name(const char *ticket, struct ticket_config **found)
{
	struct ticket_config *match = NULL;
	int idx;

	for (idx = 0; idx < booth_conf->ticket_count; idx++) {
		if (strcmp(booth_conf->ticket[idx].name, ticket) == 0) {
			match = booth_conf->ticket + idx;
			break;
		}
	}

	if (found)
		*found = match;

	return match != NULL;
}
/* Validate a ticket name coming from outside and look it up.
 * Returns 1 if a configuration is loaded, the name is NUL-terminated within
 * the size of a ticket name field, and a ticket of that name exists;
 * 0 otherwise. `found` (if not NULL) is set to the ticket, or to NULL. */
int check_ticket(char *ticket, struct ticket_config **found)
{
	if (found)
		*found = NULL;

	if (booth_conf &&
			check_max_len_valid(ticket, sizeof(booth_conf->ticket[0].name)))
		return find_ticket_by_name(ticket, found);

	return 0;
}
/* Validate a site name coming from outside and look it up.
 * Returns 1 and stores the site's `local` flag in *is_local when the name is
 * NUL-terminated within the size of an address string and the site is known;
 * returns 0 (leaving *is_local untouched) otherwise. */
int check_site(char *site, int *is_local)
{
	struct booth_site *node;

	/* sizeof does not evaluate its operand, so `node` may be unset here. */
	if (check_max_len_valid(site, sizeof(node->addr_string)) &&
			find_site_by_name(site, &node, 0)) {
		*is_local = node->local;
		return 1;
	}

	return 0;
}
#if 0
/* Compiled out; the only visible reference to this function is the
 * commented-out call in ticket_cron(). */
/** Find out what others think about this ticket.
*
* If we're a SITE, we can ask (and have to tell) Pacemaker.
* An ARBITRATOR can only ask others. */
static int ticket_send_catchup(struct ticket_config *tk)
{
int i, rv = 0;
struct booth_site *site;
struct boothc_ticket_msg msg;
/* Send a CMD_CATCHUP to every non-local site; rv keeps only the result of
 * the last send. */
foreach_node(i, site) {
if (!site->local) {
init_ticket_msg(&msg, CMD_CATCHUP, RLT_SUCCESS, tk);
log_debug("attempting catchup from %s", site->addr_string);
rv = booth_udp_send(site, &msg, sizeof(msg));
}
}
ticket_activate_timeout(tk);
return rv;
}
#endif
/* Push the ticket state to the local cluster manager.
 * Only SITEs do this; other node types get -EINVAL. After dropping an
 * expired owner, the ticket is granted locally when the local node is the
 * leader and revoked otherwise. Returns 0 on SITEs. */
int ticket_write(struct ticket_config *tk)
{
	if (local->type != SITE)
		return -EINVAL;

	disown_if_expired(tk);

	if (tk->leader != local)
		pcmk_handler.revoke_ticket(tk);
	else
		pcmk_handler.grant_ticket(tk);

	return 0;
}
/* Ask an external program whether getting the ticket
* makes sense.
* Eg. if the services have a failcount of INFINITY,
* we can't serve here anyway. */
/* Returns the verifier's non-zero result when it refuses, otherwise the
 * result of send_heartbeat() (already a valid leader) or of the OP_REQ_VOTE
 * broadcast (election started). Without a configured ext_verifier the check
 * is skipped. */
int get_ticket_locally_if_allowed(struct ticket_config *tk)
{
int rv;
if (!tk->ext_verifier)
goto get_it;
rv = run_handler(tk, tk->ext_verifier, 1);
if (rv) {
log_error("May not acquire ticket.");
/* Give it to somebody else.
* Just send a commit message, as the
* others couldn't help anyway. */
if (leader_and_valid(tk)) {
disown_ticket(tk);
#if 0
tk->proposed_owner = NULL;
/* Just go one further - others may easily override. */
tk->new_ballot++;
ticket_broadcast_proposed_state(tk, OP_COMMITTED);
tk->state = ST_STABLE;
#endif
ticket_broadcast(tk, OP_VOTE_FOR, RLT_SUCCESS);
}
return rv;
} else {
log_info("May get/keep ticket.");
}
get_it:
if (leader_and_valid(tk)) {
return send_heartbeat(tk);
} else {
/* Ticket should now become active locally, wasn't before. */
/* NOTE(review): new_election() itself broadcasts OP_REQ_VOTE when it
 * actually starts a round, so the broadcast below may send a second
 * request - confirm this is intended. */
new_election(tk, local);
return ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS);
}
}
/** Try to get the ticket for the local site.
 *
 * Returns RLT_SUCCESS if the local node already is the leader,
 * RLT_OVERGRANT if the ticket is owned by someone else, and otherwise the
 * result of get_ticket_locally_if_allowed(). */
int do_grant_ticket(struct ticket_config *tk)
{
	if (tk->leader == local)
		return RLT_SUCCESS;

	if (is_owned(tk))
		return RLT_OVERGRANT;

	return get_ticket_locally_if_allowed(tk);
}
/** Revoke round.
 * That can be started from any site.
 *
 * A ticket without owner is left alone (RLT_SUCCESS). Otherwise the ticket
 * is disowned locally, the local vote is set to "no leader", and a vote
 * round with an incremented term is broadcast; the broadcast result is
 * returned. */
int do_revoke_ticket(struct ticket_config *tk)
{
	if (is_owned(tk)) {
		disown_ticket(tk);
		tk->voted_for = no_leader;

		// ticket_write(tk); only when majority wants that? or, if tk->leader was == local, in every case, because the ticket shouldn't be here anymore?
		/* 1) lose ticket
		 * 2) if majority is available, "none" gets it
		 * 3) if majority not available, they might have voted for
		 *    somebody else in the meantime anyway
		 */
		tk->state = ST_FOLLOWER;

		/* Start a new vote round, with a new term number. */
		tk->current_term++;
		return ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS);
	}

	return RLT_SUCCESS;
}
/* Build a human-readable listing of all configured tickets.
 *
 * pdata  receives a malloc()ed, NUL-terminated buffer; the caller owns it
 *        and has to free() it. Set to NULL on failure.
 * len    receives the number of bytes in the listing (without the NUL).
 *
 * Returns 0 on success, -ENOMEM if the buffer cannot be allocated or the
 * listing does not fit into it. Every write is bounded by the space that is
 * left, instead of checking for an overrun after it has happened. */
int list_ticket(char **pdata, unsigned int *len)
{
	struct ticket_config *tk;
	char timeout_str[64];
	char *data;
	size_t alloc, used;
	int i, n;

	*pdata = NULL;
	*len = 0;

	alloc = 256 +
		(size_t)booth_conf->ticket_count * (BOOTH_NAME_LEN * 2 + 128);
	data = malloc(alloc);
	if (!data)
		return -ENOMEM;

	/* Keep the result a valid string even with zero tickets. */
	data[0] = '\0';
	used = 0;

	foreach_ticket(i, tk) {
		if (tk->term_expires != 0)
			strftime(timeout_str, sizeof(timeout_str), "%F %T",
					localtime(&tk->term_expires));
		else
			strcpy(timeout_str, "INF");

		n = snprintf(data + used, alloc - used,
				"ticket: %s, leader: %s, expires: %s, commit: %d\n",
				tk->name,
				ticket_leader_string(tk),
				timeout_str,
				tk->commit_index);
		if (n < 0 || (size_t)n >= alloc - used) {
			/* Truncated or failed: report it, don't overrun. */
			free(data);
			return -ENOMEM;
		}

		used += (size_t)n;
	}

	*len = (unsigned int)used;
	*pdata = data;

	return 0;
}
int setup_ticket(void)
{
struct ticket_config *tk;
int i;
foreach_ticket(i, tk) {
tk->leader = NULL;
tk->term_expires = 0;
// abort_proposal(tk);
if (local->type == SITE) {
pcmk_handler.load_ticket(tk);
}
/* We'll start the election immediately if the ticket
* belonged to us */
/* tk->term_expires = time(NULL) + tk->term_duration;*/
tk->state = ST_FOLLOWER;
/* TODO: send query packet to see sooner who's online. */
}
return 0;
}
/* Answer a client's "list" request on `fd` with the ticket listing.
 * `msg` is not used. Returns a negative value if the listing cannot be
 * built, otherwise the result of send_header_plus(). */
int ticket_answer_list(int fd, struct boothc_ticket_msg *msg)
{
	char *data = NULL;
	/* list_ticket() takes an unsigned int *, so match its type. */
	unsigned int olen = 0;
	int rv;
	struct boothc_header hdr;

	rv = list_ticket(&data, &olen);
	if (rv < 0)
		return rv;

	init_header(&hdr, CMR_LIST, RLT_SUCCESS, sizeof(hdr) + olen);
	rv = send_header_plus(fd, &hdr, data, olen);

	/* The listing was malloc()ed by list_ticket() and used to be leaked.
	 * NOTE(review): assumes send_header_plus() has finished with `data`
	 * when it returns - confirm against its implementation. */
	free(data);

	return rv;
}
/* Answer a client's "grant" request on `fd`.
 * Unknown tickets yield RLT_INVALID_ARG, already owned ones RLT_OVERGRANT;
 * otherwise the grant is attempted. A zero result is reported to the client
 * as RLT_ASYNC. Returns the result of send_ticket_msg(). */
int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg)
{
	struct ticket_config *tk;
	int rv;

	if (!check_ticket(msg->ticket.id, &tk)) {
		log_error("Client asked to grant unknown ticket");
		rv = RLT_INVALID_ARG;
	} else if (is_owned(tk)) {
		log_error("client wants to get an (already granted!) ticket \"%s\"",
				msg->ticket.id);
		rv = RLT_OVERGRANT;
	} else {
		rv = do_grant_ticket(tk);
	}

	if (!rv)
		rv = RLT_ASYNC;

	init_header(&msg->header, CMR_GRANT, rv, sizeof(*msg));
	return send_ticket_msg(fd, msg);
}
/* Answer a client's "revoke" request on `fd`.
 * Unknown tickets yield RLT_INVALID_ARG; revoking a ticket nobody owns
 * succeeds immediately (RLT_SUCCESS). Otherwise a revoke round is started
 * and a zero result is reported to the client as RLT_ASYNC.
 * Returns the result of send_ticket_msg(). */
int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg)
{
	int rv;
	struct ticket_config *tk;

	if (!check_ticket(msg->ticket.id, &tk)) {
		/* This message used to say "grant" (copied from the grant
		 * handler), which is misleading in the log. */
		log_error("Client asked to revoke unknown ticket");
		rv = RLT_INVALID_ARG;
		goto reply;
	}

	if (!is_owned(tk)) {
		log_info("client wants to revoke a free ticket \"%s\"",
				msg->ticket.id);
		/* Return a different result code? */
		rv = RLT_SUCCESS;
		goto reply;
	}

	rv = do_revoke_ticket(tk);
	if (rv == 0)
		rv = RLT_ASYNC;

reply:
	/* NOTE(review): tk is NULL here when the ticket is unknown; this
	 * relies on init_ticket_msg() accepting a NULL ticket - confirm. */
	init_ticket_msg(msg, CMR_REVOKE, rv, tk);
	return send_ticket_msg(fd, msg);
}
/* Build a ticket message with command `cmd` and result `res` for `tk` and
 * hand it to the transport's broadcast function, whose result is returned. */
int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd, cmd_result_t res)
{
	struct boothc_ticket_msg out;

	init_ticket_msg(&out, cmd, res, tk);

	log_debug("broadcasting '%s' for ticket \"%s\"",
			state_to_string(cmd), tk->name);

	return transport()->broadcast(&out, sizeof(out));
}
/* Periodic per-ticket handler, called from process_tickets() once the
 * ticket's next_cron time has been reached.
 * An owned ticket whose term has expired is given up first, regardless of
 * state; otherwise the action depends on tk->state (see the switch below). */
static void ticket_cron(struct ticket_config *tk)
{
time_t now;
int rv;
now = time(NULL);
R(tk);
/* Has an owner, has an expiry date, and expiry date in the past?
* Losing the ticket must happen in _every_ state. */
if (tk->term_expires &&
is_owned(tk) &&
now > tk->term_expires) {
log_info("LOST ticket: \"%s\" no longer at %s",
tk->name,
ticket_leader_string(tk));
/* Couldn't renew in time - ticket lost. */
disown_ticket(tk);
/* New vote round; §5.2 */
if (local->type == SITE)
new_election(tk, NULL);
/* should be "always" that way
else
tk->state = ST_FOLLOWER;
*/
// abort_proposal(tk); TODO
ticket_write(tk);
- ticket_activate_timeout(tk);
-
/* May not try to re-acquire now, need to find out
* what others think. */
return;
}
R(tk);
switch(tk->state) {
case ST_INIT:
/* Unknown state, ask others. */
// ticket_send_catchup(tk);
break;
case ST_FOLLOWER:
+ /* in case we got restarted and this ticket belongs to
+ * us */
if (tk->is_granted && tk->leader == local) {
tk->current_term++;
new_election(tk, NULL);
}
break;
case ST_CANDIDATE:
/* §5.2 */
/* The election window has passed without a result: start another round. */
if (now > tk->election_end)
new_election(tk, NULL);
break;
case ST_LEADER:
- if (tk->hb_sent_at + tk->timeout > now) {
- /* Heartbeat timeout reached. Oops ... */
- tk->retry_number ++;
- log_error("Not enough answers to heartbeat on try #%d: "
+ /* we get here after we broadcasted a heartbeat;
+ * by this time all sites should've acked the heartbeat
+ */
+ /* if (tk->hb_sent_at + tk->timeout <= now) { */
+ if (count_bits(tk->hb_received) < booth_conf->site_count) {
+ if (!majority_of_bits(tk, tk->hb_received)) {
+ tk->retry_number ++;
+ log_warn("not enough answers to heartbeat on try #%d: "
"only got %d answers (mask 0x%" PRIx64 ")!",
tk->retry_number,
count_bits(tk->hb_received),
tk->hb_received);
-
/* Don't give up, though - there's still some time until leadership is lost. */
+ } else {
+ log_warn("some sites not acked heartbeat on try #%d: "
+ "only got %d answers (mask 0x%" PRIx64 ")!",
+ tk->retry_number,
+ count_bits(tk->hb_received),
+ tk->hb_received);
+ }
}
/* Re-run the external verifier; a non-zero result makes us give the
 * ticket up and ask for a new vote, otherwise the next heartbeat goes out. */
rv = run_handler(tk, tk->ext_verifier, 1);
if (rv) {
tk->state = ST_FOLLOWER;
- tk->leader= NULL;
+ disown_ticket(tk);
// resp. no owner anymore, new takers?
ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS);
ticket_write(tk);
} else {
- tk->term_expires = now + tk->term_duration;
send_heartbeat(tk);
- // ticket_write(tk); // not correct here -- no acks received yet
+ ticket_activate_timeout(tk);
}
- ticket_activate_timeout(tk);
break;
default:
break;
}
R(tk);
}
void process_tickets(void)
{
struct ticket_config *tk;
int i;
struct timeval now;
float sec_until;
gettimeofday(&now, NULL);
foreach_ticket(i, tk) {
sec_until = timeval_to_float(tk->next_cron) - timeval_to_float(now);
if (0)
log_debug("ticket %s next cron %" PRIx64 ".%03d, "
"now %" PRIx64 "%03d, in %f",
tk->name,
(uint64_t)tk->next_cron.tv_sec, timeval_msec(tk->next_cron),
(uint64_t)now.tv_sec, timeval_msec(now),
sec_until);
if (sec_until > 0.0)
continue;
log_debug("ticket cron: doing %s", tk->name);
/* Set next value, handler may override.
* This should already be handled via the state logic;
* but to be on the safe side the renew repetition is
* duplicated here, too. */
set_ticket_wakeup(tk);
ticket_cron(tk);
}
}
void tickets_log_info(void)
{
struct ticket_config *tk;
int i;
foreach_ticket(i, tk) {
log_info("Ticket %s: state '%s' "
"commit index %d "
"leader \"%s\" "
"expires %-24.24s",
tk->name,
state_to_string(tk->state),
tk->commit_index,
ticket_leader_string(tk),
ctime(&tk->term_expires));
}
}
/* UDP message receiver. */
/* Validates the header and length, resolves the sending site, the ticket
 * and the site named as leader, then hands the message to raft_answer().
 * Returns -1 for a malformed message or unknown sender, -EINVAL for an
 * unknown ticket or leader id, else the result of raft_answer(). */
int message_recv(struct boothc_ticket_msg *msg, int msglen)
{
	struct booth_site *source;
	struct booth_site *leader;
	struct ticket_config *tk;
	uint32_t sender_id;
	uint32_t leader_id;

	if (check_boothc_header(&msg->header, sizeof(*msg)) < 0 ||
			msglen != sizeof(*msg)) {
		log_error("message receive error");
		return -1;
	}

	sender_id = ntohl(msg->header.from);
	if (!find_site_by_id(sender_id, &source) || !source) {
		log_error("unknown sender: %08x", sender_id);
		return -1;
	}

	if (!check_ticket(msg->ticket.id, &tk)) {
		log_error("got invalid ticket name \"%s\" from %s",
				msg->ticket.id, source->addr_string);
		return -EINVAL;
	}

	/* Unlike the sender, a NULL leader is let through here. */
	leader_id = ntohl(msg->ticket.leader);
	if (!find_site_by_id(leader_id, &leader)) {
		log_error("Message with unknown owner %x received", leader_id);
		return -EINVAL;
	}

	return raft_answer(tk, source, leader, msg);
}
/* Schedule the next cron run for the ticket.
 * The default is one hour from now; depending on the state an earlier
 * (or different) time is chosen:
 *  - leader:    when the next vote starts, but not before now + 1 second
 *  - candidate: at the end of the running election
 *  - init/follower on a SITE with an owner or an acquire_after setting:
 *               at term expiry plus acquire_after */
void set_ticket_wakeup(struct ticket_config *tk)
{
	struct timeval now;
	struct timeval when;

	/* At least every hour, perhaps sooner. */
	ticket_next_cron_in(tk, 3600);

	switch (tk->state) {
	case ST_LEADER:
		assert(tk->leader == local);

		gettimeofday(&now, NULL);
		when = now;
		when.tv_sec = next_vote_starts_at(tk);

		/* If timestamp is in the past, look again in one second. */
		if (timeval_compare(when, now) <= 0)
			when.tv_sec = now.tv_sec + 1;

		ticket_next_cron_at(tk, when);
		break;

	case ST_CANDIDATE:
		assert(tk->election_end);
		ticket_next_cron_at_coarse(tk, tk->election_end);
		break;

	case ST_INIT:
	case ST_FOLLOWER:
		/* If there is (or should be) some owner, check on her later on.
		 * If no one is interested - don't care. */
		if (is_owned(tk) || tk->acquire_after) {
			if (local->type == SITE)
				ticket_next_cron_at_coarse(tk,
						tk->term_expires + tk->acquire_after);
		}
		break;

	default:
		log_error("why here?");
	}
}
/* Given a state (in host byte order), return a human-readable (char*).
 * An array is used so that multiple states can be printed in a single printf().
 *
 * The value is stored in network byte order, so its four bytes read as a
 * 4-character string. The result points into a small static ring of buffers:
 * it is overwritten after a handful of further calls and must not be freed.
 *
 * The punning member is a uint32_t (the type htonl() returns) rather than an
 * enum type whose width is implementation-defined, and the ring index is
 * unsigned so it compares cleanly against the element count. */
char *state_to_string(uint32_t state_ho)
{
	union mu { uint32_t s; char c[5]; };
	static union mu cache[6] = { { 0 } };
	static unsigned int current = 0;
	union mu *cur;

	/* Advance to the next slot, wrapping around at the end. */
	current++;
	if (current >= sizeof(cache) / sizeof(cache[0]))
		current = 0;

	cur = cache + current;
	cur->s = htonl(state_ho);

	/* Shouldn't be necessary, union array is initialized with zeroes, and
	 * these bytes never get written. */
	cur->c[4] = 0;

	return cur->c;
}
/* Send an OP_REJECTED message with result `code` for ticket `tk` to `dest`.
 * Returns the result of booth_udp_send(). */
int send_reject(struct booth_site *dest, struct ticket_config *tk, cmd_result_t code)
{
	struct boothc_ticket_msg reply;

	init_ticket_msg(&reply, OP_REJECTED, code, tk);

	return booth_udp_send(dest, &reply, sizeof(reply));
}
diff --git a/src/ticket.h b/src/ticket.h
index e0af8b2..0b22845 100644
--- a/src/ticket.h
+++ b/src/ticket.h
@@ -1,104 +1,102 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _TICKET_H
#define _TICKET_H
#include <time.h>
#include <sys/time.h>
#include <math.h>
#include "config.h"
#define DEFAULT_TICKET_EXPIRY 600
#define DEFAULT_TICKET_TIMEOUT 10
#define DEFAULT_RETRIES 10
/* Iterate over all configured tickets / sites.
 * i_ is an integer lvalue used as index, t_ / n_ a pointer lvalue that is
 * set to the current element. The macros used to ignore i_ and hard-code a
 * variable named `i`; they now use the argument that is passed in, so
 * existing callers (which all pass `i`) behave as before. The pointer is
 * assigned before the bound is checked, so it points one past the last
 * element after the loop and must not be dereferenced then. */
#define foreach_ticket(i_,t_) for ((i_) = 0; ((t_) = booth_conf->ticket + (i_), (i_) < booth_conf->ticket_count); (i_)++)
#define foreach_node(i_,n_) for ((i_) = 0; ((n_) = booth_conf->site + (i_), (i_) < booth_conf->site_count); (i_)++)
/* Public interface of ticket.c. */

/* Validation of names received from outside. */
int check_ticket(char *ticket, struct ticket_config **tc);
int check_site(char *site, int *local);
int do_grant_ticket(struct ticket_config *ticket);
/* NOTE(review): no definition of revoke_ticket() is visible in ticket.c
 * (only do_revoke_ticket()) - possibly a stale declaration; confirm. */
int revoke_ticket(struct ticket_config *ticket);
int list_ticket(char **pdata, unsigned int *len);
int message_recv(struct boothc_ticket_msg *msg, int msglen);
int setup_ticket(void);
int check_max_len_valid(const char *s, int max);
/* do_grant_ticket() is declared a second time here; harmless duplicate. */
int do_grant_ticket(struct ticket_config *tk);
int do_revoke_ticket(struct ticket_config *tk);
int find_ticket_by_name(const char *ticket, struct ticket_config **found);
void set_ticket_wakeup(struct ticket_config *tk);
int get_ticket_locally_if_allowed(struct ticket_config *tk);
/* Handlers for client requests arriving on a connected descriptor. */
int ticket_answer_list(int fd, struct boothc_ticket_msg *msg);
int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg);
int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg);
/* NOTE(review): ticket_broadcast_proposed_state() is only referenced from
 * code under #if 0 in ticket.c - possibly a stale declaration; confirm. */
int ticket_broadcast_proposed_state(struct ticket_config *tk, cmd_request_t state);
int ticket_write(struct ticket_config *tk);
void process_tickets(void);
void tickets_log_info(void);
char *state_to_string(uint32_t state_ho);
int send_reject(struct booth_site *dest, struct ticket_config *tk, cmd_result_t code);
int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd, cmd_result_t res);
/* Schedule the ticket's next cron run at the absolute time `when`. */
static inline void ticket_next_cron_at(struct ticket_config *tk, struct timeval when)
{
	tk->next_cron.tv_sec = when.tv_sec;
	tk->next_cron.tv_usec = when.tv_usec;
}
/* Schedule the ticket's next cron run at the absolute time `when`, with
 * whole-second resolution (the microsecond part is zero). */
static inline void ticket_next_cron_at_coarse(struct ticket_config *tk, time_t when)
{
	struct timeval at = { .tv_sec = when, .tv_usec = 0 };

	tk->next_cron = at;
}
/* Schedule the ticket's next cron run `seconds` from now (fractions
 * allowed).
 * The microsecond field is normalised after the fraction has been added;
 * previously it could end up at or above 1000000, giving a struct timeval
 * that field-wise comparisons order incorrectly. */
static inline void ticket_next_cron_in(struct ticket_config *tk, float seconds)
{
	struct timeval tv;
	double whole = trunc(seconds);

	gettimeofday(&tv, NULL);

	tv.tv_sec += whole;
	tv.tv_usec += (seconds - whole) * 1e6;

	/* Carry (or borrow, for a negative delay) into the seconds field. */
	if (tv.tv_usec >= 1000000) {
		tv.tv_sec += 1;
		tv.tv_usec -= 1000000;
	} else if (tv.tv_usec < 0) {
		tv.tv_sec -= 1;
		tv.tv_usec += 1000000;
	}

	ticket_next_cron_at(tk, tv);
}
/* Schedule the next cron run tk->timeout seconds from now. */
static inline void ticket_activate_timeout(struct ticket_config *tk)
{
/* TODO: increase timeout when no answers */
ticket_next_cron_in(tk, tk->timeout);
-
- tk->retry_number ++;
}
#endif /* _TICKET_H */
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Jan 25, 7:06 AM (1 d, 19 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1321607
Default Alt Text
(29 KB)
Attached To
Mode
rB Booth
Attached
Detach File
Event Timeline
Log In to Comment