Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/src/raft.c b/src/raft.c
index ec0cf99..f63f14e 100644
--- a/src/raft.c
+++ b/src/raft.c
@@ -1,1011 +1,1002 @@
/*
* Copyright (C) 2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include <stdlib.h>
#include <inttypes.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include "booth.h"
#include "timer.h"
#include "config.h"
#include "transport.h"
#include "inline-fn.h"
#include "raft.h"
#include "ticket.h"
#include "request.h"
#include "log.h"
#include "manual.h"
inline static void clear_election(struct ticket_config *tk)
{
int i;
struct booth_site *site;
tk_log_debug("clear election");
tk->votes_received = 0;
_FOREACH_NODE(i, site) {
tk->votes_for[site->index] = NULL;
}
}
inline static void record_vote(struct ticket_config *tk,
struct booth_site *who,
struct booth_site *vote)
{
tk_log_debug("site %s votes for %s",
site_string(who),
site_string(vote));
if (!tk->votes_for[who->index]) {
tk->votes_for[who->index] = vote;
tk->votes_received |= who->bitmask;
} else {
if (tk->votes_for[who->index] != vote)
tk_log_warn("%s voted previously "
"for %s and now wants to vote for %s (ignored)",
site_string(who),
site_string(tk->votes_for[who->index]),
site_string(vote));
}
}
static void update_term_from_msg(struct ticket_config *tk,
struct boothc_ticket_msg *msg)
{
uint32_t i;
i = ntohl(msg->ticket.term);
/* if we failed to start the election, then accept the term
* from the leader
* */
if (tk->state == ST_CANDIDATE) {
tk->current_term = i;
} else {
tk->current_term = max(i, tk->current_term);
}
}
static void set_ticket_expiry(struct ticket_config *tk,
int duration)
{
set_future_time(&tk->term_expires, duration);
}
static void update_ticket_from_msg(struct ticket_config *tk,
struct booth_site *sender,
struct boothc_ticket_msg *msg)
{
int duration;
tk_log_info("updating from %s (%d/%d)",
site_string(sender),
ntohl(msg->ticket.term), msg_term_time(msg));
duration = min(tk->term_duration, msg_term_time(msg));
set_ticket_expiry(tk, duration);
update_term_from_msg(tk, msg);
}
static void copy_ticket_from_msg(struct ticket_config *tk,
struct boothc_ticket_msg *msg)
{
set_ticket_expiry(tk, msg_term_time(msg));
tk->current_term = ntohl(msg->ticket.term);
}
static void become_follower(struct ticket_config *tk,
struct boothc_ticket_msg *msg)
{
copy_ticket_from_msg(tk, msg);
set_state(tk, ST_FOLLOWER);
time_reset(&tk->delay_commit);
tk->in_election = 0;
/* if we're following and the ticket was granted here
* then commit to CIB right away (we're probably restarting)
*/
if (tk->is_granted) {
disown_ticket(tk);
ticket_write(tk);
}
}
static void won_elections(struct ticket_config *tk)
{
set_leader(tk, local);
set_state(tk, ST_LEADER);
set_ticket_expiry(tk, tk->term_duration);
time_reset(&tk->election_end);
tk->voted_for = NULL;
if (is_time_set(&tk->delay_commit) && all_sites_replied(tk)) {
time_reset(&tk->delay_commit);
tk_log_debug("reset delay commit as all sites replied");
}
save_committed_tkt(tk);
ticket_broadcast(tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0);
tk->ticket_updated = 0;
}
/* if more than one member got the same (and maximum within that
* election) number of votes, then that is a tie
*/
static int is_tie(struct ticket_config *tk)
{
int i;
struct booth_site *v;
int count[MAX_NODES] = { 0, };
int max_votes = 0, max_cnt = 0;
for (i = 0; i < booth_conf->site_count; i++) {
v = tk->votes_for[i];
if (!v)
continue;
count[v->index]++;
max_votes = max(max_votes, count[v->index]);
}
for (i = 0; i < booth_conf->site_count; i++) {
if (count[i] == max_votes)
max_cnt++;
}
return max_cnt > 1;
}
static struct booth_site *majority_votes(struct ticket_config *tk)
{
int i, n;
struct booth_site *v;
int count[MAX_NODES] = { 0, };
for (i = 0; i < booth_conf->site_count; i++) {
v = tk->votes_for[i];
if (!v || v == no_leader)
continue;
n = v->index;
count[n]++;
tk_log_debug("Majority: %d %s wants %d %s => %d",
i, site_string(&booth_conf->site[i]),
n, site_string(v),
count[n]);
if (count[n]*2 <= booth_conf->site_count)
continue;
tk_log_debug("Majority reached: %d of %d for %s",
count[n], booth_conf->site_count,
site_string(v));
return v;
}
return NULL;
}
-void elections_end(struct ticket_config *tk)
+void elections_end(struct booth_config *conf, struct ticket_config *tk)
{
struct booth_site *new_leader;
if (is_past(&tk->election_end)) {
/* This is previous election timed out */
tk_log_info("elections finished");
}
tk->in_election = 0;
new_leader = majority_votes(tk);
if (new_leader == local) {
won_elections(tk);
tk_log_info("granted successfully here");
} else if (new_leader) {
tk_log_info("ticket granted at %s",
site_string(new_leader));
} else {
tk_log_info("nobody won elections, new elections");
tk->outcome = RLT_MORE;
- foreach_tkt_req(tk, notify_client);
+ foreach_tkt_req(conf, tk, notify_client);
if (!new_election(tk, NULL, is_tie(tk) ? 2 : 0, OR_AGAIN)) {
ticket_activate_timeout(tk);
}
}
}
static int newer_term(struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg,
int in_election)
{
uint32_t term;
/* it may happen that we hear about our newer term */
if (leader == local)
return 0;
term = ntohl(msg->ticket.term);
/* §5.1 */
if (term > tk->current_term) {
set_state(tk, ST_FOLLOWER);
if (!in_election) {
set_leader(tk, leader);
tk_log_info("from %s: higher term %d vs. %d, following %s",
site_string(sender),
term, tk->current_term,
ticket_leader_string(tk));
} else {
tk_log_debug("from %s: higher term %d vs. %d (election)",
site_string(sender),
term, tk->current_term);
}
tk->current_term = term;
return 1;
}
return 0;
}
static int msg_term_invalid(struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg)
{
uint32_t term;
term = ntohl(msg->ticket.term);
/* §5.1 */
if (is_term_invalid(tk, term)) {
tk_log_info("got invalid term from %s "
"(%d), ignoring", site_string(sender), term);
return 1;
}
return 0;
}
static int term_too_low(struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg)
{
uint32_t term;
term = ntohl(msg->ticket.term);
/* §5.1 */
if (term < tk->current_term) {
tk_log_info("sending reject to %s, its term too low "
"(%d vs. %d)", site_string(sender),
term, tk->current_term
);
send_reject(sender, tk, RLT_TERM_OUTDATED, msg);
return 1;
}
return 0;
}
/* For follower. */
static int answer_HEARTBEAT (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t term;
term = ntohl(msg->ticket.term);
tk_log_debug("heartbeat from leader: %s, have %s; term %d vs %d",
site_string(leader), ticket_leader_string(tk),
term, tk->current_term);
if (term < tk->current_term) {
if (sender == tk->leader) {
tk_log_info("trusting leader %s with a lower term (%d vs %d)",
site_string(leader), term, tk->current_term);
} else if (is_owned(tk)) {
tk_log_warn("different leader %s with a lower term "
"(%d vs %d), sending reject",
site_string(leader), term, tk->current_term);
return send_reject(sender, tk, RLT_TERM_OUTDATED, msg);
}
}
/* got heartbeat, no rejects expected anymore */
tk->expect_more_rejects = 0;
/* Needed? */
newer_term(tk, sender, leader, msg, 0);
become_follower(tk, msg);
/* Racy??? */
assert(sender == leader || !leader);
set_leader(tk, leader);
/* Ack the heartbeat (we comply). */
return send_msg(OP_ACK, tk, sender, msg);
}
static int process_UPDATE (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
if (is_owned(tk) && sender != tk->leader) {
tk_log_warn("different leader %s wants to update "
"our ticket, sending reject",
site_string(leader));
mark_ticket_as_granted(tk, sender);
return send_reject(sender, tk, RLT_TERM_OUTDATED, msg);
}
tk_log_debug("leader %s wants to update our ticket",
site_string(leader));
become_follower(tk, msg);
set_leader(tk, leader);
ticket_write(tk);
/* run ticket_cron if the ticket expires */
set_ticket_wakeup(tk);
return send_msg(OP_ACK, tk, sender, msg);
}
static int process_REVOKE (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int rv;
if (tk->state == ST_INIT && tk->leader == no_leader) {
/* assume that our ack got lost */
rv = send_msg(OP_ACK, tk, sender, msg);
} else if (tk->leader != sender) {
if (!is_manual(tk)) {
tk_log_error("%s wants to revoke ticket, "
"but it is not granted there (ignoring)",
site_string(sender));
return -1;
} else {
rv = process_REVOKE_for_manual_ticket(tk, sender, msg);
// Ticket data stored in this site is not modified. This means
// that this site will still follow another leader (the one which
// has not been revoked) or be a leader itself.
}
} else if (tk->state != ST_FOLLOWER) {
tk_log_error("unexpected ticket revoke from %s "
"(in state %s) (ignoring)",
site_string(sender),
state_to_string(tk->state));
return -1;
} else {
tk_log_info("%s revokes ticket",
site_string(tk->leader));
save_committed_tkt(tk);
reset_ticket_and_set_no_leader(tk);
ticket_write(tk);
rv = send_msg(OP_ACK, tk, sender, msg);
}
return rv;
}
/* For leader. */
-static int process_ACK(
- struct ticket_config *tk,
- struct booth_site *sender,
- struct booth_site *leader,
- struct boothc_ticket_msg *msg
- )
+static int process_ACK(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *sender, struct booth_site *leader,
+ struct boothc_ticket_msg *msg)
{
uint32_t term;
int req;
term = ntohl(msg->ticket.term);
if (newer_term(tk, sender, leader, msg, 0)) {
/* unexpected higher term */
tk_log_warn("got higher term from %s (%d vs. %d)",
site_string(sender),
term, tk->current_term);
return 0;
}
/* Don't send a reject. */
if (term < tk->current_term) {
/* Doesn't know what he's talking about - perhaps
* doesn't receive our packets? */
tk_log_warn("unexpected term "
"from %s (%d vs. %d) (ignoring)",
site_string(sender),
term, tk->current_term);
return 0;
}
/* if the ticket is to be revoked, further processing is not
* interesting (and dangerous) */
if (tk->next_state == ST_INIT || tk->state == ST_INIT)
return 0;
req = ntohl(msg->header.request);
if ((req == OP_UPDATE || req == OP_HEARTBEAT) &&
term == tk->current_term &&
leader == tk->leader) {
if (majority_of_bits(tk, tk->acks_received)) {
/* OK, at least half of the nodes are reachable;
* Update the ticket and send update messages out
*/
- return leader_update_ticket(tk);
+ return leader_update_ticket(conf, tk);
}
}
return 0;
}
-static int process_VOTE_FOR(
- struct ticket_config *tk,
- struct booth_site *sender,
- struct booth_site *leader,
- struct boothc_ticket_msg *msg
- )
+static int process_VOTE_FOR(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *sender, struct booth_site *leader,
+ struct boothc_ticket_msg *msg)
{
if (leader == no_leader) {
/* leader wants to step down? */
if (sender == tk->leader &&
(tk->state == ST_FOLLOWER || tk->state == ST_CANDIDATE)) {
tk_log_info("%s wants to give the ticket away (ticket release)",
site_string(tk->leader));
save_committed_tkt(tk);
reset_ticket(tk);
set_state(tk, ST_FOLLOWER);
if (local->type == SITE) {
ticket_write(tk);
schedule_election(tk, OR_STEPDOWN);
}
} else {
tk_log_info("%s votes for none, ignoring (duplicate ticket release?)",
site_string(sender));
}
return 0;
}
if (tk->state != ST_CANDIDATE) {
/* lost candidate status, somebody rejected our proposal */
tk_log_info("candidate status lost, ignoring VtFr from %s",
site_string(sender));
return 0;
}
if (term_too_low(tk, sender, leader, msg))
return 0;
if (newer_term(tk, sender, leader, msg, 0)) {
clear_election(tk);
}
record_vote(tk, sender, leader);
/* only if all voted can we take the ticket now, otherwise
* wait for timeout in ticket_cron */
if (!tk->acks_expected) {
/* §5.2 */
- elections_end(tk);
+ elections_end(conf, tk);
}
return 0;
}
static int process_REJECTED(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t rv;
rv = ntohl(msg->header.result);
if (tk->state == ST_CANDIDATE &&
leader == local) {
/* the sender has us as the leader (!)
* the elections will time out, then we can try again
*/
tk_log_warn("ticket was granted to us "
"(and we didn't know)");
tk->expect_more_rejects = 1;
return 0;
}
if (tk->state == ST_CANDIDATE &&
rv == RLT_TERM_OUTDATED) {
tk_log_warn("ticket outdated (term %d), granted to %s",
ntohl(msg->ticket.term),
site_string(leader)
);
set_leader(tk, leader);
tk->expect_more_rejects = 1;
become_follower(tk, msg);
return 0;
}
if (tk->state == ST_CANDIDATE &&
rv == RLT_TERM_STILL_VALID) {
if (tk->lost_leader == leader) {
if (tk->election_reason == OR_TKT_LOST) {
tk_log_warn("%s still has the ticket valid, "
"we'll backup a bit",
site_string(sender));
} else {
tk_log_warn("%s unexpectedly rejects elections",
site_string(sender));
}
} else {
tk_log_warn("ticket was granted to %s "
"(and we didn't know)",
site_string(leader));
}
set_leader(tk, leader);
become_follower(tk, msg);
tk->expect_more_rejects = 1;
return 0;
}
if (tk->state == ST_CANDIDATE &&
rv == RLT_YOU_OUTDATED) {
set_leader(tk, leader);
tk->expect_more_rejects = 1;
if (leader && leader != no_leader) {
tk_log_warn("our ticket is outdated, granted to %s",
site_string(leader));
become_follower(tk, msg);
} else {
tk_log_warn("our ticket is outdated and revoked");
update_ticket_from_msg(tk, sender, msg);
set_state(tk, ST_INIT);
}
return 0;
}
if (!tk->expect_more_rejects) {
tk_log_warn("from %s: in state %s, got %s (unexpected reject)",
site_string(sender),
state_to_string(tk->state),
state_to_string(rv));
}
return 0;
}
static int ticket_seems_ok(struct ticket_config *tk)
{
int left;
left = term_time_left(tk);
if (!left)
return 0; /* quite sure */
if (tk->state == ST_CANDIDATE)
return 0; /* in state of flux */
if (tk->state == ST_LEADER)
return 1; /* quite sure */
if (tk->state == ST_FOLLOWER &&
left >= tk->term_duration/3)
return 1; /* almost quite sure */
return 0;
}
static int test_reason(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int reason;
reason = ntohl(msg->header.reason);
if (reason == OR_TKT_LOST) {
if (tk->state == ST_INIT &&
tk->leader == no_leader) {
tk_log_warn("%s claims that the ticket is lost, "
"but it's in %s state (reject sent)",
site_string(sender),
state_to_string(tk->state)
);
return RLT_YOU_OUTDATED;
}
if (ticket_seems_ok(tk)) {
tk_log_warn("%s claims that the ticket is lost, "
"but it is ok here (reject sent)",
site_string(sender));
return RLT_TERM_STILL_VALID;
}
}
return 0;
}
/* §5.2 */
static int answer_REQ_VOTE(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int valid;
struct boothc_ticket_msg omsg;
cmd_result_t inappr_reason;
int reason;
inappr_reason = test_reason(tk, sender, leader, msg);
if (inappr_reason)
return send_reject(sender, tk, inappr_reason, msg);
valid = term_time_left(tk);
reason = ntohl(msg->header.reason);
/* valid tickets are not allowed only if the sender thinks
* the ticket got lost */
if (sender != tk->leader && valid && reason != OR_STEPDOWN) {
tk_log_warn("election from %s with reason %s rejected "
"(we have %s as ticket owner), ticket still valid for %ds",
site_string(sender), state_to_string(reason),
site_string(tk->leader), valid);
return send_reject(sender, tk, RLT_TERM_STILL_VALID, msg);
}
if (term_too_low(tk, sender, leader, msg))
return 0;
/* set this, so that we know not to send status for the
* ticket */
tk->in_election = 1;
/* reset ticket's leader on not valid tickets */
if (!valid)
set_leader(tk, NULL);
/* if it's a newer term or ... */
if (newer_term(tk, sender, leader, msg, 1)) {
clear_election(tk);
goto vote_for_sender;
}
/* ... we didn't vote yet, then vote for the sender */
/* §5.2, §5.4 */
if (!tk->voted_for) {
vote_for_sender:
tk->voted_for = sender;
record_vote(tk, sender, leader);
}
init_ticket_msg(&omsg, OP_VOTE_FOR, OP_REQ_VOTE, RLT_SUCCESS, 0, tk);
omsg.ticket.leader = htonl(get_node_id(tk->voted_for));
return booth_udp_send_auth(sender, &omsg, sendmsglen(&omsg));
}
#define is_reason(r, tk) \
(reason == (r) || (reason == OR_AGAIN && (tk)->election_reason == (r)))
int new_election(struct ticket_config *tk,
struct booth_site *preference, int update_term, cmd_reason_t reason)
{
struct booth_site *new_leader;
if (local->type != SITE)
return 0;
if ((is_reason(OR_TKT_LOST, tk) || is_reason(OR_STEPDOWN, tk)) &&
check_attr_prereq(tk, GRANT_AUTO)) {
tk_log_info("attribute prerequisite not met, "
"not starting elections");
return 0;
}
/* elections were already started, but not yet finished/timed out */
if (is_time_set(&tk->election_end) && !is_past(&tk->election_end))
return 1;
if (ANYDEBUG) {
int tdiff;
if (is_time_set(&tk->election_end)) {
tdiff = -time_left(&tk->election_end);
tk_log_debug("starting elections, previous finished since " intfmt(tdiff));
} else {
tk_log_debug("starting elections");
}
tk_log_debug("elections caused by %s %s",
state_to_string(reason),
reason == OR_AGAIN ? state_to_string(tk->election_reason) : "" );
}
/* §5.2 */
/* If there was _no_ answer, don't keep incrementing the term number
* indefinitely. If there was no peer, there'll probably be no one
* listening now either. However, we don't know if we were
* invoked due to a timeout (caller does).
*/
/* increment the term only if either the current term was
* valid or if there was a tie (in that case update_term > 1)
*/
if ((update_term > 1) ||
(update_term && tk->last_valid_tk &&
tk->last_valid_tk->current_term >= tk->current_term)) {
/* save the previous term, we may need to send out the
* MY_INDEX message */
if (tk->state != ST_CANDIDATE) {
save_committed_tkt(tk);
}
tk->current_term++;
}
set_future_time(&tk->election_end, tk->timeout);
tk->in_election = 1;
tk_log_info("starting new election (term=%d)",
tk->current_term);
clear_election(tk);
new_leader = preference ? preference : local;
record_vote(tk, local, new_leader);
tk->voted_for = new_leader;
set_state(tk, ST_CANDIDATE);
/* some callers may want just to repeat on timeout */
if (reason == OR_AGAIN) {
reason = tk->election_reason;
} else {
tk->election_reason = reason;
}
ticket_broadcast(tk, OP_REQ_VOTE, OP_VOTE_FOR, RLT_SUCCESS, reason);
add_random_delay(tk);
return 0;
}
/* we were a leader and somebody says that they have a more up
* to date ticket
* there was probably connectivity loss
* tricky
*/
static int leader_handle_newer_ticket(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
update_term_from_msg(tk, msg);
if (leader != no_leader && leader && leader != local) {
/* eek, two leaders, split brain */
/* normally shouldn't happen; run election */
tk_log_error("from %s: ticket granted to %s! (revoking locally)",
site_string(sender),
site_string(leader)
);
} else if (term_time_left(tk)) {
/* eek, two leaders, split brain */
/* normally shouldn't happen; run election */
tk_log_error("from %s: ticket granted to %s! (revoking locally)",
site_string(sender),
site_string(leader)
);
}
set_next_state(tk, ST_LEADER);
return 0;
}
/* reply to STATUS */
static int process_MY_INDEX (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int i;
int expired;
expired = !msg_term_time(msg);
/* test against the last valid(!) ticket we have */
i = my_last_term(tk) - ntohl(msg->ticket.term);
if (i > 0) {
/* let them know about our newer ticket */
send_msg(OP_MY_INDEX, tk, sender, msg);
if (tk->state == ST_LEADER) {
tk_log_info("sending ticket update to %s",
site_string(sender));
return send_msg(OP_UPDATE, tk, sender, msg);
}
}
/* we have a newer or equal ticket and theirs is expired,
* nothing more to do here */
if (i >= 0 && expired) {
return 0;
}
if (tk->state == ST_LEADER) {
/* we're the leader, thread carefully */
if (expired) {
/* if their ticket is expired,
* nothing more to do */
return 0;
}
if (i < 0) {
/* they have a newer ticket, trouble if we're already leader
* for it */
tk_log_warn("from %s: more up to date ticket at %s",
site_string(sender),
site_string(leader)
);
return leader_handle_newer_ticket(tk, sender, leader, msg);
} else {
/* we have the ticket and we don't care */
return 0;
}
} else if (tk->state == ST_CANDIDATE) {
if (leader == local) {
/* a belated MY_INDEX, we're already trying to get the
* ticket */
return 0;
}
}
/* their ticket is either newer or not expired, don't
* ignore it */
update_ticket_from_msg(tk, sender, msg);
set_leader(tk, leader);
update_ticket_state(tk, sender);
save_committed_tkt(tk);
set_ticket_wakeup(tk);
return 0;
}
-int raft_answer(
- struct ticket_config *tk,
- struct booth_site *sender,
- struct booth_site *leader,
- struct boothc_ticket_msg *msg
- )
+int raft_answer(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *sender, struct booth_site *leader,
+ struct boothc_ticket_msg *msg)
{
int cmd, req;
int rv;
rv = 0;
cmd = ntohl(msg->header.cmd);
req = ntohl(msg->header.request);
if (req)
tk_log_debug("got %s (req %s) from %s",
state_to_string(cmd),
state_to_string(req),
site_string(sender));
else
tk_log_debug("got %s from %s",
state_to_string(cmd),
site_string(sender));
/* don't process tickets with invalid term
*/
if (cmd != OP_STATUS &&
msg_term_invalid(tk, sender, leader, msg))
return 0;
switch (cmd) {
case OP_REQ_VOTE:
rv = answer_REQ_VOTE(tk, sender, leader, msg);
break;
case OP_VOTE_FOR:
- rv = process_VOTE_FOR(tk, sender, leader, msg);
+ rv = process_VOTE_FOR(conf, tk, sender, leader, msg);
break;
case OP_ACK:
if (tk->leader == local &&
tk->state == ST_LEADER)
- rv = process_ACK(tk, sender, leader, msg);
+ rv = process_ACK(conf, tk, sender, leader, msg);
break;
case OP_HEARTBEAT:
if ((tk->leader != local || !term_time_left(tk)) &&
(tk->state == ST_INIT || tk->state == ST_FOLLOWER ||
tk->state == ST_CANDIDATE))
rv = answer_HEARTBEAT(tk, sender, leader, msg);
else {
tk_log_warn("unexpected message %s, from %s",
state_to_string(cmd),
site_string(sender));
mark_ticket_as_granted(tk, sender);
if (ticket_seems_ok(tk))
send_reject(sender, tk, RLT_TERM_STILL_VALID, msg);
rv = -EINVAL;
}
break;
case OP_UPDATE:
if (((tk->leader != local && tk->leader == leader) || !is_owned(tk)) &&
(tk->state == ST_INIT || tk->state == ST_FOLLOWER ||
tk->state == ST_CANDIDATE)) {
rv = process_UPDATE(tk, sender, leader, msg);
} else {
tk_log_warn("unexpected message %s, from %s",
state_to_string(cmd),
site_string(sender));
if (ticket_seems_ok(tk))
send_reject(sender, tk, RLT_TERM_STILL_VALID, msg);
rv = -EINVAL;
}
break;
case OP_REJECTED:
rv = process_REJECTED(tk, sender, leader, msg);
break;
case OP_REVOKE:
rv = process_REVOKE(tk, sender, leader, msg);
break;
case OP_MY_INDEX:
rv = process_MY_INDEX(tk, sender, leader, msg);
break;
case OP_STATUS:
if (!tk->in_election)
rv = send_msg(OP_MY_INDEX, tk, sender, msg);
break;
default:
tk_log_error("unknown message %s, from %s",
state_to_string(cmd), site_string(sender));
rv = -EINVAL;
}
return rv;
}
diff --git a/src/raft.h b/src/raft.h
index 0e01b48..9e52d8d 100644
--- a/src/raft.h
+++ b/src/raft.h
@@ -1,43 +1,42 @@
/*
* Copyright (C) 2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef _RAFT_H
#define _RAFT_H
#include "booth.h"
typedef enum {
ST_INIT = CHAR2CONST('I', 'n', 'i', 't'),
ST_FOLLOWER = CHAR2CONST('F', 'l', 'l', 'w'),
ST_CANDIDATE = CHAR2CONST('C', 'n', 'd', 'i'),
ST_LEADER = CHAR2CONST('L', 'e', 'a', 'd'),
} server_state_e;
struct ticket_config;
-int raft_answer(struct ticket_config *tk,
- struct booth_site *from,
- struct booth_site *leader,
- struct boothc_ticket_msg *msg);
+int raft_answer(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *from, struct booth_site *leader,
+ struct boothc_ticket_msg *msg);
int new_election(struct ticket_config *tk,
struct booth_site *new_leader, int update_term, cmd_reason_t reason);
-void elections_end(struct ticket_config *tk);
+void elections_end(struct booth_config *conf, struct ticket_config *tk);
#endif /* _RAFT_H */
diff --git a/src/request.c b/src/request.c
index 2503f6c..d7ff5f9 100644
--- a/src/request.c
+++ b/src/request.c
@@ -1,83 +1,84 @@
/*
* Copyright (C) 2015 Dejan Muhamedagic <dejan@hello-penguin.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include <errno.h>
#include <stdio.h>
#include <assert.h>
#include <glib.h>
#include "booth.h"
#include "ticket.h"
#include "request.h"
#include "log.h"
static GList *req_l = NULL;
static int req_id_cnt;
/* add request to the queue; it is up to the caller to manage
* memory for the three parameters
*/
void *add_req(
struct ticket_config *tk,
struct client *req_client,
struct boothc_ticket_msg *msg)
{
struct request *rp;
rp = g_new(struct request, 1);
if (!rp)
return NULL;
rp->id = req_id_cnt++;
rp->tk = tk;
rp->client_fd = req_client->fd;
rp->msg = msg;
req_l = g_list_append(req_l, rp);
return rp;
}
int get_req_id(const void *rp)
{
if (!rp)
return -1;
return ((struct request *)rp)->id;
}
static void del_req(GList *lp)
{
if (!lp)
return;
req_l = g_list_delete_link(req_l, lp);
}
-void foreach_tkt_req(struct ticket_config *tk, req_fp f)
+void foreach_tkt_req(struct booth_config *conf, struct ticket_config *tk,
+ req_fp f)
{
GList *lp, *next;
struct request *rp;
lp = g_list_first(req_l);
while (lp) {
next = g_list_next(lp);
rp = (struct request *)lp->data;
if (rp->tk == tk &&
- (f)(rp->tk, rp->client_fd, rp->msg) == 0) {
+ (f)(conf, rp->tk, rp->client_fd, rp->msg) == 0) {
log_debug("remove request for client %d", rp->client_fd);
del_req(lp); /* don't need this request anymore */
}
lp = next;
}
}
diff --git a/src/request.h b/src/request.h
index c014a09..2b25ca7 100644
--- a/src/request.h
+++ b/src/request.h
@@ -1,55 +1,68 @@
/*
* Copyright (C) 2015 Dejan Muhamedagic <dejan@hello-penguin.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef _REQUEST_H
#define _REQUEST_H
#include "booth.h"
#include "config.h"
/* Requests are coming from clients and get queued in a
* round-robin queue (fixed size)
*
* This is one way to make the server more responsive and less
* dependent on misbehaving clients. The requests are queued and
* later served from the server loop.
*/
struct request {
/** Request ID */
int id;
/** The ticket. */
struct ticket_config *tk;
/** The client which sent the request */
int client_fd;
/** The message containing the request */
void *msg;
};
-typedef int (*req_fp)(
- struct ticket_config *, int, struct boothc_ticket_msg *);
+typedef int (*req_fp)(struct booth_config *, struct ticket_config *, int,
+ struct boothc_ticket_msg *);
void *add_req(struct ticket_config *tk, struct client *req_client,
struct boothc_ticket_msg *msg);
-void foreach_tkt_req(struct ticket_config *tk, req_fp f);
+
+/**
+ * @internal
+ * Handle all pending requests for given ticket using function @p f
+ *
+ * @param[in,out] conf config object to refer to
+ * @param[in] tk ticket at hand
+ * @param[in] f handling function
+ *
+ * @return 1 on success, 0 when not done with the message, yet
+ */
+void foreach_tkt_req(struct booth_config *conf, struct ticket_config *tk,
+ req_fp f);
+
int get_req_id(const void *rp);
#endif /* _REQUEST_H */
diff --git a/src/ticket.c b/src/ticket.c
index c9147d8..e668ce0 100644
--- a/src/ticket.c
+++ b/src/ticket.c
@@ -1,1440 +1,1440 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "b_config.h"
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include <inttypes.h>
#include <stdio.h>
#include <assert.h>
#include <time.h>
#ifndef RANGE2RANDOM_GLIB
#include <clplumbing/cl_random.h>
#else
#include "alt/range2random_glib.h"
#endif
#include "ticket.h"
#include "config.h"
#include "pacemaker.h"
#include "inline-fn.h"
#include "log.h"
#include "booth.h"
#include "raft.h"
#include "handler.h"
#include "request.h"
#include "manual.h"
extern int TIME_RES;
/* Untrusted input, must fit (incl. \0) in a buffer of max chars. */
int check_max_len_valid(const char *s, int max)
{
for (int i = 0; i < max; i++) {
if (s[i] == 0) {
return 1;
}
}
return 0;
}
int find_ticket_by_name(struct booth_config *conf, const char *ticket,
struct ticket_config **found)
{
struct ticket_config *tk;
int i;
if (found) {
*found = NULL;
}
FOREACH_TICKET(conf, i, tk) {
if (strncmp(tk->name, ticket, sizeof(tk->name))) {
continue;
}
if (found) {
*found = tk;
}
return 1;
}
return 0;
}
int check_ticket(struct booth_config *conf, char *ticket,
struct ticket_config **found)
{
if (found) {
*found = NULL;
}
if (conf == NULL) {
return 0;
}
if (!check_max_len_valid(ticket, sizeof(conf->ticket[0].name))) {
return 0;
}
return find_ticket_by_name(conf, ticket, found);
}
/* is it safe to commit the grant?
* if we didn't hear from all sites on the initial grant, we may
* need to delay the commit
*
* TODO: investigate possibility to devise from history whether a
* missing site could be holding a ticket or not
*/
static int ticket_dangerous(struct ticket_config *tk)
{
int tdiff;
/* we may be invoked often, don't spam the log unnecessarily
*/
static int no_log_delay_msg;
if (!is_time_set(&tk->delay_commit)) {
return 0;
}
if (is_past(&tk->delay_commit) || all_sites_replied(tk)) {
if (tk->leader == local) {
tk_log_info("%s, committing to CIB",
is_past(&tk->delay_commit) ?
"ticket delay expired" : "all sites replied");
}
time_reset(&tk->delay_commit);
no_log_delay_msg = 0;
return 0;
}
tdiff = time_left(&tk->delay_commit);
tk_log_debug("delay ticket commit for another " intfmt(tdiff));
if (!no_log_delay_msg) {
tk_log_info("delaying ticket commit to CIB for " intfmt(tdiff));
tk_log_info("(or all sites are reached)");
no_log_delay_msg = 1;
}
return 1;
}
int ticket_write(struct ticket_config *tk)
{
if (local->type != SITE) {
return -EINVAL;
}
if (ticket_dangerous(tk)) {
return 1;
}
if (tk->leader == local) {
if (tk->state != ST_LEADER) {
tk_log_info("ticket state not yet consistent, "
"delaying ticket grant to CIB");
return 1;
}
pcmk_handler.grant_ticket(tk);
} else {
pcmk_handler.revoke_ticket(tk);
}
tk->update_cib = 0;
return 0;
}
void save_committed_tkt(struct ticket_config *tk)
{
if (!tk->last_valid_tk) {
tk->last_valid_tk = malloc(sizeof(struct ticket_config));
if (!tk->last_valid_tk) {
log_error("out of memory");
return;
}
}
memcpy(tk->last_valid_tk, tk, sizeof(struct ticket_config));
}
static void ext_prog_failed(struct ticket_config *tk, int start_election)
{
if (!is_manual(tk)) {
/* Give it to somebody else.
* Just send a VOTE_FOR message, so the
* others can start elections. */
if (!leader_and_valid(tk)) {
return;
}
save_committed_tkt(tk);
reset_ticket(tk);
ticket_write(tk);
if (start_election) {
ticket_broadcast(tk, OP_VOTE_FOR, OP_REQ_VOTE, RLT_SUCCESS, OR_LOCAL_FAIL);
}
} else {
/* There is not much we can do now because
* the manual ticket cannot be relocated.
* Just warn the user. */
if (tk->leader != local) {
return;
}
save_committed_tkt(tk);
reset_ticket(tk);
ticket_write(tk);
log_error("external test failed on the specified machine, cannot acquire a manual ticket");
}
}
#define attr_found(geo_ap, ap) \
((geo_ap) && !strcmp((geo_ap)->val, (ap)->attr_val))
int check_attr_prereq(struct ticket_config *tk, grant_type_e grant_type)
{
GList *el;
struct attr_prereq *ap;
struct geo_attr *geo_ap;
for (el = g_list_first(tk->attr_prereqs); el; el = g_list_next(el))
{
ap = (struct attr_prereq *) el->data;
if (ap->grant_type != grant_type) {
continue;
}
geo_ap = (struct geo_attr *) g_hash_table_lookup(tk->attr, ap->attr_name);
switch(ap->op) {
case ATTR_OP_EQ:
if (!attr_found(geo_ap, ap)) {
goto fail;
}
break;
case ATTR_OP_NE:
if (attr_found(geo_ap, ap)) {
goto fail;
}
break;
default:
break;
}
}
return 0;
fail:
tk_log_warn("'%s' attr-prereq failed", ap->attr_name);
return 1;
}
/* do we need to run the external program?
* or we already done that and waiting for the outcome
* or program exited and we can collect the status
* return codes
* 0: no program defined
* RUNCMD_MORE: program forked, results later
* != 0: executing program failed (or some other failure)
*/
static int do_ext_prog(struct ticket_config *tk, int start_election)
{
int rv = 0;
if (!tk_test.path) {
return 0;
}
switch(tk_test.progstate) {
case EXTPROG_IDLE:
rv = run_handler(tk);
if (rv == RUNCMD_ERR) {
tk_log_warn("couldn't run external test, not allowed to acquire ticket");
ext_prog_failed(tk, start_election);
}
break;
case EXTPROG_RUNNING:
/* should never get here, but just in case */
rv = RUNCMD_MORE;
break;
case EXTPROG_EXITED:
rv = tk_test_exit_status(tk);
if (rv) {
ext_prog_failed(tk, start_election);
}
break;
case EXTPROG_IGNORE:
/* nothing to do here */
break;
}
return rv;
}
/* Try to acquire a ticket
* Could be manual grant or after start (if the ticket is granted
* and still valid in the CIB)
* If the external program needs to run, this is run twice, once
* to start the program, and then to get the result and start
* elections.
*/
static int acquire_ticket(struct ticket_config *tk, cmd_reason_t reason)
{
int rv;
if (reason == OR_ADMIN && check_attr_prereq(tk, GRANT_MANUAL)) {
return RLT_ATTR_PREREQ;
}
switch(do_ext_prog(tk, 0)) {
case 0:
/* everything fine */
break;
case RUNCMD_MORE:
/* need to wait for the outcome before starting elections */
return 0;
default:
return RLT_EXT_FAILED;
}
if (is_manual(tk)) {
rv = manual_selection(tk, local, 1, reason);
} else {
rv = new_election(tk, local, 1, reason);
}
return rv ? RLT_SYNC_FAIL : 0;
}
/** Try to get the ticket for the local site.
* */
static int do_grant_ticket(struct ticket_config *tk, int options)
{
int rv;
tk_log_info("granting ticket");
if (tk->leader == local) {
return RLT_SUCCESS;
}
if (is_owned(tk)) {
if (is_manual(tk) && (options & OPT_IMMEDIATE)) {
/* -F flag has been used while granting a manual ticket.
* The ticket will be granted and may end up being granted
* on multiple sites */
tk_log_warn("manual ticket forced to be granted! be aware that "
"you may end up having two sites holding the same manual "
"ticket! revoke the ticket from the unnecessary site!");
} else {
return RLT_OVERGRANT;
}
}
set_future_time(&tk->delay_commit, tk->term_duration + tk->acquire_after);
if (options & OPT_IMMEDIATE) {
tk_log_warn("granting ticket immediately! If there are "
"unreachable sites, _hope_ you are sure that they don't "
"have the ticket!");
time_reset(&tk->delay_commit);
}
rv = acquire_ticket(tk, OR_ADMIN);
if (rv) {
time_reset(&tk->delay_commit);
return rv;
} else {
return RLT_MORE;
}
}
static void start_revoke_ticket(struct ticket_config *tk)
{
tk_log_info("revoking ticket");
save_committed_tkt(tk);
reset_ticket_and_set_no_leader(tk);
ticket_write(tk);
ticket_broadcast(tk, OP_REVOKE, OP_ACK, RLT_SUCCESS, OR_ADMIN);
}
/** Ticket revoke.
* Only to be started from the leader. */
static int do_revoke_ticket(struct ticket_config *tk)
{
if (tk->acks_expected) {
tk_log_info("delay ticket revoke until the current operation finishes");
set_next_state(tk, ST_INIT);
return RLT_MORE;
} else {
start_revoke_ticket(tk);
return RLT_SUCCESS;
}
}
static int number_sites_marked_as_granted(struct booth_config *conf,
struct ticket_config *tk)
{
int i, result = 0;
struct booth_site *ignored __attribute__((unused));
FOREACH_NODE(conf, i, ignored) {
result += tk->sites_where_granted[i];
}
return result;
}
static int list_ticket(struct booth_config *conf, char **pdata)
{
GString *s = NULL;
struct ticket_config *tk;
struct booth_site *site;
char timeout_str[64];
char *pending_str = NULL;
int i, site_index;
time_t ts;
s = g_string_sized_new(BUFSIZ);
if (s == NULL) {
return -ENOMEM;
}
FOREACH_TICKET(conf, i, tk) {
if (!is_manual(tk) && is_time_set(&tk->term_expires)) {
/* Manual tickets doesn't have term_expires defined */
ts = wall_ts(&tk->term_expires);
strftime(timeout_str, sizeof(timeout_str), "%F %T",
localtime(&ts));
} else {
strcpy(timeout_str, "INF");
}
if (tk->leader == local && is_time_set(&tk->delay_commit) &&
!is_past(&tk->delay_commit)) {
char until_str[64];
int rc;
ts = wall_ts(&tk->delay_commit);
strftime(until_str, sizeof(until_str), "%F %T",
localtime(&ts));
rc = asprintf(&pending_str, " (commit pending until %s)",
until_str);
if (rc < 0) {
g_string_free(s, TRUE);
return -ENOMEM;
}
}
g_string_append_printf(s, "ticket: %s, leader: %s", tk->name,
ticket_leader_string(tk));
if (is_owned(tk)) {
g_string_append_printf(s, ", expires: %s", timeout_str);
if (pending_str != NULL) {
g_string_append(s, pending_str);
}
}
if (is_manual(tk)) {
g_string_append(s, " [manual mode]");
}
g_string_append(s, "\n");
if (pending_str != NULL) {
free(pending_str);
pending_str = NULL;
}
}
FOREACH_TICKET(conf, i, tk) {
int multiple_grant_warning_length = number_sites_marked_as_granted(conf, tk);
if (multiple_grant_warning_length <= 1) {
continue;
}
g_string_append_printf(s, "\nWARNING: The ticket %s is granted to multiple sites: ",
tk->name);
FOREACH_NODE(conf, site_index, site) {
if (tk->sites_where_granted[site_index] <= 0) {
continue;
}
g_string_append(s, site_string(site));
multiple_grant_warning_length--;
if (multiple_grant_warning_length > 0) {
g_string_append(s, ", ");
}
}
g_string_append(s, ". Revoke the ticket from the faulty sites.\n");
}
*pdata = strdup(s->str);
g_string_free(s, TRUE);
if (*pdata == NULL) {
return -ENOMEM;
}
return 0;
}
void disown_ticket(struct ticket_config *tk)
{
set_leader(tk, NULL);
tk->is_granted = 0;
get_time(&tk->term_expires);
}
void reset_ticket(struct ticket_config *tk)
{
ignore_ext_test(tk);
disown_ticket(tk);
no_resends(tk);
set_state(tk, ST_INIT);
set_next_state(tk, 0);
tk->voted_for = NULL;
}
void reset_ticket_and_set_no_leader(struct ticket_config *tk)
{
mark_ticket_as_revoked_from_leader(tk);
reset_ticket(tk);
tk->leader = no_leader;
tk_log_debug("ticket leader set to no_leader");
}
static void log_reacquire_reason(struct ticket_config *tk)
{
int valid;
const char *where_granted = NULL;
char buff[75];
valid = is_time_set(&tk->term_expires) && !is_past(&tk->term_expires);
if (tk->leader == local) {
where_granted = "granted here";
} else {
snprintf(buff, sizeof(buff), "granted to %s",
site_string(tk->leader));
where_granted = buff;
}
if (!valid) {
tk_log_warn("%s, but not valid anymore (will try to reacquire)",
where_granted);
}
if (tk->is_granted && tk->leader != local) {
if (tk->leader && tk->leader != no_leader) {
tk_log_error("granted here, but also %s, "
"that's really too bad (will try to reacquire)",
where_granted);
} else {
tk_log_warn("granted here, but we're "
"not recorded as the grantee (will try to reacquire)");
}
}
}
void update_ticket_state(struct ticket_config *tk, struct booth_site *sender)
{
if (tk->state == ST_CANDIDATE) {
tk_log_info("learned from %s about newer ticket, stopping elections",
site_string(sender));
/* there could be rejects coming from others; don't log
* warnings unnecessarily */
tk->expect_more_rejects = 1;
}
if (tk->leader == local || tk->is_granted) {
/* message from a live leader with valid ticket? */
if (sender == tk->leader && term_time_left(tk)) {
if (tk->is_granted) {
tk_log_warn("ticket was granted here, "
"but it's live at %s (revoking here)",
site_string(sender));
} else {
tk_log_info("ticket live at %s", site_string(sender));
}
disown_ticket(tk);
ticket_write(tk);
set_state(tk, ST_FOLLOWER);
set_next_state(tk, ST_FOLLOWER);
} else {
if (tk->state == ST_CANDIDATE) {
set_state(tk, ST_FOLLOWER);
}
set_next_state(tk, ST_LEADER);
}
} else {
if (!tk->leader || tk->leader == no_leader) {
if (sender) {
tk_log_info("ticket is not granted");
} else {
tk_log_info("ticket is not granted (from CIB)");
}
set_state(tk, ST_INIT);
} else {
if (sender) {
tk_log_info("ticket granted to %s (says %s)",
site_string(tk->leader),
tk->leader == sender ? "they" : site_string(sender));
} else {
tk_log_info("ticket granted to %s (from CIB)",
site_string(tk->leader));
}
set_state(tk, ST_FOLLOWER);
/* just make sure that we check the ticket soon */
set_next_state(tk, ST_FOLLOWER);
}
}
}
int setup_ticket(struct booth_config *conf)
{
struct ticket_config *tk;
int i;
FOREACH_TICKET(conf, i, tk) {
reset_ticket(tk);
if (local->type == SITE) {
if (!pcmk_handler.load_ticket(conf, tk)) {
update_ticket_state(tk, NULL);
}
tk->update_cib = 1;
}
tk_log_info("broadcasting state query");
/* wait until all send their status (or the first
* timeout) */
tk->start_postpone = 1;
ticket_broadcast(tk, OP_STATUS, OP_MY_INDEX, RLT_SUCCESS, 0);
}
return 0;
}
int ticket_answer_list(struct booth_config *conf, int fd)
{
char *data = NULL;
int rv;
struct boothc_hdr_msg hdr;
rv = list_ticket(conf, &data);
if (rv < 0) {
goto out;
}
init_header(&hdr.header, CL_LIST, 0, 0, RLT_SUCCESS, 0, sizeof(hdr) + strlen(data));
rv = send_header_plus(fd, &hdr, data, strlen(data));
out:
if (data != NULL) {
free(data);
}
return rv;
}
int process_client_request(struct booth_config *conf, struct client *req_client,
void *buf)
{
int rv, rc = 1;
struct ticket_config *tk;
int cmd;
struct boothc_ticket_msg omsg;
struct boothc_ticket_msg *msg;
msg = (struct boothc_ticket_msg *)buf;
cmd = ntohl(msg->header.cmd);
if (!check_ticket(conf, msg->ticket.id, &tk)) {
log_warn("client referenced unknown ticket %s", msg->ticket.id);
rv = RLT_INVALID_ARG;
goto reply_now;
}
/* Perform the initial check before granting
* an already granted non-manual ticket */
if (!is_manual(tk) && cmd == CMD_GRANT && is_owned(tk)) {
log_warn("client wants to grant an (already granted!) ticket %s",
msg->ticket.id);
rv = RLT_OVERGRANT;
goto reply_now;
}
if (cmd == CMD_REVOKE && !is_owned(tk)) {
log_info("client wants to revoke a free ticket %s", msg->ticket.id);
rv = RLT_TICKET_IDLE;
goto reply_now;
}
if (cmd == CMD_REVOKE && tk->leader != local) {
tk_log_info("not granted here, redirect to %s",
ticket_leader_string(tk));
rv = RLT_REDIRECT;
goto reply_now;
}
if (cmd == CMD_REVOKE) {
rv = do_revoke_ticket(tk);
} else {
rv = do_grant_ticket(tk, ntohl(msg->header.options));
}
if (rv == RLT_MORE) {
/* client may receive further notifications, save the
* request for further processing */
add_req(tk, req_client, msg);
tk_log_debug("queue request %s for client %d",
state_to_string(cmd), req_client->fd);
rc = 0; /* we're not yet done with the message */
}
reply_now:
init_ticket_msg(&omsg, CL_RESULT, 0, rv, 0, tk);
send_client_msg(req_client->fd, &omsg);
return rc;
}
-int notify_client(struct ticket_config *tk, int client_fd,
- struct boothc_ticket_msg *msg)
+int notify_client(struct booth_config *conf, struct ticket_config *tk,
+ int client_fd, struct boothc_ticket_msg *msg)
{
struct boothc_ticket_msg omsg;
void (*deadfn) (int ci);
int rv, rc, ci;
int cmd, options;
struct client *req_client;
cmd = ntohl(msg->header.cmd);
options = ntohl(msg->header.options);
rv = tk->outcome;
ci = find_client_by_fd(client_fd);
if (ci < 0) {
tk_log_info("client %d (request %s) left before being notified",
client_fd, state_to_string(cmd));
return 0;
}
tk_log_debug("notifying client %d (request %s)",
client_fd, state_to_string(cmd));
init_ticket_msg(&omsg, CL_RESULT, 0, rv, 0, tk);
rc = send_client_msg(client_fd, &omsg);
if (rc == 0 && (rv == RLT_MORE ||
(rv == RLT_CIB_PENDING && (options & OPT_WAIT_COMMIT)))) {
/* more to do here, keep the request */
return 1;
} else {
/* we sent a definite answer or there was a write error, drop
* the client */
if (rc) {
tk_log_debug("failed to notify client %d (request %s)",
client_fd, state_to_string(cmd));
} else {
tk_log_debug("client %d (request %s) got final notification",
client_fd, state_to_string(cmd));
}
req_client = clients + ci;
deadfn = req_client->deadfn;
if (deadfn) {
deadfn(ci);
}
return 0; /* we're done with this request */
}
}
int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd,
cmd_request_t expected_reply, cmd_result_t res,
cmd_reason_t reason)
{
struct boothc_ticket_msg msg;
init_ticket_msg(&msg, cmd, 0, res, reason, tk);
tk_log_debug("broadcasting '%s' (term=%d, valid=%d)",
state_to_string(cmd),
ntohl(msg.ticket.term),
msg_term_time(&msg));
tk->last_request = cmd;
if (expected_reply) {
expect_replies(tk, expected_reply);
}
ticket_activate_timeout(tk);
return transport()->broadcast_auth(&msg, sendmsglen(&msg));
}
/* update the ticket on the leader, write it to the CIB, and
send out the update message to others with the new expiry
time
*/
-int leader_update_ticket(struct ticket_config *tk)
+int leader_update_ticket(struct booth_config *conf, struct ticket_config *tk)
{
int rv = 0, rv2;
timetype now;
if (tk->ticket_updated >= 2) {
return 0;
}
/* for manual tickets, we don't set time expiration */
if (!is_manual(tk) && tk->ticket_updated < 1) {
tk->ticket_updated = 1;
get_time(&now);
copy_time(&now, &tk->last_renewal);
set_future_time(&tk->term_expires, tk->term_duration);
rv = ticket_broadcast(tk, OP_UPDATE, OP_ACK, RLT_SUCCESS, 0);
}
if (tk->ticket_updated < 2) {
rv2 = ticket_write(tk);
switch(rv2) {
case 0:
tk->ticket_updated = 2;
tk->outcome = RLT_SUCCESS;
- foreach_tkt_req(tk, notify_client);
+ foreach_tkt_req(conf, tk, notify_client);
break;
case 1:
if (tk->outcome != RLT_CIB_PENDING) {
tk->outcome = RLT_CIB_PENDING;
- foreach_tkt_req(tk, notify_client);
+ foreach_tkt_req(conf, tk, notify_client);
}
break;
default:
break;
}
}
return rv;
}
static void log_lost_servers(struct booth_config *conf, struct ticket_config *tk)
{
struct booth_site *n;
int i;
if (tk->retry_number > 1) {
/* log those that we couldn't reach, but do
* that only on the first retry
*/
return;
}
FOREACH_NODE(conf, i, n) {
if (tk->acks_received & n->bitmask) {
continue;
}
tk_log_warn("%s %s didn't acknowledge our %s, "
"will retry %d times",
(n->type == ARBITRATOR ? "arbitrator" : "site"),
site_string(n), state_to_string(tk->last_request),
tk->retries);
}
}
static void resend_msg(struct booth_config *conf, struct ticket_config *tk)
{
struct booth_site *n;
int i;
if (!(tk->acks_received ^ local->bitmask)) {
ticket_broadcast(tk, tk->last_request, 0, RLT_SUCCESS, 0);
} else {
FOREACH_NODE(conf, i, n) {
if (tk->acks_received & n->bitmask) {
continue;
}
n->resend_cnt++;
tk_log_debug("resending %s to %s",
state_to_string(tk->last_request),
site_string(n));
send_msg(tk->last_request, tk, n, NULL);
}
ticket_activate_timeout(tk);
}
}
static void handle_resends(struct booth_config *conf, struct ticket_config *tk)
{
int ack_cnt;
if (++tk->retry_number > tk->retries) {
tk_log_info("giving up on sending retries");
no_resends(tk);
set_ticket_wakeup(tk);
return;
}
/* try to reach some sites again if we just stepped down */
if (tk->last_request == OP_VOTE_FOR) {
tk_log_warn("no answers to our VtFr request to step down (try #%d), "
"we are alone",
tk->retry_number);
goto just_resend;
}
if (!majority_of_bits(tk, tk->acks_received)) {
ack_cnt = count_bits(tk->acks_received) - 1;
if (!ack_cnt) {
tk_log_warn("no answers to our request (try #%d), "
"we are alone",
tk->retry_number);
} else {
tk_log_warn("not enough answers to our request (try #%d): "
"only got %d answers",
tk->retry_number, ack_cnt);
}
} else {
log_lost_servers(conf, tk);
}
just_resend:
resend_msg(conf, tk);
}
static int postpone_ticket_processing(struct ticket_config *tk)
{
extern timetype start_time;
return tk->start_postpone && (-time_left(&start_time) < tk->timeout);
}
#define has_extprog_exited(tk) ((tk)->clu_test.progstate == EXTPROG_EXITED)
-static void process_next_state(struct ticket_config *tk)
+static void process_next_state(struct booth_config *conf, struct ticket_config *tk)
{
int rv;
switch(tk->next_state) {
case ST_LEADER:
if (has_extprog_exited(tk)) {
if (tk->state == ST_LEADER) {
break;
}
rv = acquire_ticket(tk, OR_ADMIN);
if (rv != 0) { /* external program failed */
tk->outcome = rv;
- foreach_tkt_req(tk, notify_client);
+ foreach_tkt_req(conf, tk, notify_client);
}
} else {
log_reacquire_reason(tk);
acquire_ticket(tk, OR_REACQUIRE);
}
break;
case ST_INIT:
no_resends(tk);
start_revoke_ticket(tk);
tk->outcome = RLT_SUCCESS;
- foreach_tkt_req(tk, notify_client);
+ foreach_tkt_req(conf, tk, notify_client);
break;
/* wanting to be follower is not much of an ambition; no
* processing, just return; don't reset start_postpone until
* we got some replies to status */
case ST_FOLLOWER:
return;
default:
break;
}
tk->start_postpone = 0;
}
static void ticket_lost(struct ticket_config *tk)
{
int reason = OR_TKT_LOST;
if (tk->leader != local) {
tk_log_warn("lost at %s", site_string(tk->leader));
} else {
if (is_ext_prog_running(tk)) {
ext_prog_timeout(tk);
reason = OR_LOCAL_FAIL;
} else {
tk_log_warn("lost majority (revoking locally)");
reason = tk->election_reason ? tk->election_reason : OR_REACQUIRE;
}
}
tk->lost_leader = tk->leader;
save_committed_tkt(tk);
mark_ticket_as_revoked_from_leader(tk);
reset_ticket(tk);
set_state(tk, ST_FOLLOWER);
if (local->type == SITE) {
ticket_write(tk);
schedule_election(tk, reason);
}
}
static void next_action(struct booth_config *conf, struct ticket_config *tk)
{
int rv;
switch(tk->state) {
case ST_INIT:
/* init state, handle resends for ticket revoke */
/* and rebroadcast if stepping down */
/* try to acquire ticket on grant */
if (has_extprog_exited(tk)) {
rv = acquire_ticket(tk, OR_ADMIN);
if (rv != 0) { /* external program failed */
tk->outcome = rv;
- foreach_tkt_req(tk, notify_client);
+ foreach_tkt_req(conf, tk, notify_client);
}
} else {
if (tk->acks_expected) {
handle_resends(conf, tk);
}
}
break;
case ST_FOLLOWER:
if (!is_manual(tk)) {
/* leader/ticket lost? and we didn't vote yet */
tk_log_debug("leader: %s, voted_for: %s",
site_string(tk->leader),
site_string(tk->voted_for));
if (tk->leader) {
break;
}
if (!tk->voted_for || !tk->in_election) {
disown_ticket(tk);
if (!new_election(tk, NULL, 1, OR_AGAIN)) {
ticket_activate_timeout(tk);
}
} else {
/* we should restart elections in case nothing
* happens in the meantime */
tk->in_election = 0;
ticket_activate_timeout(tk);
}
} else {
/* for manual tickets, also try to acquire ticket on grant
* in the Follower state (because we may end up having
* two Leaders) */
if (has_extprog_exited(tk)) {
rv = acquire_ticket(tk, OR_ADMIN);
if (rv != 0) { /* external program failed */
tk->outcome = rv;
- foreach_tkt_req(tk, notify_client);
+ foreach_tkt_req(conf, tk, notify_client);
}
} else {
/* Otherwise, just send ACKs if needed */
if (tk->acks_expected) {
handle_resends(conf, tk);
}
}
}
break;
case ST_CANDIDATE:
/* elections timed out? */
- elections_end(tk);
+ elections_end(conf, tk);
break;
case ST_LEADER:
/* timeout or ticket renewal? */
if (tk->acks_expected) {
handle_resends(conf, tk);
if (majority_of_bits(tk, tk->acks_received)) {
- leader_update_ticket(tk);
+ leader_update_ticket(conf, tk);
}
} else if (!do_ext_prog(tk, 1)) {
/* this is ticket renewal, run local test */
ticket_broadcast(tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0);
tk->ticket_updated = 0;
}
break;
default:
break;
}
}
static void ticket_cron(struct booth_config *conf, struct ticket_config *tk)
{
/* don't process the tickets too early after start */
if (postpone_ticket_processing(tk)) {
tk_log_debug("ticket processing postponed (start_postpone=%d)",
tk->start_postpone);
/* but run again soon */
ticket_activate_timeout(tk);
return;
}
/* no need for status resends, we hope we got at least one
* my_index back */
if (tk->acks_expected == OP_MY_INDEX) {
no_resends(tk);
}
/* after startup, we need to decide what to do based on the
* current ticket state; tk->next_state has a hint
* also used for revokes which had to be delayed
*/
if (tk->next_state) {
- process_next_state(tk);
+ process_next_state(conf, tk);
goto out;
}
/* Has an owner, has an expiry date, and expiry date in the past?
* For automatic tickets, losing the ticket must happen
* in _every_ state.
*/
if (!is_manual(tk) && is_owned(tk) && is_time_set(&tk->term_expires) &&
is_past(&tk->term_expires)) {
ticket_lost(tk);
goto out;
}
next_action(conf, tk);
out:
tk->next_state = 0;
if (!tk->in_election && tk->update_cib) {
ticket_write(tk);
}
}
void process_tickets(struct booth_config *conf)
{
struct ticket_config *tk;
int i;
timetype last_cron;
FOREACH_TICKET(conf, i, tk) {
if (!has_extprog_exited(tk) &&
is_time_set(&tk->next_cron) && !is_past(&tk->next_cron)) {
continue;
}
tk_log_debug("ticket cron");
copy_time(&tk->next_cron, &last_cron);
ticket_cron(conf, tk);
if (time_cmp(&last_cron, &tk->next_cron, ==)) {
tk_log_debug("nobody set ticket wakeup");
set_ticket_wakeup(tk);
}
}
}
void tickets_log_info(struct booth_config *conf)
{
struct ticket_config *tk;
int i;
time_t ts;
FOREACH_TICKET(conf, i, tk) {
ts = wall_ts(&tk->term_expires);
tk_log_info("state '%s' term %d leader %s expires %-24.24s",
state_to_string(tk->state),
tk->current_term,
ticket_leader_string(tk),
ctime(&ts));
}
}
static void update_acks(struct ticket_config *tk, struct booth_site *sender,
struct booth_site *leader, struct boothc_ticket_msg *msg)
{
uint32_t cmd;
uint32_t req;
cmd = ntohl(msg->header.cmd);
req = ntohl(msg->header.request);
if (req != tk->last_request ||
(tk->acks_expected != cmd && tk->acks_expected != OP_REJECTED)) {
return;
}
/* got an ack! */
tk->acks_received |= sender->bitmask;
if (all_replied(tk) ||
/* we just stepped down, need only one site to start elections */
(cmd == OP_REQ_VOTE && tk->last_request == OP_VOTE_FOR)) {
no_resends(tk);
tk->start_postpone = 0;
set_ticket_wakeup(tk);
}
}
/* read ticket message */
int ticket_recv(struct booth_config *conf, void *buf, struct booth_site *source)
{
struct boothc_ticket_msg *msg;
struct ticket_config *tk;
struct booth_site *leader;
uint32_t leader_u;
msg = (struct boothc_ticket_msg *)buf;
if (!check_ticket(conf, msg->ticket.id, &tk)) {
log_warn("got invalid ticket name %s from %s",
msg->ticket.id, site_string(source));
source->invalid_cnt++;
return -EINVAL;
}
leader_u = ntohl(msg->ticket.leader);
if (!find_site_by_id(conf, leader_u, &leader)) {
tk_log_error("message with unknown leader %u received", leader_u);
source->invalid_cnt++;
return -EINVAL;
}
update_acks(tk, source, leader, msg);
- return raft_answer(tk, source, leader, msg);
+ return raft_answer(conf, tk, source, leader, msg);
}
static void log_next_wakeup(struct ticket_config *tk)
{
int left;
left = time_left(&tk->next_cron);
tk_log_debug("set ticket wakeup in " intfmt(left));
}
/* New vote round; §5.2 */
/* delay the next election start for some random time
* (up to 1 second)
*/
void add_random_delay(struct ticket_config *tk)
{
timetype tv;
interval_add(&tk->next_cron, rand_time(min(1000, tk->timeout)), &tv);
ticket_next_cron_at(tk, &tv);
if (ANYDEBUG) {
log_next_wakeup(tk);
}
}
void set_ticket_wakeup(struct ticket_config *tk)
{
timetype near_future, tv, next_vote;
set_future_time(&near_future, 10);
if (!is_manual(tk)) {
/* At least every hour, perhaps sooner (default) */
tk_log_debug("ticket will be woken up after up to one hour");
ticket_next_cron_in(tk, 3600*TIME_RES);
switch (tk->state) {
case ST_LEADER:
assert(tk->leader == local);
get_next_election_time(tk, &next_vote);
/* If timestamp is in the past, wakeup in
* near future */
if (!is_time_set(&next_vote)) {
tk_log_debug("next ts unset, wakeup soon");
ticket_next_cron_at(tk, &near_future);
} else if (is_past(&next_vote)) {
int tdiff = time_left(&next_vote);
tk_log_debug("next ts in the past " intfmt(tdiff));
ticket_next_cron_at(tk, &near_future);
} else {
ticket_next_cron_at(tk, &next_vote);
}
break;
case ST_CANDIDATE:
assert(is_time_set(&tk->election_end));
ticket_next_cron_at(tk, &tk->election_end);
break;
case ST_INIT:
case ST_FOLLOWER:
/* If there is (or should be) some owner, check on it later on.
* If no one is interested - don't care. */
if (is_owned(tk)) {
interval_add(&tk->term_expires, tk->acquire_after, &tv);
ticket_next_cron_at(tk, &tv);
}
break;
default:
tk_log_error("unknown ticket state: %d", tk->state);
}
if (tk->next_state) {
/* we need to do something soon here */
if (!tk->acks_expected) {
ticket_next_cron_at(tk, &near_future);
} else {
ticket_activate_timeout(tk);
}
}
} else {
/* At least six minutes, to make sure that multi-leader situations
* will be solved promptly.
*/
tk_log_debug("manual ticket will be woken up after up to six minutes");
ticket_next_cron_in(tk, 60 * TIME_RES);
/* For manual tickets, no earlier timeout could be set in a similar
* way as it is done in a switch above for automatic tickets.
* The reason is that term's timeout is INF and no Raft-based elections
* are performed.
*/
}
if (ANYDEBUG) {
log_next_wakeup(tk);
}
}
void schedule_election(struct ticket_config *tk, cmd_reason_t reason)
{
if (local->type != SITE) {
return;
}
tk->election_reason = reason;
get_time(&tk->next_cron);
/* introduce a short delay before starting election */
add_random_delay(tk);
}
int is_manual(struct ticket_config *tk)
{
return (tk->mode == TICKET_MODE_MANUAL) ? 1 : 0;
}
/* Given a state (in host byte order), return a human-readable (char*).
* An array is used so that multiple states can be printed in a single printf(). */
char *state_to_string(uint32_t state_ho)
{
union mu { cmd_request_t s; char c[5]; };
static union mu cache[6] = { { 0 } }, *cur;
static int current = 0;
current ++;
if (current >= sizeof(cache)/sizeof(cache[0])) {
current = 0;
}
cur = cache + current;
cur->s = htonl(state_ho);
/* Shouldn't be necessary, union array is initialized with zeroes, and
* these bytes never get written. */
cur->c[4] = 0;
return cur->c;
}
int send_reject(struct booth_site *dest, struct ticket_config *tk,
cmd_result_t code, struct boothc_ticket_msg *in_msg)
{
int req = ntohl(in_msg->header.cmd);
struct boothc_ticket_msg msg;
tk_log_debug("sending reject to %s", site_string(dest));
init_ticket_msg(&msg, OP_REJECTED, req, code, 0, tk);
return booth_udp_send_auth(dest, &msg, sendmsglen(&msg));
}
int send_msg(int cmd, struct ticket_config *tk, struct booth_site *dest,
struct boothc_ticket_msg *in_msg)
{
int req = 0;
struct ticket_config *valid_tk = tk;
struct boothc_ticket_msg msg;
/* if we want to send the last valid ticket, then if we're in
* the ST_CANDIDATE state, the last valid ticket is in
* tk->last_valid_tk
*/
if (cmd == OP_MY_INDEX) {
if (tk->state == ST_CANDIDATE && tk->last_valid_tk) {
valid_tk = tk->last_valid_tk;
}
tk_log_info("sending status to %s", site_string(dest));
}
if (in_msg) {
req = ntohl(in_msg->header.cmd);
}
init_ticket_msg(&msg, cmd, req, RLT_SUCCESS, 0, valid_tk);
return booth_udp_send_auth(dest, &msg, sendmsglen(&msg));
}
diff --git a/src/ticket.h b/src/ticket.h
index 454ebaf..22e7215 100644
--- a/src/ticket.h
+++ b/src/ticket.h
@@ -1,249 +1,260 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef _TICKET_H
#define _TICKET_H
#include <time.h>
#include <sys/time.h>
#include <math.h>
#include "timer.h"
#include "config.h"
#include "log.h"
extern int TIME_RES;
#define DEFAULT_TICKET_EXPIRY (600*TIME_RES)
#define DEFAULT_TICKET_TIMEOUT (5*TIME_RES)
#define DEFAULT_RETRIES 10
#define FOREACH_TICKET(b_, i_, t_) \
for (i_ = 0; \
(t_ = (b_)->ticket + i_, i_ < (b_)->ticket_count); \
i_++)
#define FOREACH_NODE(b_, i_, n_) \
for (i_ = 0; \
(n_ = (b_)->site + i_, i_ < (b_)->site_count); \
i_++)
#define _FOREACH_TICKET(i_, t_) \
for (i_ = 0; \
(t_ = booth_conf->ticket + i_, i_ < booth_conf->ticket_count); \
i_++)
#define _FOREACH_NODE(i_, n_) \
for (i_ = 0; \
(n_ = booth_conf->site + i_, i_ < booth_conf->site_count); \
i_++)
#define set_leader(tk, who) do { \
if (who == NULL) { \
mark_ticket_as_revoked_from_leader(tk); \
} \
\
tk->leader = who; \
tk_log_debug("ticket leader set to %s", ticket_leader_string(tk)); \
\
if (tk->leader) { \
mark_ticket_as_granted(tk, tk->leader); \
} \
} while(0)
#define mark_ticket_as_granted(tk, who) do { \
if (is_manual(tk) && (who->index > -1)) { \
tk->sites_where_granted[who->index] = 1; \
tk_log_debug("manual ticket marked as granted to %s", ticket_leader_string(tk)); \
} \
} while(0)
#define mark_ticket_as_revoked(tk, who) do { \
if (is_manual(tk) && who && (who->index > -1)) { \
tk->sites_where_granted[who->index] = 0; \
tk_log_debug("manual ticket marked as revoked from %s", site_string(who)); \
} \
} while(0)
#define mark_ticket_as_revoked_from_leader(tk) do { \
if (tk->leader) { \
mark_ticket_as_revoked(tk, tk->leader); \
} \
} while(0)
#define set_state(tk, newst) do { \
tk_log_debug("state transition: %s -> %s", \
state_to_string(tk->state), state_to_string(newst)); \
tk->state = newst; \
} while(0)
#define set_next_state(tk, newst) do { \
if (!(newst)) tk_log_debug("next state reset"); \
else tk_log_debug("next state set to %s", state_to_string(newst)); \
tk->next_state = newst; \
} while(0)
#define is_term_invalid(tk, term) \
((tk)->last_valid_tk && (tk)->last_valid_tk->current_term > (term))
void save_committed_tkt(struct ticket_config *tk);
void disown_ticket(struct ticket_config *tk);
/**
* @internal
* Like @find_ticket_by_name, but perform sanity checks on the found ticket
*
* @param[in,out] conf config object to refer to
* @param[in] ticket name of the ticket to search for
* @param[out] found place the reference here when found
*
* @return 0 on failure, see @find_ticket_by_name otherwise
*/
int check_ticket(struct booth_config *conf, char *ticket, struct ticket_config **tc);
int grant_ticket(struct ticket_config *ticket);
int revoke_ticket(struct ticket_config *ticket);
/**
* @internal
* Second stage of incoming datagram handling (after authentication)
*
* @param[in,out] conf config object to refer to
* @param[in] buf raw message to act upon
* @param[in] source member originating this message
*
* @return 0 on success or negative value (-1 or -errno) on error
*/
int ticket_recv(struct booth_config *conf, void *buf, struct booth_site *source);
void reset_ticket(struct ticket_config *tk);
void reset_ticket_and_set_no_leader(struct ticket_config *tk);
void update_ticket_state(struct ticket_config *tk, struct booth_site *sender);
/**
* @internal
* Broadcast the initial state query
*
* @param[in,out] conf config object to use as a starting point
*
* @return 0 (for the time being)
*/
int setup_ticket(struct booth_config *conf);
int check_max_len_valid(const char *s, int max);
/**
* @internal
* Find a ticket based on a given name
*
* @param[in,out] conf config object to refer to
* @param[in] ticket name of the ticket to search for
* @param[out] found place the reference here when found
*
* @return see @list_ticket and @send_header_plus
*/
int find_ticket_by_name(struct booth_config *conf,
const char *ticket, struct ticket_config **found);
void set_ticket_wakeup(struct ticket_config *tk);
/**
* @internal
* Implementation of ticket listing
*
* @param[in,out] conf config object to refer to
* @param[in] fd file descriptor of the socket to respond to
*
* @return see @list_ticket and @send_header_plus
*/
int ticket_answer_list(struct booth_config *conf, int fd);
/**
* @internal
* Process request from the client (as opposed to the peer daemon)
*
* @param[in,out] conf config object to refer to
* @param[in] req_client client structure of the sender
* @param[in] buf client message
*
* @return 1 on success, or 0 when not yet done with the message
*/
int process_client_request(struct booth_config *conf, struct client *req_client,
void *buf);
int ticket_write(struct ticket_config *tk);
/**
* @internal
* Mainloop of booth ticket handling
*
* @param[in,out] conf config object to refer to
*/
void process_tickets(struct booth_config *conf);
/**
* @internal
* Log properties of all tickets
*
* @param[in,out] conf config object to refer to
*/
void tickets_log_info(struct booth_config *conf);
char *state_to_string(uint32_t state_ho);
int send_reject(struct booth_site *dest, struct ticket_config *tk,
cmd_result_t code, struct boothc_ticket_msg *in_msg);
int send_msg (int cmd, struct ticket_config *tk,
struct booth_site *dest, struct boothc_ticket_msg *in_msg);
-int notify_client(struct ticket_config *tk, int client_fd,
- struct boothc_ticket_msg *msg);
+
+/**
+ * @internal
+ * Notify client at particular socket, regarding particular ticket
+ *
+ * @param[in,out] conf config object to refer to
+ * @param[in] tk ticket at hand
+ * @param[in] fd file descriptor of the socket to respond to
+ * @param[in] msg input message being responded to
+ */
+int notify_client(struct booth_config *conf, struct ticket_config *tk,
+ int client_fd, struct boothc_ticket_msg *msg);
+
int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd, cmd_request_t expected_reply, cmd_result_t res, cmd_reason_t reason);
-int leader_update_ticket(struct ticket_config *tk);
+int leader_update_ticket(struct booth_config *conf, struct ticket_config *tk);
void add_random_delay(struct ticket_config *tk);
void schedule_election(struct ticket_config *tk, cmd_reason_t reason);
int is_manual(struct ticket_config *tk);
int check_attr_prereq(struct ticket_config *tk, grant_type_e grant_type);
static inline void ticket_next_cron_at(struct ticket_config *tk, timetype *when)
{
copy_time(when, &tk->next_cron);
}
static inline void ticket_next_cron_in(struct ticket_config *tk, int interval)
{
timetype tv;
set_future_time(&tv, interval);
ticket_next_cron_at(tk, &tv);
}
static inline void ticket_activate_timeout(struct ticket_config *tk)
{
/* TODO: increase timeout when no answers */
tk_log_debug("activate ticket timeout in %d", tk->timeout);
ticket_next_cron_in(tk, tk->timeout);
}
#endif /* _TICKET_H */

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 3:28 PM (19 h, 49 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1018847
Default Alt Text
(75 KB)

Event Timeline