Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F1842311
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
75 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/src/raft.c b/src/raft.c
index ec0cf99..f63f14e 100644
--- a/src/raft.c
+++ b/src/raft.c
@@ -1,1011 +1,1002 @@
/*
* Copyright (C) 2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include <stdlib.h>
#include <inttypes.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include "booth.h"
#include "timer.h"
#include "config.h"
#include "transport.h"
#include "inline-fn.h"
#include "raft.h"
#include "ticket.h"
#include "request.h"
#include "log.h"
#include "manual.h"
inline static void clear_election(struct ticket_config *tk)
{
int i;
struct booth_site *site;
tk_log_debug("clear election");
tk->votes_received = 0;
_FOREACH_NODE(i, site) {
tk->votes_for[site->index] = NULL;
}
}
inline static void record_vote(struct ticket_config *tk,
struct booth_site *who,
struct booth_site *vote)
{
tk_log_debug("site %s votes for %s",
site_string(who),
site_string(vote));
if (!tk->votes_for[who->index]) {
tk->votes_for[who->index] = vote;
tk->votes_received |= who->bitmask;
} else {
if (tk->votes_for[who->index] != vote)
tk_log_warn("%s voted previously "
"for %s and now wants to vote for %s (ignored)",
site_string(who),
site_string(tk->votes_for[who->index]),
site_string(vote));
}
}
static void update_term_from_msg(struct ticket_config *tk,
struct boothc_ticket_msg *msg)
{
uint32_t i;
i = ntohl(msg->ticket.term);
/* if we failed to start the election, then accept the term
* from the leader
* */
if (tk->state == ST_CANDIDATE) {
tk->current_term = i;
} else {
tk->current_term = max(i, tk->current_term);
}
}
static void set_ticket_expiry(struct ticket_config *tk,
int duration)
{
set_future_time(&tk->term_expires, duration);
}
static void update_ticket_from_msg(struct ticket_config *tk,
struct booth_site *sender,
struct boothc_ticket_msg *msg)
{
int duration;
tk_log_info("updating from %s (%d/%d)",
site_string(sender),
ntohl(msg->ticket.term), msg_term_time(msg));
duration = min(tk->term_duration, msg_term_time(msg));
set_ticket_expiry(tk, duration);
update_term_from_msg(tk, msg);
}
static void copy_ticket_from_msg(struct ticket_config *tk,
struct boothc_ticket_msg *msg)
{
set_ticket_expiry(tk, msg_term_time(msg));
tk->current_term = ntohl(msg->ticket.term);
}
static void become_follower(struct ticket_config *tk,
struct boothc_ticket_msg *msg)
{
copy_ticket_from_msg(tk, msg);
set_state(tk, ST_FOLLOWER);
time_reset(&tk->delay_commit);
tk->in_election = 0;
/* if we're following and the ticket was granted here
* then commit to CIB right away (we're probably restarting)
*/
if (tk->is_granted) {
disown_ticket(tk);
ticket_write(tk);
}
}
static void won_elections(struct ticket_config *tk)
{
set_leader(tk, local);
set_state(tk, ST_LEADER);
set_ticket_expiry(tk, tk->term_duration);
time_reset(&tk->election_end);
tk->voted_for = NULL;
if (is_time_set(&tk->delay_commit) && all_sites_replied(tk)) {
time_reset(&tk->delay_commit);
tk_log_debug("reset delay commit as all sites replied");
}
save_committed_tkt(tk);
ticket_broadcast(tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0);
tk->ticket_updated = 0;
}
/* if more than one member got the same (and maximum within that
* election) number of votes, then that is a tie
*/
static int is_tie(struct ticket_config *tk)
{
int i;
struct booth_site *v;
int count[MAX_NODES] = { 0, };
int max_votes = 0, max_cnt = 0;
for (i = 0; i < booth_conf->site_count; i++) {
v = tk->votes_for[i];
if (!v)
continue;
count[v->index]++;
max_votes = max(max_votes, count[v->index]);
}
for (i = 0; i < booth_conf->site_count; i++) {
if (count[i] == max_votes)
max_cnt++;
}
return max_cnt > 1;
}
static struct booth_site *majority_votes(struct ticket_config *tk)
{
int i, n;
struct booth_site *v;
int count[MAX_NODES] = { 0, };
for (i = 0; i < booth_conf->site_count; i++) {
v = tk->votes_for[i];
if (!v || v == no_leader)
continue;
n = v->index;
count[n]++;
tk_log_debug("Majority: %d %s wants %d %s => %d",
i, site_string(&booth_conf->site[i]),
n, site_string(v),
count[n]);
if (count[n]*2 <= booth_conf->site_count)
continue;
tk_log_debug("Majority reached: %d of %d for %s",
count[n], booth_conf->site_count,
site_string(v));
return v;
}
return NULL;
}
-void elections_end(struct ticket_config *tk)
+void elections_end(struct booth_config *conf, struct ticket_config *tk)
{
struct booth_site *new_leader;
if (is_past(&tk->election_end)) {
/* This is previous election timed out */
tk_log_info("elections finished");
}
tk->in_election = 0;
new_leader = majority_votes(tk);
if (new_leader == local) {
won_elections(tk);
tk_log_info("granted successfully here");
} else if (new_leader) {
tk_log_info("ticket granted at %s",
site_string(new_leader));
} else {
tk_log_info("nobody won elections, new elections");
tk->outcome = RLT_MORE;
- foreach_tkt_req(tk, notify_client);
+ foreach_tkt_req(conf, tk, notify_client);
if (!new_election(tk, NULL, is_tie(tk) ? 2 : 0, OR_AGAIN)) {
ticket_activate_timeout(tk);
}
}
}
static int newer_term(struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg,
int in_election)
{
uint32_t term;
/* it may happen that we hear about our newer term */
if (leader == local)
return 0;
term = ntohl(msg->ticket.term);
/* §5.1 */
if (term > tk->current_term) {
set_state(tk, ST_FOLLOWER);
if (!in_election) {
set_leader(tk, leader);
tk_log_info("from %s: higher term %d vs. %d, following %s",
site_string(sender),
term, tk->current_term,
ticket_leader_string(tk));
} else {
tk_log_debug("from %s: higher term %d vs. %d (election)",
site_string(sender),
term, tk->current_term);
}
tk->current_term = term;
return 1;
}
return 0;
}
static int msg_term_invalid(struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg)
{
uint32_t term;
term = ntohl(msg->ticket.term);
/* §5.1 */
if (is_term_invalid(tk, term)) {
tk_log_info("got invalid term from %s "
"(%d), ignoring", site_string(sender), term);
return 1;
}
return 0;
}
static int term_too_low(struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg)
{
uint32_t term;
term = ntohl(msg->ticket.term);
/* §5.1 */
if (term < tk->current_term) {
tk_log_info("sending reject to %s, its term too low "
"(%d vs. %d)", site_string(sender),
term, tk->current_term
);
send_reject(sender, tk, RLT_TERM_OUTDATED, msg);
return 1;
}
return 0;
}
/* For follower. */
static int answer_HEARTBEAT (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t term;
term = ntohl(msg->ticket.term);
tk_log_debug("heartbeat from leader: %s, have %s; term %d vs %d",
site_string(leader), ticket_leader_string(tk),
term, tk->current_term);
if (term < tk->current_term) {
if (sender == tk->leader) {
tk_log_info("trusting leader %s with a lower term (%d vs %d)",
site_string(leader), term, tk->current_term);
} else if (is_owned(tk)) {
tk_log_warn("different leader %s with a lower term "
"(%d vs %d), sending reject",
site_string(leader), term, tk->current_term);
return send_reject(sender, tk, RLT_TERM_OUTDATED, msg);
}
}
/* got heartbeat, no rejects expected anymore */
tk->expect_more_rejects = 0;
/* Needed? */
newer_term(tk, sender, leader, msg, 0);
become_follower(tk, msg);
/* Racy??? */
assert(sender == leader || !leader);
set_leader(tk, leader);
/* Ack the heartbeat (we comply). */
return send_msg(OP_ACK, tk, sender, msg);
}
static int process_UPDATE (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
if (is_owned(tk) && sender != tk->leader) {
tk_log_warn("different leader %s wants to update "
"our ticket, sending reject",
site_string(leader));
mark_ticket_as_granted(tk, sender);
return send_reject(sender, tk, RLT_TERM_OUTDATED, msg);
}
tk_log_debug("leader %s wants to update our ticket",
site_string(leader));
become_follower(tk, msg);
set_leader(tk, leader);
ticket_write(tk);
/* run ticket_cron if the ticket expires */
set_ticket_wakeup(tk);
return send_msg(OP_ACK, tk, sender, msg);
}
static int process_REVOKE (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int rv;
if (tk->state == ST_INIT && tk->leader == no_leader) {
/* assume that our ack got lost */
rv = send_msg(OP_ACK, tk, sender, msg);
} else if (tk->leader != sender) {
if (!is_manual(tk)) {
tk_log_error("%s wants to revoke ticket, "
"but it is not granted there (ignoring)",
site_string(sender));
return -1;
} else {
rv = process_REVOKE_for_manual_ticket(tk, sender, msg);
// Ticket data stored in this site is not modified. This means
// that this site will still follow another leader (the one which
// has not been revoked) or be a leader itself.
}
} else if (tk->state != ST_FOLLOWER) {
tk_log_error("unexpected ticket revoke from %s "
"(in state %s) (ignoring)",
site_string(sender),
state_to_string(tk->state));
return -1;
} else {
tk_log_info("%s revokes ticket",
site_string(tk->leader));
save_committed_tkt(tk);
reset_ticket_and_set_no_leader(tk);
ticket_write(tk);
rv = send_msg(OP_ACK, tk, sender, msg);
}
return rv;
}
/* For leader. */
-static int process_ACK(
- struct ticket_config *tk,
- struct booth_site *sender,
- struct booth_site *leader,
- struct boothc_ticket_msg *msg
- )
+static int process_ACK(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *sender, struct booth_site *leader,
+ struct boothc_ticket_msg *msg)
{
uint32_t term;
int req;
term = ntohl(msg->ticket.term);
if (newer_term(tk, sender, leader, msg, 0)) {
/* unexpected higher term */
tk_log_warn("got higher term from %s (%d vs. %d)",
site_string(sender),
term, tk->current_term);
return 0;
}
/* Don't send a reject. */
if (term < tk->current_term) {
/* Doesn't know what he's talking about - perhaps
* doesn't receive our packets? */
tk_log_warn("unexpected term "
"from %s (%d vs. %d) (ignoring)",
site_string(sender),
term, tk->current_term);
return 0;
}
/* if the ticket is to be revoked, further processing is not
* interesting (and dangerous) */
if (tk->next_state == ST_INIT || tk->state == ST_INIT)
return 0;
req = ntohl(msg->header.request);
if ((req == OP_UPDATE || req == OP_HEARTBEAT) &&
term == tk->current_term &&
leader == tk->leader) {
if (majority_of_bits(tk, tk->acks_received)) {
/* OK, at least half of the nodes are reachable;
* Update the ticket and send update messages out
*/
- return leader_update_ticket(tk);
+ return leader_update_ticket(conf, tk);
}
}
return 0;
}
-static int process_VOTE_FOR(
- struct ticket_config *tk,
- struct booth_site *sender,
- struct booth_site *leader,
- struct boothc_ticket_msg *msg
- )
+static int process_VOTE_FOR(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *sender, struct booth_site *leader,
+ struct boothc_ticket_msg *msg)
{
if (leader == no_leader) {
/* leader wants to step down? */
if (sender == tk->leader &&
(tk->state == ST_FOLLOWER || tk->state == ST_CANDIDATE)) {
tk_log_info("%s wants to give the ticket away (ticket release)",
site_string(tk->leader));
save_committed_tkt(tk);
reset_ticket(tk);
set_state(tk, ST_FOLLOWER);
if (local->type == SITE) {
ticket_write(tk);
schedule_election(tk, OR_STEPDOWN);
}
} else {
tk_log_info("%s votes for none, ignoring (duplicate ticket release?)",
site_string(sender));
}
return 0;
}
if (tk->state != ST_CANDIDATE) {
/* lost candidate status, somebody rejected our proposal */
tk_log_info("candidate status lost, ignoring VtFr from %s",
site_string(sender));
return 0;
}
if (term_too_low(tk, sender, leader, msg))
return 0;
if (newer_term(tk, sender, leader, msg, 0)) {
clear_election(tk);
}
record_vote(tk, sender, leader);
/* only if all voted can we take the ticket now, otherwise
* wait for timeout in ticket_cron */
if (!tk->acks_expected) {
/* §5.2 */
- elections_end(tk);
+ elections_end(conf, tk);
}
return 0;
}
static int process_REJECTED(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t rv;
rv = ntohl(msg->header.result);
if (tk->state == ST_CANDIDATE &&
leader == local) {
/* the sender has us as the leader (!)
* the elections will time out, then we can try again
*/
tk_log_warn("ticket was granted to us "
"(and we didn't know)");
tk->expect_more_rejects = 1;
return 0;
}
if (tk->state == ST_CANDIDATE &&
rv == RLT_TERM_OUTDATED) {
tk_log_warn("ticket outdated (term %d), granted to %s",
ntohl(msg->ticket.term),
site_string(leader)
);
set_leader(tk, leader);
tk->expect_more_rejects = 1;
become_follower(tk, msg);
return 0;
}
if (tk->state == ST_CANDIDATE &&
rv == RLT_TERM_STILL_VALID) {
if (tk->lost_leader == leader) {
if (tk->election_reason == OR_TKT_LOST) {
tk_log_warn("%s still has the ticket valid, "
"we'll backup a bit",
site_string(sender));
} else {
tk_log_warn("%s unexpectedly rejects elections",
site_string(sender));
}
} else {
tk_log_warn("ticket was granted to %s "
"(and we didn't know)",
site_string(leader));
}
set_leader(tk, leader);
become_follower(tk, msg);
tk->expect_more_rejects = 1;
return 0;
}
if (tk->state == ST_CANDIDATE &&
rv == RLT_YOU_OUTDATED) {
set_leader(tk, leader);
tk->expect_more_rejects = 1;
if (leader && leader != no_leader) {
tk_log_warn("our ticket is outdated, granted to %s",
site_string(leader));
become_follower(tk, msg);
} else {
tk_log_warn("our ticket is outdated and revoked");
update_ticket_from_msg(tk, sender, msg);
set_state(tk, ST_INIT);
}
return 0;
}
if (!tk->expect_more_rejects) {
tk_log_warn("from %s: in state %s, got %s (unexpected reject)",
site_string(sender),
state_to_string(tk->state),
state_to_string(rv));
}
return 0;
}
static int ticket_seems_ok(struct ticket_config *tk)
{
int left;
left = term_time_left(tk);
if (!left)
return 0; /* quite sure */
if (tk->state == ST_CANDIDATE)
return 0; /* in state of flux */
if (tk->state == ST_LEADER)
return 1; /* quite sure */
if (tk->state == ST_FOLLOWER &&
left >= tk->term_duration/3)
return 1; /* almost quite sure */
return 0;
}
static int test_reason(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int reason;
reason = ntohl(msg->header.reason);
if (reason == OR_TKT_LOST) {
if (tk->state == ST_INIT &&
tk->leader == no_leader) {
tk_log_warn("%s claims that the ticket is lost, "
"but it's in %s state (reject sent)",
site_string(sender),
state_to_string(tk->state)
);
return RLT_YOU_OUTDATED;
}
if (ticket_seems_ok(tk)) {
tk_log_warn("%s claims that the ticket is lost, "
"but it is ok here (reject sent)",
site_string(sender));
return RLT_TERM_STILL_VALID;
}
}
return 0;
}
/* §5.2 */
static int answer_REQ_VOTE(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int valid;
struct boothc_ticket_msg omsg;
cmd_result_t inappr_reason;
int reason;
inappr_reason = test_reason(tk, sender, leader, msg);
if (inappr_reason)
return send_reject(sender, tk, inappr_reason, msg);
valid = term_time_left(tk);
reason = ntohl(msg->header.reason);
/* valid tickets are not allowed only if the sender thinks
* the ticket got lost */
if (sender != tk->leader && valid && reason != OR_STEPDOWN) {
tk_log_warn("election from %s with reason %s rejected "
"(we have %s as ticket owner), ticket still valid for %ds",
site_string(sender), state_to_string(reason),
site_string(tk->leader), valid);
return send_reject(sender, tk, RLT_TERM_STILL_VALID, msg);
}
if (term_too_low(tk, sender, leader, msg))
return 0;
/* set this, so that we know not to send status for the
* ticket */
tk->in_election = 1;
/* reset ticket's leader on not valid tickets */
if (!valid)
set_leader(tk, NULL);
/* if it's a newer term or ... */
if (newer_term(tk, sender, leader, msg, 1)) {
clear_election(tk);
goto vote_for_sender;
}
/* ... we didn't vote yet, then vote for the sender */
/* §5.2, §5.4 */
if (!tk->voted_for) {
vote_for_sender:
tk->voted_for = sender;
record_vote(tk, sender, leader);
}
init_ticket_msg(&omsg, OP_VOTE_FOR, OP_REQ_VOTE, RLT_SUCCESS, 0, tk);
omsg.ticket.leader = htonl(get_node_id(tk->voted_for));
return booth_udp_send_auth(sender, &omsg, sendmsglen(&omsg));
}
#define is_reason(r, tk) \
(reason == (r) || (reason == OR_AGAIN && (tk)->election_reason == (r)))
int new_election(struct ticket_config *tk,
struct booth_site *preference, int update_term, cmd_reason_t reason)
{
struct booth_site *new_leader;
if (local->type != SITE)
return 0;
if ((is_reason(OR_TKT_LOST, tk) || is_reason(OR_STEPDOWN, tk)) &&
check_attr_prereq(tk, GRANT_AUTO)) {
tk_log_info("attribute prerequisite not met, "
"not starting elections");
return 0;
}
/* elections were already started, but not yet finished/timed out */
if (is_time_set(&tk->election_end) && !is_past(&tk->election_end))
return 1;
if (ANYDEBUG) {
int tdiff;
if (is_time_set(&tk->election_end)) {
tdiff = -time_left(&tk->election_end);
tk_log_debug("starting elections, previous finished since " intfmt(tdiff));
} else {
tk_log_debug("starting elections");
}
tk_log_debug("elections caused by %s %s",
state_to_string(reason),
reason == OR_AGAIN ? state_to_string(tk->election_reason) : "" );
}
/* §5.2 */
/* If there was _no_ answer, don't keep incrementing the term number
* indefinitely. If there was no peer, there'll probably be no one
* listening now either. However, we don't know if we were
* invoked due to a timeout (caller does).
*/
/* increment the term only if either the current term was
* valid or if there was a tie (in that case update_term > 1)
*/
if ((update_term > 1) ||
(update_term && tk->last_valid_tk &&
tk->last_valid_tk->current_term >= tk->current_term)) {
/* save the previous term, we may need to send out the
* MY_INDEX message */
if (tk->state != ST_CANDIDATE) {
save_committed_tkt(tk);
}
tk->current_term++;
}
set_future_time(&tk->election_end, tk->timeout);
tk->in_election = 1;
tk_log_info("starting new election (term=%d)",
tk->current_term);
clear_election(tk);
new_leader = preference ? preference : local;
record_vote(tk, local, new_leader);
tk->voted_for = new_leader;
set_state(tk, ST_CANDIDATE);
/* some callers may want just to repeat on timeout */
if (reason == OR_AGAIN) {
reason = tk->election_reason;
} else {
tk->election_reason = reason;
}
ticket_broadcast(tk, OP_REQ_VOTE, OP_VOTE_FOR, RLT_SUCCESS, reason);
add_random_delay(tk);
return 0;
}
/* we were a leader and somebody says that they have a more up
* to date ticket
* there was probably connectivity loss
* tricky
*/
static int leader_handle_newer_ticket(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
update_term_from_msg(tk, msg);
if (leader != no_leader && leader && leader != local) {
/* eek, two leaders, split brain */
/* normally shouldn't happen; run election */
tk_log_error("from %s: ticket granted to %s! (revoking locally)",
site_string(sender),
site_string(leader)
);
} else if (term_time_left(tk)) {
/* eek, two leaders, split brain */
/* normally shouldn't happen; run election */
tk_log_error("from %s: ticket granted to %s! (revoking locally)",
site_string(sender),
site_string(leader)
);
}
set_next_state(tk, ST_LEADER);
return 0;
}
/* reply to STATUS */
static int process_MY_INDEX (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int i;
int expired;
expired = !msg_term_time(msg);
/* test against the last valid(!) ticket we have */
i = my_last_term(tk) - ntohl(msg->ticket.term);
if (i > 0) {
/* let them know about our newer ticket */
send_msg(OP_MY_INDEX, tk, sender, msg);
if (tk->state == ST_LEADER) {
tk_log_info("sending ticket update to %s",
site_string(sender));
return send_msg(OP_UPDATE, tk, sender, msg);
}
}
/* we have a newer or equal ticket and theirs is expired,
* nothing more to do here */
if (i >= 0 && expired) {
return 0;
}
if (tk->state == ST_LEADER) {
/* we're the leader, thread carefully */
if (expired) {
/* if their ticket is expired,
* nothing more to do */
return 0;
}
if (i < 0) {
/* they have a newer ticket, trouble if we're already leader
* for it */
tk_log_warn("from %s: more up to date ticket at %s",
site_string(sender),
site_string(leader)
);
return leader_handle_newer_ticket(tk, sender, leader, msg);
} else {
/* we have the ticket and we don't care */
return 0;
}
} else if (tk->state == ST_CANDIDATE) {
if (leader == local) {
/* a belated MY_INDEX, we're already trying to get the
* ticket */
return 0;
}
}
/* their ticket is either newer or not expired, don't
* ignore it */
update_ticket_from_msg(tk, sender, msg);
set_leader(tk, leader);
update_ticket_state(tk, sender);
save_committed_tkt(tk);
set_ticket_wakeup(tk);
return 0;
}
-int raft_answer(
- struct ticket_config *tk,
- struct booth_site *sender,
- struct booth_site *leader,
- struct boothc_ticket_msg *msg
- )
+int raft_answer(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *sender, struct booth_site *leader,
+ struct boothc_ticket_msg *msg)
{
int cmd, req;
int rv;
rv = 0;
cmd = ntohl(msg->header.cmd);
req = ntohl(msg->header.request);
if (req)
tk_log_debug("got %s (req %s) from %s",
state_to_string(cmd),
state_to_string(req),
site_string(sender));
else
tk_log_debug("got %s from %s",
state_to_string(cmd),
site_string(sender));
/* don't process tickets with invalid term
*/
if (cmd != OP_STATUS &&
msg_term_invalid(tk, sender, leader, msg))
return 0;
switch (cmd) {
case OP_REQ_VOTE:
rv = answer_REQ_VOTE(tk, sender, leader, msg);
break;
case OP_VOTE_FOR:
- rv = process_VOTE_FOR(tk, sender, leader, msg);
+ rv = process_VOTE_FOR(conf, tk, sender, leader, msg);
break;
case OP_ACK:
if (tk->leader == local &&
tk->state == ST_LEADER)
- rv = process_ACK(tk, sender, leader, msg);
+ rv = process_ACK(conf, tk, sender, leader, msg);
break;
case OP_HEARTBEAT:
if ((tk->leader != local || !term_time_left(tk)) &&
(tk->state == ST_INIT || tk->state == ST_FOLLOWER ||
tk->state == ST_CANDIDATE))
rv = answer_HEARTBEAT(tk, sender, leader, msg);
else {
tk_log_warn("unexpected message %s, from %s",
state_to_string(cmd),
site_string(sender));
mark_ticket_as_granted(tk, sender);
if (ticket_seems_ok(tk))
send_reject(sender, tk, RLT_TERM_STILL_VALID, msg);
rv = -EINVAL;
}
break;
case OP_UPDATE:
if (((tk->leader != local && tk->leader == leader) || !is_owned(tk)) &&
(tk->state == ST_INIT || tk->state == ST_FOLLOWER ||
tk->state == ST_CANDIDATE)) {
rv = process_UPDATE(tk, sender, leader, msg);
} else {
tk_log_warn("unexpected message %s, from %s",
state_to_string(cmd),
site_string(sender));
if (ticket_seems_ok(tk))
send_reject(sender, tk, RLT_TERM_STILL_VALID, msg);
rv = -EINVAL;
}
break;
case OP_REJECTED:
rv = process_REJECTED(tk, sender, leader, msg);
break;
case OP_REVOKE:
rv = process_REVOKE(tk, sender, leader, msg);
break;
case OP_MY_INDEX:
rv = process_MY_INDEX(tk, sender, leader, msg);
break;
case OP_STATUS:
if (!tk->in_election)
rv = send_msg(OP_MY_INDEX, tk, sender, msg);
break;
default:
tk_log_error("unknown message %s, from %s",
state_to_string(cmd), site_string(sender));
rv = -EINVAL;
}
return rv;
}
diff --git a/src/raft.h b/src/raft.h
index 0e01b48..9e52d8d 100644
--- a/src/raft.h
+++ b/src/raft.h
@@ -1,43 +1,42 @@
/*
* Copyright (C) 2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef _RAFT_H
#define _RAFT_H
#include "booth.h"
typedef enum {
ST_INIT = CHAR2CONST('I', 'n', 'i', 't'),
ST_FOLLOWER = CHAR2CONST('F', 'l', 'l', 'w'),
ST_CANDIDATE = CHAR2CONST('C', 'n', 'd', 'i'),
ST_LEADER = CHAR2CONST('L', 'e', 'a', 'd'),
} server_state_e;
struct ticket_config;
-int raft_answer(struct ticket_config *tk,
- struct booth_site *from,
- struct booth_site *leader,
- struct boothc_ticket_msg *msg);
+int raft_answer(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *from, struct booth_site *leader,
+ struct boothc_ticket_msg *msg);
int new_election(struct ticket_config *tk,
struct booth_site *new_leader, int update_term, cmd_reason_t reason);
-void elections_end(struct ticket_config *tk);
+void elections_end(struct booth_config *conf, struct ticket_config *tk);
#endif /* _RAFT_H */
diff --git a/src/request.c b/src/request.c
index 2503f6c..d7ff5f9 100644
--- a/src/request.c
+++ b/src/request.c
@@ -1,83 +1,84 @@
/*
* Copyright (C) 2015 Dejan Muhamedagic <dejan@hello-penguin.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include <errno.h>
#include <stdio.h>
#include <assert.h>
#include <glib.h>
#include "booth.h"
#include "ticket.h"
#include "request.h"
#include "log.h"
static GList *req_l = NULL;
static int req_id_cnt;
/* add request to the queue; it is up to the caller to manage
* memory for the three parameters
*/
void *add_req(
struct ticket_config *tk,
struct client *req_client,
struct boothc_ticket_msg *msg)
{
struct request *rp;
rp = g_new(struct request, 1);
if (!rp)
return NULL;
rp->id = req_id_cnt++;
rp->tk = tk;
rp->client_fd = req_client->fd;
rp->msg = msg;
req_l = g_list_append(req_l, rp);
return rp;
}
int get_req_id(const void *rp)
{
if (!rp)
return -1;
return ((struct request *)rp)->id;
}
static void del_req(GList *lp)
{
if (!lp)
return;
req_l = g_list_delete_link(req_l, lp);
}
-void foreach_tkt_req(struct ticket_config *tk, req_fp f)
+void foreach_tkt_req(struct booth_config *conf, struct ticket_config *tk,
+ req_fp f)
{
GList *lp, *next;
struct request *rp;
lp = g_list_first(req_l);
while (lp) {
next = g_list_next(lp);
rp = (struct request *)lp->data;
if (rp->tk == tk &&
- (f)(rp->tk, rp->client_fd, rp->msg) == 0) {
+ (f)(conf, rp->tk, rp->client_fd, rp->msg) == 0) {
log_debug("remove request for client %d", rp->client_fd);
del_req(lp); /* don't need this request anymore */
}
lp = next;
}
}
diff --git a/src/request.h b/src/request.h
index c014a09..2b25ca7 100644
--- a/src/request.h
+++ b/src/request.h
@@ -1,55 +1,68 @@
/*
* Copyright (C) 2015 Dejan Muhamedagic <dejan@hello-penguin.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef _REQUEST_H
#define _REQUEST_H
#include "booth.h"
#include "config.h"
/* Requests are coming from clients and get queued in a
* round-robin queue (fixed size)
*
* This is one way to make the server more responsive and less
* dependent on misbehaving clients. The requests are queued and
* later served from the server loop.
*/
struct request {
/** Request ID */
int id;
/** The ticket. */
struct ticket_config *tk;
/** The client which sent the request */
int client_fd;
/** The message containing the request */
void *msg;
};
-typedef int (*req_fp)(
- struct ticket_config *, int, struct boothc_ticket_msg *);
+typedef int (*req_fp)(struct booth_config *, struct ticket_config *, int,
+ struct boothc_ticket_msg *);
void *add_req(struct ticket_config *tk, struct client *req_client,
struct boothc_ticket_msg *msg);
-void foreach_tkt_req(struct ticket_config *tk, req_fp f);
+
+/**
+ * @internal
+ * Handle all pending requests for given ticket using function @p f
+ *
+ * @param[in,out] conf config object to refer to
+ * @param[in] tk ticket at hand
+ * @param[in] f handling function
+ *
+ * @return 1 on success, 0 when not done with the message, yet
+ */
+void foreach_tkt_req(struct booth_config *conf, struct ticket_config *tk,
+ req_fp f);
+
int get_req_id(const void *rp);
#endif /* _REQUEST_H */
diff --git a/src/ticket.c b/src/ticket.c
index c9147d8..e668ce0 100644
--- a/src/ticket.c
+++ b/src/ticket.c
@@ -1,1440 +1,1440 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "b_config.h"
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include <inttypes.h>
#include <stdio.h>
#include <assert.h>
#include <time.h>
#ifndef RANGE2RANDOM_GLIB
#include <clplumbing/cl_random.h>
#else
#include "alt/range2random_glib.h"
#endif
#include "ticket.h"
#include "config.h"
#include "pacemaker.h"
#include "inline-fn.h"
#include "log.h"
#include "booth.h"
#include "raft.h"
#include "handler.h"
#include "request.h"
#include "manual.h"
extern int TIME_RES;
/* Untrusted input, must fit (incl. \0) in a buffer of max chars. */
int check_max_len_valid(const char *s, int max)
{
for (int i = 0; i < max; i++) {
if (s[i] == 0) {
return 1;
}
}
return 0;
}
int find_ticket_by_name(struct booth_config *conf, const char *ticket,
struct ticket_config **found)
{
struct ticket_config *tk;
int i;
if (found) {
*found = NULL;
}
FOREACH_TICKET(conf, i, tk) {
if (strncmp(tk->name, ticket, sizeof(tk->name))) {
continue;
}
if (found) {
*found = tk;
}
return 1;
}
return 0;
}
int check_ticket(struct booth_config *conf, char *ticket,
struct ticket_config **found)
{
if (found) {
*found = NULL;
}
if (conf == NULL) {
return 0;
}
if (!check_max_len_valid(ticket, sizeof(conf->ticket[0].name))) {
return 0;
}
return find_ticket_by_name(conf, ticket, found);
}
/* is it safe to commit the grant?
* if we didn't hear from all sites on the initial grant, we may
* need to delay the commit
*
* TODO: investigate possibility to devise from history whether a
* missing site could be holding a ticket or not
*/
static int ticket_dangerous(struct ticket_config *tk)
{
int tdiff;
/* we may be invoked often, don't spam the log unnecessarily
*/
static int no_log_delay_msg;
if (!is_time_set(&tk->delay_commit)) {
return 0;
}
if (is_past(&tk->delay_commit) || all_sites_replied(tk)) {
if (tk->leader == local) {
tk_log_info("%s, committing to CIB",
is_past(&tk->delay_commit) ?
"ticket delay expired" : "all sites replied");
}
time_reset(&tk->delay_commit);
no_log_delay_msg = 0;
return 0;
}
tdiff = time_left(&tk->delay_commit);
tk_log_debug("delay ticket commit for another " intfmt(tdiff));
if (!no_log_delay_msg) {
tk_log_info("delaying ticket commit to CIB for " intfmt(tdiff));
tk_log_info("(or all sites are reached)");
no_log_delay_msg = 1;
}
return 1;
}
int ticket_write(struct ticket_config *tk)
{
if (local->type != SITE) {
return -EINVAL;
}
if (ticket_dangerous(tk)) {
return 1;
}
if (tk->leader == local) {
if (tk->state != ST_LEADER) {
tk_log_info("ticket state not yet consistent, "
"delaying ticket grant to CIB");
return 1;
}
pcmk_handler.grant_ticket(tk);
} else {
pcmk_handler.revoke_ticket(tk);
}
tk->update_cib = 0;
return 0;
}
void save_committed_tkt(struct ticket_config *tk)
{
if (!tk->last_valid_tk) {
tk->last_valid_tk = malloc(sizeof(struct ticket_config));
if (!tk->last_valid_tk) {
log_error("out of memory");
return;
}
}
memcpy(tk->last_valid_tk, tk, sizeof(struct ticket_config));
}
static void ext_prog_failed(struct ticket_config *tk, int start_election)
{
if (!is_manual(tk)) {
/* Give it to somebody else.
* Just send a VOTE_FOR message, so the
* others can start elections. */
if (!leader_and_valid(tk)) {
return;
}
save_committed_tkt(tk);
reset_ticket(tk);
ticket_write(tk);
if (start_election) {
ticket_broadcast(tk, OP_VOTE_FOR, OP_REQ_VOTE, RLT_SUCCESS, OR_LOCAL_FAIL);
}
} else {
/* There is not much we can do now because
* the manual ticket cannot be relocated.
* Just warn the user. */
if (tk->leader != local) {
return;
}
save_committed_tkt(tk);
reset_ticket(tk);
ticket_write(tk);
log_error("external test failed on the specified machine, cannot acquire a manual ticket");
}
}
#define attr_found(geo_ap, ap) \
((geo_ap) && !strcmp((geo_ap)->val, (ap)->attr_val))
int check_attr_prereq(struct ticket_config *tk, grant_type_e grant_type)
{
GList *el;
struct attr_prereq *ap;
struct geo_attr *geo_ap;
for (el = g_list_first(tk->attr_prereqs); el; el = g_list_next(el))
{
ap = (struct attr_prereq *) el->data;
if (ap->grant_type != grant_type) {
continue;
}
geo_ap = (struct geo_attr *) g_hash_table_lookup(tk->attr, ap->attr_name);
switch(ap->op) {
case ATTR_OP_EQ:
if (!attr_found(geo_ap, ap)) {
goto fail;
}
break;
case ATTR_OP_NE:
if (attr_found(geo_ap, ap)) {
goto fail;
}
break;
default:
break;
}
}
return 0;
fail:
tk_log_warn("'%s' attr-prereq failed", ap->attr_name);
return 1;
}
/* do we need to run the external program?
* or we already done that and waiting for the outcome
* or program exited and we can collect the status
* return codes
* 0: no program defined
* RUNCMD_MORE: program forked, results later
* != 0: executing program failed (or some other failure)
*/
static int do_ext_prog(struct ticket_config *tk, int start_election)
{
int rv = 0;
if (!tk_test.path) {
return 0;
}
switch(tk_test.progstate) {
case EXTPROG_IDLE:
rv = run_handler(tk);
if (rv == RUNCMD_ERR) {
tk_log_warn("couldn't run external test, not allowed to acquire ticket");
ext_prog_failed(tk, start_election);
}
break;
case EXTPROG_RUNNING:
/* should never get here, but just in case */
rv = RUNCMD_MORE;
break;
case EXTPROG_EXITED:
rv = tk_test_exit_status(tk);
if (rv) {
ext_prog_failed(tk, start_election);
}
break;
case EXTPROG_IGNORE:
/* nothing to do here */
break;
}
return rv;
}
/* Try to acquire a ticket
* Could be manual grant or after start (if the ticket is granted
* and still valid in the CIB)
* If the external program needs to run, this is run twice, once
* to start the program, and then to get the result and start
* elections.
*/
static int acquire_ticket(struct ticket_config *tk, cmd_reason_t reason)
{
int rv;
if (reason == OR_ADMIN && check_attr_prereq(tk, GRANT_MANUAL)) {
return RLT_ATTR_PREREQ;
}
switch(do_ext_prog(tk, 0)) {
case 0:
/* everything fine */
break;
case RUNCMD_MORE:
/* need to wait for the outcome before starting elections */
return 0;
default:
return RLT_EXT_FAILED;
}
if (is_manual(tk)) {
rv = manual_selection(tk, local, 1, reason);
} else {
rv = new_election(tk, local, 1, reason);
}
return rv ? RLT_SYNC_FAIL : 0;
}
/** Try to get the ticket for the local site.
* */
static int do_grant_ticket(struct ticket_config *tk, int options)
{
int rv;
tk_log_info("granting ticket");
if (tk->leader == local) {
return RLT_SUCCESS;
}
if (is_owned(tk)) {
if (is_manual(tk) && (options & OPT_IMMEDIATE)) {
/* -F flag has been used while granting a manual ticket.
* The ticket will be granted and may end up being granted
* on multiple sites */
tk_log_warn("manual ticket forced to be granted! be aware that "
"you may end up having two sites holding the same manual "
"ticket! revoke the ticket from the unnecessary site!");
} else {
return RLT_OVERGRANT;
}
}
set_future_time(&tk->delay_commit, tk->term_duration + tk->acquire_after);
if (options & OPT_IMMEDIATE) {
tk_log_warn("granting ticket immediately! If there are "
"unreachable sites, _hope_ you are sure that they don't "
"have the ticket!");
time_reset(&tk->delay_commit);
}
rv = acquire_ticket(tk, OR_ADMIN);
if (rv) {
time_reset(&tk->delay_commit);
return rv;
} else {
return RLT_MORE;
}
}
static void start_revoke_ticket(struct ticket_config *tk)
{
tk_log_info("revoking ticket");
save_committed_tkt(tk);
reset_ticket_and_set_no_leader(tk);
ticket_write(tk);
ticket_broadcast(tk, OP_REVOKE, OP_ACK, RLT_SUCCESS, OR_ADMIN);
}
/** Ticket revoke.
* Only to be started from the leader. */
static int do_revoke_ticket(struct ticket_config *tk)
{
if (tk->acks_expected) {
tk_log_info("delay ticket revoke until the current operation finishes");
set_next_state(tk, ST_INIT);
return RLT_MORE;
} else {
start_revoke_ticket(tk);
return RLT_SUCCESS;
}
}
static int number_sites_marked_as_granted(struct booth_config *conf,
struct ticket_config *tk)
{
int i, result = 0;
struct booth_site *ignored __attribute__((unused));
FOREACH_NODE(conf, i, ignored) {
result += tk->sites_where_granted[i];
}
return result;
}
static int list_ticket(struct booth_config *conf, char **pdata)
{
GString *s = NULL;
struct ticket_config *tk;
struct booth_site *site;
char timeout_str[64];
char *pending_str = NULL;
int i, site_index;
time_t ts;
s = g_string_sized_new(BUFSIZ);
if (s == NULL) {
return -ENOMEM;
}
FOREACH_TICKET(conf, i, tk) {
if (!is_manual(tk) && is_time_set(&tk->term_expires)) {
/* Manual tickets doesn't have term_expires defined */
ts = wall_ts(&tk->term_expires);
strftime(timeout_str, sizeof(timeout_str), "%F %T",
localtime(&ts));
} else {
strcpy(timeout_str, "INF");
}
if (tk->leader == local && is_time_set(&tk->delay_commit) &&
!is_past(&tk->delay_commit)) {
char until_str[64];
int rc;
ts = wall_ts(&tk->delay_commit);
strftime(until_str, sizeof(until_str), "%F %T",
localtime(&ts));
rc = asprintf(&pending_str, " (commit pending until %s)",
until_str);
if (rc < 0) {
g_string_free(s, TRUE);
return -ENOMEM;
}
}
g_string_append_printf(s, "ticket: %s, leader: %s", tk->name,
ticket_leader_string(tk));
if (is_owned(tk)) {
g_string_append_printf(s, ", expires: %s", timeout_str);
if (pending_str != NULL) {
g_string_append(s, pending_str);
}
}
if (is_manual(tk)) {
g_string_append(s, " [manual mode]");
}
g_string_append(s, "\n");
if (pending_str != NULL) {
free(pending_str);
pending_str = NULL;
}
}
FOREACH_TICKET(conf, i, tk) {
int multiple_grant_warning_length = number_sites_marked_as_granted(conf, tk);
if (multiple_grant_warning_length <= 1) {
continue;
}
g_string_append_printf(s, "\nWARNING: The ticket %s is granted to multiple sites: ",
tk->name);
FOREACH_NODE(conf, site_index, site) {
if (tk->sites_where_granted[site_index] <= 0) {
continue;
}
g_string_append(s, site_string(site));
multiple_grant_warning_length--;
if (multiple_grant_warning_length > 0) {
g_string_append(s, ", ");
}
}
g_string_append(s, ". Revoke the ticket from the faulty sites.\n");
}
*pdata = strdup(s->str);
g_string_free(s, TRUE);
if (*pdata == NULL) {
return -ENOMEM;
}
return 0;
}
void disown_ticket(struct ticket_config *tk)
{
set_leader(tk, NULL);
tk->is_granted = 0;
get_time(&tk->term_expires);
}
void reset_ticket(struct ticket_config *tk)
{
ignore_ext_test(tk);
disown_ticket(tk);
no_resends(tk);
set_state(tk, ST_INIT);
set_next_state(tk, 0);
tk->voted_for = NULL;
}
void reset_ticket_and_set_no_leader(struct ticket_config *tk)
{
mark_ticket_as_revoked_from_leader(tk);
reset_ticket(tk);
tk->leader = no_leader;
tk_log_debug("ticket leader set to no_leader");
}
static void log_reacquire_reason(struct ticket_config *tk)
{
int valid;
const char *where_granted = NULL;
char buff[75];
valid = is_time_set(&tk->term_expires) && !is_past(&tk->term_expires);
if (tk->leader == local) {
where_granted = "granted here";
} else {
snprintf(buff, sizeof(buff), "granted to %s",
site_string(tk->leader));
where_granted = buff;
}
if (!valid) {
tk_log_warn("%s, but not valid anymore (will try to reacquire)",
where_granted);
}
if (tk->is_granted && tk->leader != local) {
if (tk->leader && tk->leader != no_leader) {
tk_log_error("granted here, but also %s, "
"that's really too bad (will try to reacquire)",
where_granted);
} else {
tk_log_warn("granted here, but we're "
"not recorded as the grantee (will try to reacquire)");
}
}
}
void update_ticket_state(struct ticket_config *tk, struct booth_site *sender)
{
if (tk->state == ST_CANDIDATE) {
tk_log_info("learned from %s about newer ticket, stopping elections",
site_string(sender));
/* there could be rejects coming from others; don't log
* warnings unnecessarily */
tk->expect_more_rejects = 1;
}
if (tk->leader == local || tk->is_granted) {
/* message from a live leader with valid ticket? */
if (sender == tk->leader && term_time_left(tk)) {
if (tk->is_granted) {
tk_log_warn("ticket was granted here, "
"but it's live at %s (revoking here)",
site_string(sender));
} else {
tk_log_info("ticket live at %s", site_string(sender));
}
disown_ticket(tk);
ticket_write(tk);
set_state(tk, ST_FOLLOWER);
set_next_state(tk, ST_FOLLOWER);
} else {
if (tk->state == ST_CANDIDATE) {
set_state(tk, ST_FOLLOWER);
}
set_next_state(tk, ST_LEADER);
}
} else {
if (!tk->leader || tk->leader == no_leader) {
if (sender) {
tk_log_info("ticket is not granted");
} else {
tk_log_info("ticket is not granted (from CIB)");
}
set_state(tk, ST_INIT);
} else {
if (sender) {
tk_log_info("ticket granted to %s (says %s)",
site_string(tk->leader),
tk->leader == sender ? "they" : site_string(sender));
} else {
tk_log_info("ticket granted to %s (from CIB)",
site_string(tk->leader));
}
set_state(tk, ST_FOLLOWER);
/* just make sure that we check the ticket soon */
set_next_state(tk, ST_FOLLOWER);
}
}
}
int setup_ticket(struct booth_config *conf)
{
struct ticket_config *tk;
int i;
FOREACH_TICKET(conf, i, tk) {
reset_ticket(tk);
if (local->type == SITE) {
if (!pcmk_handler.load_ticket(conf, tk)) {
update_ticket_state(tk, NULL);
}
tk->update_cib = 1;
}
tk_log_info("broadcasting state query");
/* wait until all send their status (or the first
* timeout) */
tk->start_postpone = 1;
ticket_broadcast(tk, OP_STATUS, OP_MY_INDEX, RLT_SUCCESS, 0);
}
return 0;
}
int ticket_answer_list(struct booth_config *conf, int fd)
{
char *data = NULL;
int rv;
struct boothc_hdr_msg hdr;
rv = list_ticket(conf, &data);
if (rv < 0) {
goto out;
}
init_header(&hdr.header, CL_LIST, 0, 0, RLT_SUCCESS, 0, sizeof(hdr) + strlen(data));
rv = send_header_plus(fd, &hdr, data, strlen(data));
out:
if (data != NULL) {
free(data);
}
return rv;
}
int process_client_request(struct booth_config *conf, struct client *req_client,
void *buf)
{
int rv, rc = 1;
struct ticket_config *tk;
int cmd;
struct boothc_ticket_msg omsg;
struct boothc_ticket_msg *msg;
msg = (struct boothc_ticket_msg *)buf;
cmd = ntohl(msg->header.cmd);
if (!check_ticket(conf, msg->ticket.id, &tk)) {
log_warn("client referenced unknown ticket %s", msg->ticket.id);
rv = RLT_INVALID_ARG;
goto reply_now;
}
/* Perform the initial check before granting
* an already granted non-manual ticket */
if (!is_manual(tk) && cmd == CMD_GRANT && is_owned(tk)) {
log_warn("client wants to grant an (already granted!) ticket %s",
msg->ticket.id);
rv = RLT_OVERGRANT;
goto reply_now;
}
if (cmd == CMD_REVOKE && !is_owned(tk)) {
log_info("client wants to revoke a free ticket %s", msg->ticket.id);
rv = RLT_TICKET_IDLE;
goto reply_now;
}
if (cmd == CMD_REVOKE && tk->leader != local) {
tk_log_info("not granted here, redirect to %s",
ticket_leader_string(tk));
rv = RLT_REDIRECT;
goto reply_now;
}
if (cmd == CMD_REVOKE) {
rv = do_revoke_ticket(tk);
} else {
rv = do_grant_ticket(tk, ntohl(msg->header.options));
}
if (rv == RLT_MORE) {
/* client may receive further notifications, save the
* request for further processing */
add_req(tk, req_client, msg);
tk_log_debug("queue request %s for client %d",
state_to_string(cmd), req_client->fd);
rc = 0; /* we're not yet done with the message */
}
reply_now:
init_ticket_msg(&omsg, CL_RESULT, 0, rv, 0, tk);
send_client_msg(req_client->fd, &omsg);
return rc;
}
-int notify_client(struct ticket_config *tk, int client_fd,
- struct boothc_ticket_msg *msg)
+int notify_client(struct booth_config *conf, struct ticket_config *tk,
+ int client_fd, struct boothc_ticket_msg *msg)
{
struct boothc_ticket_msg omsg;
void (*deadfn) (int ci);
int rv, rc, ci;
int cmd, options;
struct client *req_client;
cmd = ntohl(msg->header.cmd);
options = ntohl(msg->header.options);
rv = tk->outcome;
ci = find_client_by_fd(client_fd);
if (ci < 0) {
tk_log_info("client %d (request %s) left before being notified",
client_fd, state_to_string(cmd));
return 0;
}
tk_log_debug("notifying client %d (request %s)",
client_fd, state_to_string(cmd));
init_ticket_msg(&omsg, CL_RESULT, 0, rv, 0, tk);
rc = send_client_msg(client_fd, &omsg);
if (rc == 0 && (rv == RLT_MORE ||
(rv == RLT_CIB_PENDING && (options & OPT_WAIT_COMMIT)))) {
/* more to do here, keep the request */
return 1;
} else {
/* we sent a definite answer or there was a write error, drop
* the client */
if (rc) {
tk_log_debug("failed to notify client %d (request %s)",
client_fd, state_to_string(cmd));
} else {
tk_log_debug("client %d (request %s) got final notification",
client_fd, state_to_string(cmd));
}
req_client = clients + ci;
deadfn = req_client->deadfn;
if (deadfn) {
deadfn(ci);
}
return 0; /* we're done with this request */
}
}
int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd,
cmd_request_t expected_reply, cmd_result_t res,
cmd_reason_t reason)
{
struct boothc_ticket_msg msg;
init_ticket_msg(&msg, cmd, 0, res, reason, tk);
tk_log_debug("broadcasting '%s' (term=%d, valid=%d)",
state_to_string(cmd),
ntohl(msg.ticket.term),
msg_term_time(&msg));
tk->last_request = cmd;
if (expected_reply) {
expect_replies(tk, expected_reply);
}
ticket_activate_timeout(tk);
return transport()->broadcast_auth(&msg, sendmsglen(&msg));
}
/* update the ticket on the leader, write it to the CIB, and
send out the update message to others with the new expiry
time
*/
-int leader_update_ticket(struct ticket_config *tk)
+int leader_update_ticket(struct booth_config *conf, struct ticket_config *tk)
{
int rv = 0, rv2;
timetype now;
if (tk->ticket_updated >= 2) {
return 0;
}
/* for manual tickets, we don't set time expiration */
if (!is_manual(tk) && tk->ticket_updated < 1) {
tk->ticket_updated = 1;
get_time(&now);
copy_time(&now, &tk->last_renewal);
set_future_time(&tk->term_expires, tk->term_duration);
rv = ticket_broadcast(tk, OP_UPDATE, OP_ACK, RLT_SUCCESS, 0);
}
if (tk->ticket_updated < 2) {
rv2 = ticket_write(tk);
switch(rv2) {
case 0:
tk->ticket_updated = 2;
tk->outcome = RLT_SUCCESS;
- foreach_tkt_req(tk, notify_client);
+ foreach_tkt_req(conf, tk, notify_client);
break;
case 1:
if (tk->outcome != RLT_CIB_PENDING) {
tk->outcome = RLT_CIB_PENDING;
- foreach_tkt_req(tk, notify_client);
+ foreach_tkt_req(conf, tk, notify_client);
}
break;
default:
break;
}
}
return rv;
}
static void log_lost_servers(struct booth_config *conf, struct ticket_config *tk)
{
struct booth_site *n;
int i;
if (tk->retry_number > 1) {
/* log those that we couldn't reach, but do
* that only on the first retry
*/
return;
}
FOREACH_NODE(conf, i, n) {
if (tk->acks_received & n->bitmask) {
continue;
}
tk_log_warn("%s %s didn't acknowledge our %s, "
"will retry %d times",
(n->type == ARBITRATOR ? "arbitrator" : "site"),
site_string(n), state_to_string(tk->last_request),
tk->retries);
}
}
static void resend_msg(struct booth_config *conf, struct ticket_config *tk)
{
struct booth_site *n;
int i;
if (!(tk->acks_received ^ local->bitmask)) {
ticket_broadcast(tk, tk->last_request, 0, RLT_SUCCESS, 0);
} else {
FOREACH_NODE(conf, i, n) {
if (tk->acks_received & n->bitmask) {
continue;
}
n->resend_cnt++;
tk_log_debug("resending %s to %s",
state_to_string(tk->last_request),
site_string(n));
send_msg(tk->last_request, tk, n, NULL);
}
ticket_activate_timeout(tk);
}
}
static void handle_resends(struct booth_config *conf, struct ticket_config *tk)
{
int ack_cnt;
if (++tk->retry_number > tk->retries) {
tk_log_info("giving up on sending retries");
no_resends(tk);
set_ticket_wakeup(tk);
return;
}
/* try to reach some sites again if we just stepped down */
if (tk->last_request == OP_VOTE_FOR) {
tk_log_warn("no answers to our VtFr request to step down (try #%d), "
"we are alone",
tk->retry_number);
goto just_resend;
}
if (!majority_of_bits(tk, tk->acks_received)) {
ack_cnt = count_bits(tk->acks_received) - 1;
if (!ack_cnt) {
tk_log_warn("no answers to our request (try #%d), "
"we are alone",
tk->retry_number);
} else {
tk_log_warn("not enough answers to our request (try #%d): "
"only got %d answers",
tk->retry_number, ack_cnt);
}
} else {
log_lost_servers(conf, tk);
}
just_resend:
resend_msg(conf, tk);
}
static int postpone_ticket_processing(struct ticket_config *tk)
{
extern timetype start_time;
return tk->start_postpone && (-time_left(&start_time) < tk->timeout);
}
#define has_extprog_exited(tk) ((tk)->clu_test.progstate == EXTPROG_EXITED)
-static void process_next_state(struct ticket_config *tk)
+static void process_next_state(struct booth_config *conf, struct ticket_config *tk)
{
int rv;
switch(tk->next_state) {
case ST_LEADER:
if (has_extprog_exited(tk)) {
if (tk->state == ST_LEADER) {
break;
}
rv = acquire_ticket(tk, OR_ADMIN);
if (rv != 0) { /* external program failed */
tk->outcome = rv;
- foreach_tkt_req(tk, notify_client);
+ foreach_tkt_req(conf, tk, notify_client);
}
} else {
log_reacquire_reason(tk);
acquire_ticket(tk, OR_REACQUIRE);
}
break;
case ST_INIT:
no_resends(tk);
start_revoke_ticket(tk);
tk->outcome = RLT_SUCCESS;
- foreach_tkt_req(tk, notify_client);
+ foreach_tkt_req(conf, tk, notify_client);
break;
/* wanting to be follower is not much of an ambition; no
* processing, just return; don't reset start_postpone until
* we got some replies to status */
case ST_FOLLOWER:
return;
default:
break;
}
tk->start_postpone = 0;
}
static void ticket_lost(struct ticket_config *tk)
{
int reason = OR_TKT_LOST;
if (tk->leader != local) {
tk_log_warn("lost at %s", site_string(tk->leader));
} else {
if (is_ext_prog_running(tk)) {
ext_prog_timeout(tk);
reason = OR_LOCAL_FAIL;
} else {
tk_log_warn("lost majority (revoking locally)");
reason = tk->election_reason ? tk->election_reason : OR_REACQUIRE;
}
}
tk->lost_leader = tk->leader;
save_committed_tkt(tk);
mark_ticket_as_revoked_from_leader(tk);
reset_ticket(tk);
set_state(tk, ST_FOLLOWER);
if (local->type == SITE) {
ticket_write(tk);
schedule_election(tk, reason);
}
}
static void next_action(struct booth_config *conf, struct ticket_config *tk)
{
int rv;
switch(tk->state) {
case ST_INIT:
/* init state, handle resends for ticket revoke */
/* and rebroadcast if stepping down */
/* try to acquire ticket on grant */
if (has_extprog_exited(tk)) {
rv = acquire_ticket(tk, OR_ADMIN);
if (rv != 0) { /* external program failed */
tk->outcome = rv;
- foreach_tkt_req(tk, notify_client);
+ foreach_tkt_req(conf, tk, notify_client);
}
} else {
if (tk->acks_expected) {
handle_resends(conf, tk);
}
}
break;
case ST_FOLLOWER:
if (!is_manual(tk)) {
/* leader/ticket lost? and we didn't vote yet */
tk_log_debug("leader: %s, voted_for: %s",
site_string(tk->leader),
site_string(tk->voted_for));
if (tk->leader) {
break;
}
if (!tk->voted_for || !tk->in_election) {
disown_ticket(tk);
if (!new_election(tk, NULL, 1, OR_AGAIN)) {
ticket_activate_timeout(tk);
}
} else {
/* we should restart elections in case nothing
* happens in the meantime */
tk->in_election = 0;
ticket_activate_timeout(tk);
}
} else {
/* for manual tickets, also try to acquire ticket on grant
* in the Follower state (because we may end up having
* two Leaders) */
if (has_extprog_exited(tk)) {
rv = acquire_ticket(tk, OR_ADMIN);
if (rv != 0) { /* external program failed */
tk->outcome = rv;
- foreach_tkt_req(tk, notify_client);
+ foreach_tkt_req(conf, tk, notify_client);
}
} else {
/* Otherwise, just send ACKs if needed */
if (tk->acks_expected) {
handle_resends(conf, tk);
}
}
}
break;
case ST_CANDIDATE:
/* elections timed out? */
- elections_end(tk);
+ elections_end(conf, tk);
break;
case ST_LEADER:
/* timeout or ticket renewal? */
if (tk->acks_expected) {
handle_resends(conf, tk);
if (majority_of_bits(tk, tk->acks_received)) {
- leader_update_ticket(tk);
+ leader_update_ticket(conf, tk);
}
} else if (!do_ext_prog(tk, 1)) {
/* this is ticket renewal, run local test */
ticket_broadcast(tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0);
tk->ticket_updated = 0;
}
break;
default:
break;
}
}
static void ticket_cron(struct booth_config *conf, struct ticket_config *tk)
{
/* don't process the tickets too early after start */
if (postpone_ticket_processing(tk)) {
tk_log_debug("ticket processing postponed (start_postpone=%d)",
tk->start_postpone);
/* but run again soon */
ticket_activate_timeout(tk);
return;
}
/* no need for status resends, we hope we got at least one
* my_index back */
if (tk->acks_expected == OP_MY_INDEX) {
no_resends(tk);
}
/* after startup, we need to decide what to do based on the
* current ticket state; tk->next_state has a hint
* also used for revokes which had to be delayed
*/
if (tk->next_state) {
- process_next_state(tk);
+ process_next_state(conf, tk);
goto out;
}
/* Has an owner, has an expiry date, and expiry date in the past?
* For automatic tickets, losing the ticket must happen
* in _every_ state.
*/
if (!is_manual(tk) && is_owned(tk) && is_time_set(&tk->term_expires) &&
is_past(&tk->term_expires)) {
ticket_lost(tk);
goto out;
}
next_action(conf, tk);
out:
tk->next_state = 0;
if (!tk->in_election && tk->update_cib) {
ticket_write(tk);
}
}
void process_tickets(struct booth_config *conf)
{
struct ticket_config *tk;
int i;
timetype last_cron;
FOREACH_TICKET(conf, i, tk) {
if (!has_extprog_exited(tk) &&
is_time_set(&tk->next_cron) && !is_past(&tk->next_cron)) {
continue;
}
tk_log_debug("ticket cron");
copy_time(&tk->next_cron, &last_cron);
ticket_cron(conf, tk);
if (time_cmp(&last_cron, &tk->next_cron, ==)) {
tk_log_debug("nobody set ticket wakeup");
set_ticket_wakeup(tk);
}
}
}
void tickets_log_info(struct booth_config *conf)
{
struct ticket_config *tk;
int i;
time_t ts;
FOREACH_TICKET(conf, i, tk) {
ts = wall_ts(&tk->term_expires);
tk_log_info("state '%s' term %d leader %s expires %-24.24s",
state_to_string(tk->state),
tk->current_term,
ticket_leader_string(tk),
ctime(&ts));
}
}
static void update_acks(struct ticket_config *tk, struct booth_site *sender,
struct booth_site *leader, struct boothc_ticket_msg *msg)
{
uint32_t cmd;
uint32_t req;
cmd = ntohl(msg->header.cmd);
req = ntohl(msg->header.request);
if (req != tk->last_request ||
(tk->acks_expected != cmd && tk->acks_expected != OP_REJECTED)) {
return;
}
/* got an ack! */
tk->acks_received |= sender->bitmask;
if (all_replied(tk) ||
/* we just stepped down, need only one site to start elections */
(cmd == OP_REQ_VOTE && tk->last_request == OP_VOTE_FOR)) {
no_resends(tk);
tk->start_postpone = 0;
set_ticket_wakeup(tk);
}
}
/* read ticket message */
int ticket_recv(struct booth_config *conf, void *buf, struct booth_site *source)
{
struct boothc_ticket_msg *msg;
struct ticket_config *tk;
struct booth_site *leader;
uint32_t leader_u;
msg = (struct boothc_ticket_msg *)buf;
if (!check_ticket(conf, msg->ticket.id, &tk)) {
log_warn("got invalid ticket name %s from %s",
msg->ticket.id, site_string(source));
source->invalid_cnt++;
return -EINVAL;
}
leader_u = ntohl(msg->ticket.leader);
if (!find_site_by_id(conf, leader_u, &leader)) {
tk_log_error("message with unknown leader %u received", leader_u);
source->invalid_cnt++;
return -EINVAL;
}
update_acks(tk, source, leader, msg);
- return raft_answer(tk, source, leader, msg);
+ return raft_answer(conf, tk, source, leader, msg);
}
static void log_next_wakeup(struct ticket_config *tk)
{
int left;
left = time_left(&tk->next_cron);
tk_log_debug("set ticket wakeup in " intfmt(left));
}
/* New vote round; §5.2 */
/* delay the next election start for some random time
* (up to 1 second)
*/
void add_random_delay(struct ticket_config *tk)
{
timetype tv;
interval_add(&tk->next_cron, rand_time(min(1000, tk->timeout)), &tv);
ticket_next_cron_at(tk, &tv);
if (ANYDEBUG) {
log_next_wakeup(tk);
}
}
void set_ticket_wakeup(struct ticket_config *tk)
{
timetype near_future, tv, next_vote;
set_future_time(&near_future, 10);
if (!is_manual(tk)) {
/* At least every hour, perhaps sooner (default) */
tk_log_debug("ticket will be woken up after up to one hour");
ticket_next_cron_in(tk, 3600*TIME_RES);
switch (tk->state) {
case ST_LEADER:
assert(tk->leader == local);
get_next_election_time(tk, &next_vote);
/* If timestamp is in the past, wakeup in
* near future */
if (!is_time_set(&next_vote)) {
tk_log_debug("next ts unset, wakeup soon");
ticket_next_cron_at(tk, &near_future);
} else if (is_past(&next_vote)) {
int tdiff = time_left(&next_vote);
tk_log_debug("next ts in the past " intfmt(tdiff));
ticket_next_cron_at(tk, &near_future);
} else {
ticket_next_cron_at(tk, &next_vote);
}
break;
case ST_CANDIDATE:
assert(is_time_set(&tk->election_end));
ticket_next_cron_at(tk, &tk->election_end);
break;
case ST_INIT:
case ST_FOLLOWER:
/* If there is (or should be) some owner, check on it later on.
* If no one is interested - don't care. */
if (is_owned(tk)) {
interval_add(&tk->term_expires, tk->acquire_after, &tv);
ticket_next_cron_at(tk, &tv);
}
break;
default:
tk_log_error("unknown ticket state: %d", tk->state);
}
if (tk->next_state) {
/* we need to do something soon here */
if (!tk->acks_expected) {
ticket_next_cron_at(tk, &near_future);
} else {
ticket_activate_timeout(tk);
}
}
} else {
/* At least six minutes, to make sure that multi-leader situations
* will be solved promptly.
*/
tk_log_debug("manual ticket will be woken up after up to six minutes");
ticket_next_cron_in(tk, 60 * TIME_RES);
/* For manual tickets, no earlier timeout could be set in a similar
* way as it is done in a switch above for automatic tickets.
* The reason is that term's timeout is INF and no Raft-based elections
* are performed.
*/
}
if (ANYDEBUG) {
log_next_wakeup(tk);
}
}
void schedule_election(struct ticket_config *tk, cmd_reason_t reason)
{
if (local->type != SITE) {
return;
}
tk->election_reason = reason;
get_time(&tk->next_cron);
/* introduce a short delay before starting election */
add_random_delay(tk);
}
int is_manual(struct ticket_config *tk)
{
return (tk->mode == TICKET_MODE_MANUAL) ? 1 : 0;
}
/* Given a state (in host byte order), return a human-readable (char*).
* An array is used so that multiple states can be printed in a single printf(). */
char *state_to_string(uint32_t state_ho)
{
union mu { cmd_request_t s; char c[5]; };
static union mu cache[6] = { { 0 } }, *cur;
static int current = 0;
current ++;
if (current >= sizeof(cache)/sizeof(cache[0])) {
current = 0;
}
cur = cache + current;
cur->s = htonl(state_ho);
/* Shouldn't be necessary, union array is initialized with zeroes, and
* these bytes never get written. */
cur->c[4] = 0;
return cur->c;
}
int send_reject(struct booth_site *dest, struct ticket_config *tk,
cmd_result_t code, struct boothc_ticket_msg *in_msg)
{
int req = ntohl(in_msg->header.cmd);
struct boothc_ticket_msg msg;
tk_log_debug("sending reject to %s", site_string(dest));
init_ticket_msg(&msg, OP_REJECTED, req, code, 0, tk);
return booth_udp_send_auth(dest, &msg, sendmsglen(&msg));
}
int send_msg(int cmd, struct ticket_config *tk, struct booth_site *dest,
struct boothc_ticket_msg *in_msg)
{
int req = 0;
struct ticket_config *valid_tk = tk;
struct boothc_ticket_msg msg;
/* if we want to send the last valid ticket, then if we're in
* the ST_CANDIDATE state, the last valid ticket is in
* tk->last_valid_tk
*/
if (cmd == OP_MY_INDEX) {
if (tk->state == ST_CANDIDATE && tk->last_valid_tk) {
valid_tk = tk->last_valid_tk;
}
tk_log_info("sending status to %s", site_string(dest));
}
if (in_msg) {
req = ntohl(in_msg->header.cmd);
}
init_ticket_msg(&msg, cmd, req, RLT_SUCCESS, 0, valid_tk);
return booth_udp_send_auth(dest, &msg, sendmsglen(&msg));
}
diff --git a/src/ticket.h b/src/ticket.h
index 454ebaf..22e7215 100644
--- a/src/ticket.h
+++ b/src/ticket.h
@@ -1,249 +1,260 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef _TICKET_H
#define _TICKET_H
#include <time.h>
#include <sys/time.h>
#include <math.h>
#include "timer.h"
#include "config.h"
#include "log.h"
extern int TIME_RES;
#define DEFAULT_TICKET_EXPIRY (600*TIME_RES)
#define DEFAULT_TICKET_TIMEOUT (5*TIME_RES)
#define DEFAULT_RETRIES 10
#define FOREACH_TICKET(b_, i_, t_) \
for (i_ = 0; \
(t_ = (b_)->ticket + i_, i_ < (b_)->ticket_count); \
i_++)
#define FOREACH_NODE(b_, i_, n_) \
for (i_ = 0; \
(n_ = (b_)->site + i_, i_ < (b_)->site_count); \
i_++)
#define _FOREACH_TICKET(i_, t_) \
for (i_ = 0; \
(t_ = booth_conf->ticket + i_, i_ < booth_conf->ticket_count); \
i_++)
#define _FOREACH_NODE(i_, n_) \
for (i_ = 0; \
(n_ = booth_conf->site + i_, i_ < booth_conf->site_count); \
i_++)
#define set_leader(tk, who) do { \
if (who == NULL) { \
mark_ticket_as_revoked_from_leader(tk); \
} \
\
tk->leader = who; \
tk_log_debug("ticket leader set to %s", ticket_leader_string(tk)); \
\
if (tk->leader) { \
mark_ticket_as_granted(tk, tk->leader); \
} \
} while(0)
#define mark_ticket_as_granted(tk, who) do { \
if (is_manual(tk) && (who->index > -1)) { \
tk->sites_where_granted[who->index] = 1; \
tk_log_debug("manual ticket marked as granted to %s", ticket_leader_string(tk)); \
} \
} while(0)
#define mark_ticket_as_revoked(tk, who) do { \
if (is_manual(tk) && who && (who->index > -1)) { \
tk->sites_where_granted[who->index] = 0; \
tk_log_debug("manual ticket marked as revoked from %s", site_string(who)); \
} \
} while(0)
#define mark_ticket_as_revoked_from_leader(tk) do { \
if (tk->leader) { \
mark_ticket_as_revoked(tk, tk->leader); \
} \
} while(0)
#define set_state(tk, newst) do { \
tk_log_debug("state transition: %s -> %s", \
state_to_string(tk->state), state_to_string(newst)); \
tk->state = newst; \
} while(0)
#define set_next_state(tk, newst) do { \
if (!(newst)) tk_log_debug("next state reset"); \
else tk_log_debug("next state set to %s", state_to_string(newst)); \
tk->next_state = newst; \
} while(0)
#define is_term_invalid(tk, term) \
((tk)->last_valid_tk && (tk)->last_valid_tk->current_term > (term))
void save_committed_tkt(struct ticket_config *tk);
void disown_ticket(struct ticket_config *tk);
/**
* @internal
* Like @find_ticket_by_name, but perform sanity checks on the found ticket
*
* @param[in,out] conf config object to refer to
* @param[in] ticket name of the ticket to search for
* @param[out] found place the reference here when found
*
* @return 0 on failure, see @find_ticket_by_name otherwise
*/
int check_ticket(struct booth_config *conf, char *ticket, struct ticket_config **tc);
int grant_ticket(struct ticket_config *ticket);
int revoke_ticket(struct ticket_config *ticket);
/**
* @internal
* Second stage of incoming datagram handling (after authentication)
*
* @param[in,out] conf config object to refer to
* @param[in] buf raw message to act upon
* @param[in] source member originating this message
*
* @return 0 on success or negative value (-1 or -errno) on error
*/
int ticket_recv(struct booth_config *conf, void *buf, struct booth_site *source);
void reset_ticket(struct ticket_config *tk);
void reset_ticket_and_set_no_leader(struct ticket_config *tk);
void update_ticket_state(struct ticket_config *tk, struct booth_site *sender);
/**
* @internal
* Broadcast the initial state query
*
* @param[in,out] conf config object to use as a starting point
*
* @return 0 (for the time being)
*/
int setup_ticket(struct booth_config *conf);
int check_max_len_valid(const char *s, int max);
/**
* @internal
* Find a ticket based on a given name
*
* @param[in,out] conf config object to refer to
* @param[in] ticket name of the ticket to search for
* @param[out] found place the reference here when found
*
* @return see @list_ticket and @send_header_plus
*/
int find_ticket_by_name(struct booth_config *conf,
const char *ticket, struct ticket_config **found);
void set_ticket_wakeup(struct ticket_config *tk);
/**
* @internal
* Implementation of ticket listing
*
* @param[in,out] conf config object to refer to
* @param[in] fd file descriptor of the socket to respond to
*
* @return see @list_ticket and @send_header_plus
*/
int ticket_answer_list(struct booth_config *conf, int fd);
/**
* @internal
* Process request from the client (as opposed to the peer daemon)
*
* @param[in,out] conf config object to refer to
* @param[in] req_client client structure of the sender
* @param[in] buf client message
*
* @return 1 on success, or 0 when not yet done with the message
*/
int process_client_request(struct booth_config *conf, struct client *req_client,
void *buf);
int ticket_write(struct ticket_config *tk);
/**
* @internal
* Mainloop of booth ticket handling
*
* @param[in,out] conf config object to refer to
*/
void process_tickets(struct booth_config *conf);
/**
* @internal
* Log properties of all tickets
*
* @param[in,out] conf config object to refer to
*/
void tickets_log_info(struct booth_config *conf);
char *state_to_string(uint32_t state_ho);
int send_reject(struct booth_site *dest, struct ticket_config *tk,
cmd_result_t code, struct boothc_ticket_msg *in_msg);
int send_msg (int cmd, struct ticket_config *tk,
struct booth_site *dest, struct boothc_ticket_msg *in_msg);
-int notify_client(struct ticket_config *tk, int client_fd,
- struct boothc_ticket_msg *msg);
+
+/**
+ * @internal
+ * Notify client at particular socket, regarding particular ticket
+ *
+ * @param[in,out] conf config object to refer to
+ * @param[in] tk ticket at hand
+ * @param[in] fd file descriptor of the socket to respond to
+ * @param[in] msg input message being responded to
+ */
+int notify_client(struct booth_config *conf, struct ticket_config *tk,
+ int client_fd, struct boothc_ticket_msg *msg);
+
int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd, cmd_request_t expected_reply, cmd_result_t res, cmd_reason_t reason);
-int leader_update_ticket(struct ticket_config *tk);
+int leader_update_ticket(struct booth_config *conf, struct ticket_config *tk);
void add_random_delay(struct ticket_config *tk);
void schedule_election(struct ticket_config *tk, cmd_reason_t reason);
int is_manual(struct ticket_config *tk);
int check_attr_prereq(struct ticket_config *tk, grant_type_e grant_type);
static inline void ticket_next_cron_at(struct ticket_config *tk, timetype *when)
{
copy_time(when, &tk->next_cron);
}
static inline void ticket_next_cron_in(struct ticket_config *tk, int interval)
{
timetype tv;
set_future_time(&tv, interval);
ticket_next_cron_at(tk, &tv);
}
static inline void ticket_activate_timeout(struct ticket_config *tk)
{
/* TODO: increase timeout when no answers */
tk_log_debug("activate ticket timeout in %d", tk->timeout);
ticket_next_cron_in(tk, tk->timeout);
}
#endif /* _TICKET_H */
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Nov 23, 3:28 PM (16 h, 23 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1018847
Default Alt Text
(75 KB)
Attached To
Mode
rB Booth
Attached
Detach File
Event Timeline
Log In to Comment