diff --git a/src/inline-fn.h b/src/inline-fn.h index bf9feeb..67bd6d2 100644 --- a/src/inline-fn.h +++ b/src/inline-fn.h @@ -1,336 +1,344 @@ /* * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _INLINE_FN_H #define _INLINE_FN_H #include #include #include #include #include "config.h" #include "ticket.h" #include "transport.h" inline static uint32_t get_local_id(void) { return local ? local->site_id : -1; } inline static uint32_t get_node_id(struct booth_site *node) { return node ? node->site_id : NO_ONE; } inline static int term_time_left(const struct ticket_config *tk) { int left; left = tk->term_expires - time(NULL); return (left < 0) ? 0 : left; } /** Returns number of seconds left, if any. */ inline static int leader_and_valid(const struct ticket_config *tk) { if (tk->leader != local) return 0; return term_time_left(tk); } /** Is this some leader? */ inline static int is_owned(const struct ticket_config *tk) { return (tk->leader && tk->leader != no_leader); } static inline void init_header_bare(struct boothc_header *h) { h->magic = htonl(BOOTHC_MAGIC); h->version = htonl(BOOTHC_VERSION); h->from = htonl(local->site_id); h->iv = htonl(0); h->auth1 = htonl(0); h->auth2 = htonl(0); } static inline void init_header(struct boothc_header *h, int cmd, int options, int result, int reason, int data_len) { init_header_bare(h); h->length = htonl(data_len); h->cmd = htonl(cmd); h->options = htonl(options); h->result = htonl(result); h->reason = htonl(reason); } static inline void init_ticket_site_header(struct boothc_ticket_msg *msg, int cmd) { init_header(&msg->header, cmd, 0, 0, 0, sizeof(*msg)); } static inline void init_ticket_msg(struct boothc_ticket_msg *msg, int cmd, int rv, int reason, struct ticket_config *tk) { assert(sizeof(msg->ticket.id) == sizeof(tk->name)); init_header(&msg->header, cmd, 0, rv, reason, sizeof(*msg)); if (!tk) { memset(&msg->ticket, 0, sizeof(msg->ticket)); } else { memcpy(msg->ticket.id, tk->name, sizeof(msg->ticket.id)); msg->ticket.leader = htonl(get_node_id( (tk->leader && tk->leader != no_leader) ? tk->leader : tk->voted_for)); msg->ticket.term = htonl(tk->current_term); msg->ticket.term_valid_for = htonl(term_time_left(tk)); msg->ticket.leader_commit = htonl(tk->commit_index); } } static inline struct booth_transport const *transport(void) { return booth_transport + booth_conf->proto; } static inline const char *site_string(struct booth_site *site) { return site ? site->addr_string : "NONE"; } static inline const char *ticket_leader_string(struct ticket_config *tk) { return site_string(tk->leader); } static inline void disown_ticket(struct ticket_config *tk) { tk->leader = no_leader; tk->is_granted = 0; time(&tk->term_expires); } static inline int disown_if_expired(struct ticket_config *tk) { if (time(NULL) >= tk->term_expires || !tk->leader) { disown_ticket(tk); return 1; } return 0; } /* We allow half of the uint32_t to be used; * half of that below, half of that above the current known "good" value. * 0 UINT32_MAX * |--------------------------+----------------+------------| * | | | * |--------+-------| allowed range * | * current commit index * * So, on overflow it looks like that: * UINT32_MAX 0 * |--------------------------+-----------||---+------------| * | | | * |--------+-------| allowed range * | * current commit index * * This should be possible by using the same datatype and relying * on the under/overflow semantics. * * * Having 30 bits available, and assuming an expire time of * one minute and a (high) commit index step of 64 == 2^6 (because * of weights), we get 2^24 minutes of range - which is ~750 * years. "Should be enough for everybody." */ static inline int index_is_higher_than(uint32_t c_high, uint32_t c_low) { uint32_t diff; if (c_high == c_low) return 0; diff = c_high - c_low; if (diff < UINT32_MAX/4) return 1; diff = c_low - c_high; if (diff < UINT32_MAX/4) return 0; assert(!"commit index out of range - invalid"); } static inline uint32_t index_max2(uint32_t a, uint32_t b) { return index_is_higher_than(a, b) ? a : b; } static inline uint32_t index_max3(uint32_t a, uint32_t b, uint32_t c) { return index_max2( index_max2(a, b), c); } static inline double timeval_to_float(struct timeval tv) { return tv.tv_sec + tv.tv_usec*(double)1.0e-6; } static inline int timeval_msec(struct timeval tv) { int m; m = tv.tv_usec / 1000; if (m >= 1000) m = 999; return m; } static inline int timeval_compare(struct timeval tv1, struct timeval tv2) { if (tv1.tv_sec < tv2.tv_sec) return -1; if (tv1.tv_sec > tv2.tv_sec) return +1; if (tv1.tv_usec < tv2.tv_usec) return -1; if (tv1.tv_usec > tv2.tv_usec) return +1; return 0; } static inline int timeval_in_past(struct timeval which) { struct timeval tv; gettimeofday(&tv, NULL); return timeval_compare(tv, which) > 0; } static inline time_t next_vote_starts_at(struct ticket_config *tk) { time_t half_exp, retries_needed, t; /* If not owner, don't renew. */ if (tk->leader != local) return 0; /* Try to renew at half of expiry time. */ half_exp = tk->term_expires - tk->term_duration/2; /* Also start renewal if we couldn't get * a few message retransmission in the alloted * expiry time. */ retries_needed = tk->term_expires - tk->timeout * tk->retries/2; /* Return earlier timestamp. */ t = min(half_exp, retries_needed); /* Return earlier timestamp if we need to delay the grant. */ if (tk->delay_grant > time(NULL)) t = min(tk->delay_grant, t); return t; } static inline int should_start_renewal(struct ticket_config *tk) { time_t now, when; when = next_vote_starts_at(tk); if (!when) return 0; time(&now); return when <= now; } static inline void expect_replies(struct ticket_config *tk, int reply_type) { + tk->retry_number = 0; tk->acks_expected = reply_type; tk->acks_received = local->bitmask; tk->req_sent_at = time(NULL); tk->ticket_updated = 0; } +static inline void no_resends(struct ticket_config *tk) +{ + tk->retry_number = 0; + tk->acks_expected = 0; + tk->acks_received = 0; +} + static inline int send_heartbeat(struct ticket_config *tk) { expect_replies(tk, OP_HEARTBEAT); return ticket_broadcast(tk, OP_HEARTBEAT, RLT_SUCCESS, 0); } static inline struct booth_site *my_vote(struct ticket_config *tk) { return tk->votes_for[ local->index ]; } static inline int count_bits(uint64_t val) { return __builtin_popcount(val); } static inline int majority_of_bits(struct ticket_config *tk, uint64_t val) { /* Use ">" to get majority decision, even for an even number * of participants. */ return count_bits(val) * 2 > booth_conf->site_count; } static inline int all_replied(struct ticket_config *tk) { return !(tk->acks_received ^ booth_conf->all_bits); } static inline int all_sites_replied(struct ticket_config *tk) { return !((tk->acks_received & booth_conf->sites_bits) ^ booth_conf->sites_bits); } #endif diff --git a/src/raft.c b/src/raft.c index ba4d7ab..07eadc0 100644 --- a/src/raft.c +++ b/src/raft.c @@ -1,905 +1,883 @@ /* * Copyright (C) 2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include "booth.h" #include "transport.h" #include "inline-fn.h" #include "config.h" #include "raft.h" #include "ticket.h" #include "log.h" inline static void clear_election(struct ticket_config *tk) { int i; struct booth_site *site; tk_log_debug("clear election"); tk->votes_received = 0; foreach_node(i, site) tk->votes_for[site->index] = NULL; } inline static void record_vote(struct ticket_config *tk, struct booth_site *who, struct booth_site *vote) { tk_log_debug("site %s votes for %s", site_string(who), site_string(vote)); if (!tk->votes_for[who->index]) { tk->votes_for[who->index] = vote; tk->votes_received |= who->bitmask; } else { if (tk->votes_for[who->index] != vote) tk_log_warn("%s voted previously " "for %s and now wants to vote for %s (ignored)", site_string(who), site_string(tk->votes_for[who->index]), site_string(vote)); } } static int cmp_msg_ticket(struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg) { if (tk->current_term != ntohl(msg->ticket.term)) { return tk->current_term - ntohl(msg->ticket.term); } /* compare commit_index only from the leader */ if (sender == leader) { return tk->commit_index - ntohl(msg->ticket.leader_commit); } return 0; } static void update_term_from_msg(struct ticket_config *tk, struct boothc_ticket_msg *msg) { uint32_t i; i = ntohl(msg->ticket.term); /* if we failed to start the election, then accept the term * from the leader * */ if (tk->state == ST_CANDIDATE) { tk->current_term = i; } else { tk->current_term = max(i, tk->current_term); } /* § 5.3 */ i = ntohl(msg->ticket.leader_commit); tk->commit_index = max(i, tk->commit_index); } static void update_ticket_from_msg(struct ticket_config *tk, struct boothc_ticket_msg *msg) { int duration; duration = tk->term_duration; if (msg) duration = min(duration, ntohl(msg->ticket.term_valid_for)); tk->term_expires = time(NULL) + duration; if (msg) { update_term_from_msg(tk, msg); } } static void become_follower(struct ticket_config *tk, struct boothc_ticket_msg *msg) { update_ticket_from_msg(tk, msg); tk->state = ST_FOLLOWER; tk->delay_grant = 0; } struct booth_site *majority_votes(struct ticket_config *tk) { int i, n; struct booth_site *v; int count[MAX_NODES] = { 0, }; for(i=0; isite_count; i++) { v = tk->votes_for[i]; if (!v) continue; n = v->index; count[n]++; tk_log_debug("Majority: %d %s wants %d %s => %d", i, site_string(&booth_conf->site[i]), n, site_string(v), count[n]); if (count[n]*2 <= booth_conf->site_count) continue; tk_log_debug("Majority reached: %d of %d for %s", count[n], booth_conf->site_count, site_string(v)); return v; } return NULL; } static int newer_term(struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg, int in_election) { uint32_t term; term = ntohl(msg->ticket.term); /* §5.1 */ if (term > tk->current_term) { tk->state = ST_FOLLOWER; if (!in_election) { tk->leader = leader; tk_log_info("from %s: higher term %d vs. %d, following %s", site_string(sender), term, tk->current_term, ticket_leader_string(tk)); } else { tk->leader = no_leader; tk_log_debug("from %s: higher term %d vs. %d (election)", site_string(sender), term, tk->current_term); } tk->current_term = term; return 1; } return 0; } static int term_too_low(struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg) { uint32_t term; term = ntohl(msg->ticket.term); /* §5.1 */ if (term < tk->current_term) { tk_log_info("sending reject to %s, its term too low " "(%d vs. %d)", site_string(sender), term, tk->current_term ); send_reject(sender, tk, RLT_TERM_OUTDATED); return 1; } return 0; } /* For follower. */ static int answer_HEARTBEAT ( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; struct boothc_ticket_msg omsg; term = ntohl(msg->ticket.term); tk_log_debug("leader: %s, have %s; term %d vs %d", site_string(leader), ticket_leader_string(tk), term, tk->current_term); /* got heartbeat, no rejects expected anymore */ tk->expect_more_rejects = 0; /* if we're candidate, it may be that we got a heartbeat from * a legitimate leader, so don't ignore a lower term */ if (tk->state != ST_CANDIDATE && term < tk->current_term) { tk_log_info("ignoring lower term %d vs. %d, from %s", term, tk->current_term, ticket_leader_string(tk)); return 0; } /* Needed? */ newer_term(tk, sender, leader, msg, 0); become_follower(tk, msg); /* Racy??? */ assert(sender == leader || !leader); tk->leader = leader; /* Ack the heartbeat (we comply). */ init_ticket_msg(&omsg, OP_HEARTBEAT, RLT_SUCCESS, 0, tk); return booth_udp_send(sender, &omsg, sizeof(omsg)); } static int process_UPDATE ( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; term = ntohl(msg->ticket.term); tk_log_debug("leader: %s, have %s; term %d vs %d", site_string(leader), ticket_leader_string(tk), term, tk->current_term); /* No reject. (?) */ if (term < tk->current_term) { tk_log_info("ignoring lower term %d vs. %d, from %s", term, tk->current_term, ticket_leader_string(tk)); return 0; } update_ticket_from_msg(tk, msg); ticket_write(tk); /* run ticket_cron if the ticket expires */ set_ticket_wakeup(tk); return 0; } static int process_REVOKE ( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { if (tk->leader != sender) { tk_log_error("%s wants to revoke ticket, " "but it is not granted there (ignoring)", site_string(sender)); return 1; } else if (tk->state != ST_FOLLOWER) { tk_log_error("unexpected ticket revoke from %s " "(in state %s) (ignoring)", site_string(sender), state_to_string(tk->state)); return 1; } else { tk_log_info("%s revokes ticket", site_string(tk->leader)); reset_ticket(tk); ticket_write(tk); } return 0; } /* is it safe to commit the grant? * if we didn't hear from all sites on the initial grant, we may * need to delay the commit * * TODO: investigate possibility to devise from history whether a * missing site could be holding a ticket or not */ static int ticket_dangerous(struct ticket_config *tk) { if (!tk->delay_grant) return 0; if (tk->delay_grant < time(NULL) || all_sites_replied(tk)) { tk->delay_grant = 0; return 0; } return 1; } /* update the ticket on the leader, write it to the CIB, and send out the update message to others with the new expiry time */ int leader_update_ticket(struct ticket_config *tk) { struct boothc_ticket_msg msg; int rv = 0; if( tk->ticket_updated ) return 0; tk->ticket_updated = 1; tk->term_expires = time(NULL) + tk->term_duration; if (!ticket_dangerous(tk)) { ticket_write(tk); init_ticket_msg(&msg, OP_UPDATE, RLT_SUCCESS, 0, tk); rv = transport()->broadcast(&msg, sizeof(msg)); } else { tk_log_info("delaying ticket commit to CIB until %s " "(or all sites are reached)", ctime(&tk->delay_grant)); } - set_ticket_wakeup(tk); return rv; } /* For leader. */ static int process_HEARTBEAT( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; term = ntohl(msg->ticket.term); if (newer_term(tk, sender, leader, msg, 0)) { /* unexpected higher term */ tk_log_warn("got higher term from %s (%d vs. %d)", site_string(sender), term, tk->current_term); return 0; } /* Don't send a reject. */ if (term < tk->current_term) { /* Doesn't know what he's talking about - perhaps * doesn't receive our packets? */ tk_log_warn("unexpected term " "from %s (%d vs. %d) (ignoring)", site_string(sender), term, tk->current_term); return 0; } if (term == tk->current_term && leader == tk->leader) { if (majority_of_bits(tk, tk->acks_received) && !ticket_dangerous(tk)) { /* OK, at least half of the nodes are reachable; * Update the ticket and send update messages out */ return leader_update_ticket(tk); } } return 0; } void leader_elected( struct ticket_config *tk, struct booth_site *new_leader ) { if (new_leader) { tk->leader = new_leader; tk->term_expires = time(NULL) + tk->term_duration; tk->election_end = 0; tk->voted_for = NULL; - tk->retry_number = 0; if (new_leader == local) { tk_log_info("the ticket is granted here"); tk->commit_index++; tk->state = ST_LEADER; send_heartbeat(tk); ticket_activate_timeout(tk); } else { tk_log_info("ticket granted at %s", site_string(new_leader)); become_follower(tk, NULL); set_ticket_wakeup(tk); } } } static int process_VOTE_FOR( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { if (term_too_low(tk, sender, leader, msg)) return 0; if (newer_term(tk, sender, leader, msg, 0)) { clear_election(tk); } /* leader wants to step down? */ if (leader == no_leader && sender == tk->leader && (tk->state == ST_FOLLOWER || tk->state == ST_CANDIDATE)) { tk_log_info("%s wants to give the ticket away", site_string(tk->leader)); return new_round(tk, OR_STEPDOWN); } record_vote(tk, sender, leader); if (tk->state != ST_CANDIDATE) { /* lost candidate status, somebody rejected our proposal */ return 0; } /* only if all voted can we take the ticket now, otherwise * wait for timeout in ticket_cron */ if (!tk->acks_expected) { /* §5.2 */ leader_elected(tk, majority_votes(tk)); } return 0; } static int process_REJECTED( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t rv; rv = ntohl(msg->header.result); if (tk->state == ST_CANDIDATE && rv == RLT_TERM_OUTDATED) { tk_log_warn("ticket outdated (term %d), granted at %s", ntohl(msg->ticket.term), site_string(leader) ); tk->leader = leader; tk->expect_more_rejects = 1; become_follower(tk, msg); return 0; } if (tk->state == ST_CANDIDATE && rv == RLT_TERM_STILL_VALID) { tk_log_warn("ticket was granted at %s " "(and we didn't know)", site_string(leader)); tk->leader = leader; tk->expect_more_rejects = 1; become_follower(tk, msg); return 0; } if (tk->state == ST_CANDIDATE && rv == RLT_YOU_OUTDATED) { tk->leader = leader; tk->expect_more_rejects = 1; if (leader && leader != no_leader) { tk_log_warn("our ticket is outdated, granted at %s", site_string(leader)); become_follower(tk, msg); } else { tk_log_warn("our ticket is outdated and revoked"); update_ticket_from_msg(tk, msg); tk->state = ST_INIT; } return 0; } if (!tk->expect_more_rejects) { tk_log_warn("from %s: in state %s, got %s (unexpected reject)", site_string(sender), state_to_string(tk->state), state_to_string(rv)); } return 0; } -static int send_ticket ( - int cmd, - struct ticket_config *tk, - struct booth_site *to_site - ) -{ - struct boothc_ticket_msg omsg; - - - if (cmd == OP_MY_INDEX) { - tk_log_info("sending status to %s", - site_string(to_site)); - } - init_ticket_msg(&omsg, cmd, RLT_SUCCESS, 0, tk); - return booth_udp_send(to_site, &omsg, sizeof(omsg)); -} - static int ticket_seems_ok(struct ticket_config *tk) { int time_left; time_left = term_time_left(tk); if (!time_left) return 0; /* quite sure */ if (tk->state == ST_CANDIDATE) return 0; /* in state of flux */ if (tk->state == ST_LEADER) return 1; /* quite sure */ if (tk->state == ST_FOLLOWER && time_left >= tk->term_duration/3) return 1; /* almost quite sure */ return 0; } static int test_reason( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { int reason; reason = ntohl(msg->header.reason); if (reason == OR_TKT_LOST) { if (tk->state == ST_INIT) { tk_log_warn("%s claims that the ticket is lost, " "but it's in %s state (reject sent)", site_string(sender), state_to_string(tk->state) ); return RLT_YOU_OUTDATED; } if (ticket_seems_ok(tk)) { tk_log_warn("%s claims that the ticket is lost, " "but it is ok here (reject sent)", site_string(sender)); return RLT_TERM_STILL_VALID; } } return 0; } /* §5.2 */ static int answer_REQ_VOTE( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; int valid; struct boothc_ticket_msg omsg; cmd_result_t inappr_reason; inappr_reason = test_reason(tk, sender, leader, msg); if (inappr_reason) return send_reject(sender, tk, inappr_reason); term = ntohl(msg->ticket.term); /* Important: Ignore duplicated packets! */ valid = term_time_left(tk); if (valid && term == tk->current_term && sender == tk->leader) { tk_log_debug("Duplicate OP_VOTE_FOR ignored."); return 0; } if (valid) { tk_log_warn("election rejected, term still valid for %ds", valid); return send_reject(sender, tk, RLT_TERM_STILL_VALID); } if (term_too_low(tk, sender, leader, msg)) return 0; /* if it's a newer term or ... */ if (newer_term(tk, sender, leader, msg, 1)) { clear_election(tk); goto vote_for_sender; } /* ... we didn't vote yet, then vote for the sender */ /* §5.2, §5.4 */ if (!tk->voted_for) { vote_for_sender: tk->voted_for = sender; record_vote(tk, sender, leader); } init_ticket_msg(&omsg, OP_VOTE_FOR, RLT_SUCCESS, 0, tk); omsg.ticket.leader = htonl(get_node_id(tk->voted_for)); return booth_udp_send(sender, &omsg, sizeof(omsg)); } int new_election(struct ticket_config *tk, struct booth_site *preference, int update_term, cmd_reason_t reason) { struct booth_site *new_leader; time_t now; static cmd_reason_t last_reason; time(&now); tk_log_debug("start new election?, now=%" PRIi64 ", end %" PRIi64, (int64_t)now, (int64_t)(tk->election_end)); if (now <= tk->election_end) return 0; /* §5.2 */ /* If there was _no_ answer, don't keep incrementing the term number * indefinitely. If there was no peer, there'll probably be no one * listening now either. However, we don't know if we were * invoked due to a timeout (caller does). */ if (update_term) tk->current_term++; tk->term_expires = 0; tk->election_end = now + tk->timeout; tk_log_info("starting new election (term=%d, until %s)", tk->current_term, ctime(&tk->election_end)); clear_election(tk); if(preference) new_leader = preference; else new_leader = (local->type == SITE) ? local : NULL; record_vote(tk, local, new_leader); tk->voted_for = new_leader; tk->state = ST_CANDIDATE; /* some callers may want just to repeat on timeout */ if (reason == OR_AGAIN) { reason = last_reason; } else { last_reason = reason; } expect_replies(tk, OP_VOTE_FOR); ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS, reason); ticket_activate_timeout(tk); return 0; } /* we were a leader and somebody says that they have a more up * to date ticket * there was probably connectivity loss * tricky */ static int leader_handle_newer_ticket( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { if (leader == no_leader || !leader || leader == local) { /* at least nobody else owns the ticket */ /* it is not kosher to update from their copy, but since * they don't own the ticket, nothing bad can happen */ update_term_from_msg(tk, msg); /* get the ticket again, if we can */ tk_log_info("trying to reclaim the ticket"); return acquire_ticket(tk, OR_REACQUIRE); } /* eek, two leaders, split brain */ /* normally shouldn't happen; run election */ tk_log_error("from %s: ticket granted at %s! (revoking locally)", site_string(sender), site_string(leader) ); return new_round(tk, OR_SPLIT); } /* reply to STATUS */ static int process_MY_INDEX ( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { int i; int rv; if (!msg->ticket.term_valid_for) { /* ticket not valid */ return 0; } i = cmp_msg_ticket(tk, sender, leader, msg); if (i > 0) { /* let them know about our newer ticket */ - send_ticket(OP_MY_INDEX, tk, sender); + send_msg(OP_MY_INDEX, tk, sender); if (tk->state == ST_LEADER) { tk_log_info("sending update to %s", site_string(sender)); - return send_ticket(OP_UPDATE, tk, sender); + return send_msg(OP_UPDATE, tk, sender); } } /* they have a newer ticket, trouble if we're already leader * for it */ if (i < 0 && tk->state == ST_LEADER) { tk_log_warn("from %s: more up to date ticket at %s", site_string(sender), site_string(leader) ); return leader_handle_newer_ticket(tk, sender, leader, msg); } update_ticket_from_msg(tk, msg); tk->leader = leader; if (leader == local) { rv = test_external_prog(tk, 1); if (!rv) { /* if we were the leader but we rebooted in the * meantime; try to get the ticket again */ tk->state = ST_LEADER; - tk->retry_number = 0; tk_log_info("trying to reclaim the ticket"); rv = send_heartbeat(tk); ticket_activate_timeout(tk); } return rv; } else { if (!leader || leader == no_leader) { tk_log_info("ticket is not granted"); tk->state = ST_INIT; } else { tk_log_info("ticket granted at %s (says %s)", site_string(leader), site_string(sender)); tk->state = ST_FOLLOWER; } set_ticket_wakeup(tk); } return 0; } int raft_answer( struct ticket_config *tk, struct booth_site *from, struct booth_site *leader, struct boothc_ticket_msg *msg ) { int cmd; int rv; rv = 0; cmd = ntohl(msg->header.cmd); - R(tk); tk_log_debug("got message %s from %s", state_to_string(cmd), site_string(from)); switch (cmd) { case OP_REQ_VOTE: rv = answer_REQ_VOTE(tk, from, leader, msg); break; case OP_VOTE_FOR: rv = process_VOTE_FOR(tk, from, leader, msg); break; case OP_HEARTBEAT: if (tk->leader == local && tk->state == ST_LEADER) rv = process_HEARTBEAT(tk, from, leader, msg); else if (tk->leader != local && (tk->state == ST_INIT ||tk->state == ST_FOLLOWER || tk->state == ST_CANDIDATE)) rv = answer_HEARTBEAT(tk, from, leader, msg); else { tk_log_warn("unexpected message %s, from %s", state_to_string(cmd), site_string(from)); rv = -EINVAL; } break; case OP_UPDATE: if (tk->leader != local && tk->state == ST_FOLLOWER) { rv = process_UPDATE(tk, from, leader, msg); } else { tk_log_warn("unexpected message %s, from %s", state_to_string(cmd), site_string(from)); rv = -EINVAL; } break; case OP_REJECTED: rv = process_REJECTED(tk, from, leader, msg); break; case OP_REVOKE: rv = process_REVOKE(tk, from, leader, msg); break; case OP_MY_INDEX: rv = process_MY_INDEX(tk, from, leader, msg); break; case OP_STATUS: - rv = send_ticket(OP_MY_INDEX, tk, from); + rv = send_msg(OP_MY_INDEX, tk, from); break; default: tk_log_error("unknown message %s, from %s", state_to_string(cmd), site_string(from)); rv = -EINVAL; } - R(tk); return rv; } diff --git a/src/ticket.c b/src/ticket.c index 5e16d08..0c344ad 100644 --- a/src/ticket.c +++ b/src/ticket.c @@ -1,728 +1,769 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include "ticket.h" #include "config.h" #include "pacemaker.h" #include "inline-fn.h" #include "log.h" #include "booth.h" #include "raft.h" #include "handler.h" #define TK_LINE 256 /* Untrusted input, must fit (incl. \0) in a buffer of max chars. */ int check_max_len_valid(const char *s, int max) { int i; for(i=0; iticket_count; i++) { if (!strcmp(booth_conf->ticket[i].name, ticket)) { if (found) *found = booth_conf->ticket + i; return 1; } } return 0; } int check_ticket(char *ticket, struct ticket_config **found) { if (found) *found = NULL; if (!booth_conf) return 0; if (!check_max_len_valid(ticket, sizeof(booth_conf->ticket[0].name))) return 0; return find_ticket_by_name(ticket, found); } int check_site(char *site, int *is_local) { struct booth_site *node; if (!check_max_len_valid(site, sizeof(node->addr_string))) return 0; if (find_site_by_name(site, &node, 0)) { *is_local = node->local; return 1; } return 0; } int ticket_write(struct ticket_config *tk) { if (local->type != SITE) return -EINVAL; disown_if_expired(tk); if (tk->leader == local) { pcmk_handler.grant_ticket(tk); } else { pcmk_handler.revoke_ticket(tk); } return 0; } /* Ask an external program whether getting the ticket * makes sense. * Eg. if the services have a failcount of INFINITY, * we can't serve here anyway. */ int test_external_prog(struct ticket_config *tk, int start_election) { int rv; rv = run_handler(tk, tk->ext_verifier, 1); if (rv) { tk_log_warn("we are not allowed to acquire ticket"); /* Give it to somebody else. * Just send a VOTE_FOR message, so the * others can start elections. */ if (leader_and_valid(tk)) { disown_ticket(tk); if (start_election) { ticket_broadcast(tk, OP_VOTE_FOR, RLT_SUCCESS, OR_LOCAL_FAIL); } } } return rv; } /* Try to acquire a ticket * Could be manual grant or after ticket loss */ int acquire_ticket(struct ticket_config *tk, cmd_reason_t reason) { if (test_external_prog(tk, 0)) return RLT_EXT_FAILED; return new_election(tk, local, 1, reason); } /** Try to get the ticket for the local site. * */ int do_grant_ticket(struct ticket_config *tk, int options) { int rv; tk_log_info("granting ticket"); if (tk->leader == local) return RLT_SUCCESS; if (is_owned(tk)) return RLT_OVERGRANT; tk->delay_grant = time(NULL) + tk->term_duration + tk->acquire_after; if (options & OPT_IMMEDIATE) { tk_log_warn("granting ticket immediately! If there are " "unreachable sites, _hope_ you are sure that they don't " "have the ticket!"); tk->delay_grant = 0; } rv = acquire_ticket(tk, OR_ADMIN); if (rv) tk->delay_grant = 0; return rv; } /** Ticket revoke. * Only to be started from the leader. */ int do_revoke_ticket(struct ticket_config *tk) { tk_log_info("revoking ticket"); reset_ticket(tk); ticket_write(tk); return ticket_broadcast(tk, OP_REVOKE, RLT_SUCCESS, OR_ADMIN); } int list_ticket(char **pdata, unsigned int *len) { struct ticket_config *tk; char timeout_str[64]; char pending_str[64]; char *data, *cp; int i, alloc; *pdata = NULL; *len = 0; alloc = 256 + booth_conf->ticket_count * (BOOTH_NAME_LEN * 2 + 128); data = malloc(alloc); if (!data) return -ENOMEM; cp = data; foreach_ticket(i, tk) { if (tk->term_expires != 0) strftime(timeout_str, sizeof(timeout_str), "%F %T", localtime(&tk->term_expires)); else strcpy(timeout_str, "INF"); if (tk->leader == local && tk->delay_grant > time(NULL)) { strcpy(pending_str, " (pending until "); strftime(pending_str + strlen(" (pending until "), sizeof(pending_str) - strlen(" (pending until ") - 1, "%F %T", localtime(&tk->delay_grant)); strcat(pending_str, ")"); } else *pending_str = '\0'; cp += snprintf(cp, alloc - (cp - data), "ticket: %s, leader: %s, expires: %s, commit: %d%s\n", tk->name, ticket_leader_string(tk), timeout_str, tk->commit_index, pending_str); if (alloc - (cp - data) <= 0) return -ENOMEM; } *pdata = data; *len = cp - data; return 0; } void reset_ticket(struct ticket_config *tk) { disown_ticket(tk); tk->state = ST_INIT; tk->voted_for = NULL; } int setup_ticket(void) { struct ticket_config *tk; int i; foreach_ticket(i, tk) { reset_ticket(tk); if (local->type == SITE) { pcmk_handler.load_ticket(tk); if (time(NULL) >= tk->term_expires) { reset_ticket(tk); ticket_write(tk); } } /* if the ticket is uptodate and belongs to us, try with * the heartbeat */ if (tk->is_granted && tk->leader == local) { if (!test_external_prog(tk, 1)) { tk->state = ST_LEADER; send_heartbeat(tk); ticket_activate_timeout(tk); } } else { /* otherwise, query status */ tk_log_info("broadcasting state query"); ticket_broadcast(tk, OP_STATUS, RLT_SUCCESS, 0); } } return 0; } int ticket_answer_list(int fd, struct boothc_ticket_msg *msg) { char *data; int olen, rv; struct boothc_header hdr; rv = list_ticket(&data, &olen); if (rv < 0) return rv; init_header(&hdr, CMR_LIST, 0, RLT_SUCCESS, 0, sizeof(hdr) + olen); return send_header_plus(fd, &hdr, data, olen); } int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg) { int rv; struct ticket_config *tk; if (!check_ticket(msg->ticket.id, &tk)) { log_warn("client asked to grant unknown ticket %s", msg->ticket.id); rv = RLT_INVALID_ARG; goto reply; } if (is_owned(tk)) { log_warn("client wants to grant an (already granted!) ticket %s", msg->ticket.id); rv = RLT_OVERGRANT; goto reply; } rv = do_grant_ticket(tk, ntohl(msg->header.options)); reply: init_header(&msg->header, CMR_GRANT, 0, rv ?: RLT_ASYNC, 0, sizeof(*msg)); return send_ticket_msg(fd, msg); } int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg) { int rv; struct ticket_config *tk; if (!check_ticket(msg->ticket.id, &tk)) { log_warn("client wants to revoke an unknown ticket %s", msg->ticket.id); rv = RLT_INVALID_ARG; goto reply; } if (!is_owned(tk)) { log_info("client wants to revoke a free ticket %s", msg->ticket.id); rv = RLT_TICKET_IDLE; goto reply; } if (tk->leader != local) { log_info("the ticket %s is not granted here, " "redirect to %s", msg->ticket.id, ticket_leader_string(tk)); rv = RLT_REDIRECT; goto reply; } rv = do_revoke_ticket(tk); if (rv == 0) rv = RLT_ASYNC; reply: init_ticket_msg(msg, CMR_REVOKE, rv, 0, tk); return send_ticket_msg(fd, msg); } int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd, cmd_result_t res, cmd_reason_t reason) { struct boothc_ticket_msg msg; init_ticket_msg(&msg, cmd, res, reason, tk); tk_log_debug("broadcasting '%s' (term=%d, valid=%d)", state_to_string(cmd), ntohl(msg.ticket.term), ntohl(msg.ticket.term_valid_for)); return transport()->broadcast(&msg, sizeof(msg)); } int new_round(struct ticket_config *tk, cmd_reason_t reason) { int rv = 0; struct timespec delay; disown_ticket(tk); /* New vote round; §5.2 */ if (local->type == SITE) { /* delay the next election start for up to 200ms */ delay.tv_sec = 0; delay.tv_nsec = 1000000L * (long)cl_rand_from_interval(0, 200); nanosleep(&delay, NULL); rv = new_election(tk, NULL, 1, reason); ticket_write(tk); } return rv; } static void log_lost_servers(struct ticket_config *tk) { struct booth_site *n; int i; + if (tk->retry_number != 1) + /* log those that we couldn't reach, but do + * that only on the first retry + */ + return; + for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if (!(tk->acks_received & n->bitmask)) { tk_log_warn("%s %s didn't acknowledge our request, " "will retry %d times", (n->type == ARBITRATOR ? "arbitrator" : "site"), site_string(n), tk->retries); } } } +static void resend_msg(struct ticket_config *tk) +{ + struct booth_site *n; + int i; + + if (!(tk->acks_received ^ local->bitmask)) { + ticket_broadcast(tk, tk->acks_expected, RLT_SUCCESS, 0); + } else { + for (i = 0; i < booth_conf->site_count; i++) { + n = booth_conf->site + i; + if (!(tk->acks_received & n->bitmask)) { + tk_log_debug("resending %s to %s", + state_to_string(tk->acks_expected), + site_string(n) + ); + send_msg(tk->acks_expected, tk, n); + } + } + } +} + +static void handle_resends(struct ticket_config *tk) +{ + int ack_cnt; + + tk->retry_number ++; + if (!majority_of_bits(tk, tk->acks_received)) { + ack_cnt = count_bits(tk->acks_received) - 1; + if (!ack_cnt) { + tk_log_warn("no answers to heartbeat (try #%d), " + "we are alone", + tk->retry_number); + } else { + tk_log_warn("not enough answers to heartbeat (try #%d): " + "only got %d answers", + tk->retry_number, + ack_cnt); + } + } else { + log_lost_servers(tk); + /* we have the majority, update the ticket, at + * least the local copy if we're still not + * allowed to commit + */ + leader_update_ticket(tk); + } + + if (tk->retry_number <= tk->retries) { + resend_msg(tk); + ticket_activate_timeout(tk); + } else { + tk_log_debug("giving up on sending retries"); + no_resends(tk); + set_ticket_wakeup(tk); + } +} + static void ticket_cron(struct ticket_config *tk) { time_t now; - int ack_cnt; struct booth_site *new_leader; now = time(NULL); - R(tk); /* Has an owner, has an expiry date, and expiry date in the past? * Losing the ticket must happen in _every_ state. */ if (tk->term_expires && is_owned(tk) && now >= tk->term_expires) { if (tk->leader != local) { tk_log_warn("lost at %s", site_string(tk->leader)); } else { tk_log_warn("lost majority (revoking locally)"); } /* Couldn't renew in time - ticket lost. */ new_round(tk, OR_TKT_LOST); return; } - R(tk); switch(tk->state) { case ST_INIT: /* init state, nothing to do */ break; case ST_FOLLOWER: /* nothing here either, ticket loss is caught earlier * */ break; case ST_CANDIDATE: /* §5.2 */ /* not everybody answered, but if we have majority... */ new_leader = majority_votes(tk); if (new_leader) { leader_elected(tk, new_leader); } else if (now > tk->election_end) { /* This is previous election timed out */ + tk_log_info("election timed out"); new_election(tk, NULL, 0, OR_AGAIN); } break; case ST_LEADER: - /* we get here after we broadcasted a heartbeat; - * by this time all sites should've acked the heartbeat - */ + /* timeout or ticket renewal? */ if (tk->acks_expected) { - tk->retry_number ++; - if (!majority_of_bits(tk, tk->acks_received)) { - ack_cnt = count_bits(tk->acks_received) - 1; - if (!ack_cnt) { - tk_log_warn("no answers to heartbeat (try #%d), " - "we are alone", - tk->retry_number); - } else { - tk_log_warn("not enough answers to heartbeat (try #%d): " - "only got %d answers", - tk->retry_number, - ack_cnt); - } - /* Don't give up, though - there's still some time until leadership is lost. */ - } else { - if (tk->retry_number <= 1) { - /* log those that we couldn't reach, but do - * that only on the first retry - */ - log_lost_servers(tk); - } - /* we have the majority, update the ticket, at - * least the local copy if we're still not - * allowed to commit - */ - leader_update_ticket(tk); - } + handle_resends(tk); } else { - /* this is ticket renewal, check what the - * external test says */ - if (test_external_prog(tk, 1)) - return; - } - - send_heartbeat(tk); - if (tk->retry_number < tk->retries) { - ticket_activate_timeout(tk); - } else { - tk->retry_number = 0; - set_ticket_wakeup(tk); + /* this is ticket renewal, run local test */ + if (!test_external_prog(tk, 1)) { + send_heartbeat(tk); + ticket_activate_timeout(tk); + } } break; default: break; } - R(tk); } void process_tickets(void) { struct ticket_config *tk; int i; struct timeval now; float sec_until; gettimeofday(&now, NULL); foreach_ticket(i, tk) { sec_until = timeval_to_float(tk->next_cron) - timeval_to_float(now); if (0) tk_log_debug("next cron %" PRIx64 ".%03d, " "now %" PRIx64 "%03d, in %f", (uint64_t)tk->next_cron.tv_sec, timeval_msec(tk->next_cron), (uint64_t)now.tv_sec, timeval_msec(now), sec_until); if (sec_until > 0.0) continue; tk_log_debug("ticket cron"); /* Set next value, handler may override. * This should already be handled via the state logic; * but to be on the safe side the renew repetition is * duplicated here, too. */ set_ticket_wakeup(tk); ticket_cron(tk); } } void tickets_log_info(void) { struct ticket_config *tk; int i; foreach_ticket(i, tk) { tk_log_info("state '%s' " "commit index %d " "leader %s " "expires %-24.24s", state_to_string(tk->state), tk->commit_index, ticket_leader_string(tk), ctime(&tk->term_expires)); } } static void update_acks( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t cmd; cmd = ntohl(msg->header.cmd); if (tk->acks_expected != cmd) return; /* got an ack! */ tk->acks_received |= sender->bitmask; tk_log_debug("got ACK from %s, %d/%d agree.", site_string(sender), count_bits(tk->acks_received), booth_conf->site_count); if (all_replied(tk)) { - tk->acks_expected = 0; + no_resends(tk); + set_ticket_wakeup(tk); } } /* UDP message receiver. */ int message_recv(struct boothc_ticket_msg *msg, int msglen) { uint32_t from; struct booth_site *source; struct ticket_config *tk; struct booth_site *leader; uint32_t leader_u; if (check_boothc_header(&msg->header, sizeof(*msg)) < 0 || msglen != sizeof(*msg)) { log_error("message receive error"); return -1; } from = ntohl(msg->header.from); if (!find_site_by_id(from, &source) || !source) { log_error("unknown sender: %08x", from); return -1; } if (!check_ticket(msg->ticket.id, &tk)) { log_warn("got invalid ticket name %s from %s", msg->ticket.id, site_string(source)); return -EINVAL; } leader_u = ntohl(msg->ticket.leader); if (!find_site_by_id(leader_u, &leader)) { tk_log_error("message with unknown leader %u received", leader_u); return -EINVAL; } update_acks(tk, source, leader, msg); return raft_answer(tk, source, leader, msg); } void set_ticket_wakeup(struct ticket_config *tk) { struct timeval tv, now; /* At least every hour, perhaps sooner. */ ticket_next_cron_in(tk, 3600); switch (tk->state) { case ST_LEADER: assert(tk->leader == local); gettimeofday(&now, NULL); tv = now; tv.tv_sec = next_vote_starts_at(tk); /* If timestamp is in the past, look again in one second. */ if (timeval_compare(tv, now) <= 0) tv.tv_sec = now.tv_sec + 1; ticket_next_cron_at(tk, tv); break; case ST_CANDIDATE: assert(tk->election_end); ticket_next_cron_at_coarse(tk, tk->election_end); break; case ST_INIT: case ST_FOLLOWER: /* If there is (or should be) some owner, check on it later on. * If no one is interested - don't care. */ if (is_owned(tk) && (local->type == SITE)) ticket_next_cron_at_coarse(tk, tk->term_expires + tk->acquire_after); break; default: tk_log_error("unknown ticket state: %d", tk->state); } } /* Given a state (in host byte order), return a human-readable (char*). * An array is used so that multiple states can be printed in a single printf(). */ char *state_to_string(uint32_t state_ho) { union mu { cmd_request_t s; char c[5]; }; static union mu cache[6] = { { 0 } }, *cur; static int current = 0; current ++; if (current >= sizeof(cache)/sizeof(cache[0])) current = 0; cur = cache + current; cur->s = htonl(state_ho); /* Shouldn't be necessary, union array is initialized with zeroes, and * these bytes never get written. */ cur->c[4] = 0; return cur->c; } int send_reject(struct booth_site *dest, struct ticket_config *tk, cmd_result_t code) { struct boothc_ticket_msg msg; - init_ticket_msg(&msg, OP_REJECTED, code, 0, tk); return booth_udp_send(dest, &msg, sizeof(msg)); } + +int send_msg ( + int cmd, + struct ticket_config *tk, + struct booth_site *dest + ) +{ + struct boothc_ticket_msg msg; + + if (cmd == OP_MY_INDEX) { + tk_log_info("sending status to %s", + site_string(dest)); + } + init_ticket_msg(&msg, cmd, RLT_SUCCESS, 0, tk); + return booth_udp_send(dest, &msg, sizeof(msg)); +} diff --git a/src/ticket.h b/src/ticket.h index 35db565..7a32b77 100644 --- a/src/ticket.h +++ b/src/ticket.h @@ -1,105 +1,106 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _TICKET_H #define _TICKET_H #include #include #include #include "config.h" #define DEFAULT_TICKET_EXPIRY 600 #define DEFAULT_TICKET_TIMEOUT 5 #define DEFAULT_RETRIES 10 #define foreach_ticket(i_,t_) for(i=0; (t_=booth_conf->ticket+i, iticket_count); i++) #define foreach_node(i_,n_) for(i=0; (n_=booth_conf->site+i, isite_count); i++) int check_ticket(char *ticket, struct ticket_config **tc); int check_site(char *site, int *local); int grant_ticket(struct ticket_config *ticket); int revoke_ticket(struct ticket_config *ticket); int list_ticket(char **pdata, unsigned int *len); int message_recv(struct boothc_ticket_msg *msg, int msglen); void reset_ticket(struct ticket_config *tk); int setup_ticket(void); int check_max_len_valid(const char *s, int max); int do_grant_ticket(struct ticket_config *ticket, int options); int do_revoke_ticket(struct ticket_config *tk); int find_ticket_by_name(const char *ticket, struct ticket_config **found); void set_ticket_wakeup(struct ticket_config *tk); int test_external_prog(struct ticket_config *tk, int start_election); int acquire_ticket(struct ticket_config *tk, cmd_reason_t reason); int new_round(struct ticket_config *tk, cmd_reason_t reason); int ticket_answer_list(int fd, struct boothc_ticket_msg *msg); int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg); int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg); int ticket_broadcast_proposed_state(struct ticket_config *tk, cmd_request_t state); int ticket_write(struct ticket_config *tk); void process_tickets(void); void tickets_log_info(void); char *state_to_string(uint32_t state_ho); int send_reject(struct booth_site *dest, struct ticket_config *tk, cmd_result_t code); +int send_msg (int cmd, struct ticket_config *tk, struct booth_site *dest); int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd, cmd_result_t res, cmd_reason_t reason); static inline void ticket_next_cron_at(struct ticket_config *tk, struct timeval when) { tk->next_cron = when; } static inline void ticket_next_cron_at_coarse(struct ticket_config *tk, time_t when) { tk->next_cron.tv_sec = when; tk->next_cron.tv_usec = 0; } static inline void ticket_next_cron_in(struct ticket_config *tk, float seconds) { struct timeval tv; gettimeofday(&tv, NULL); tv.tv_sec += trunc(seconds); tv.tv_usec += (seconds - trunc(seconds)) * 1e6; ticket_next_cron_at(tk, tv); } static inline void ticket_activate_timeout(struct ticket_config *tk) { /* TODO: increase timeout when no answers */ ticket_next_cron_in(tk, tk->timeout); } #endif /* _TICKET_H */