diff --git a/src/paxos.c b/src/paxos.c index fa9434d..5eec830 100644 --- a/src/paxos.c +++ b/src/paxos.c @@ -1,489 +1,479 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include "booth.h" #include "transport.h" #include "inline-fn.h" #include "config.h" #include "paxos.h" #include "log.h" static uint32_t next_ballot_number(struct ticket_config *tk) { uint32_t b; /* TODO: getenv() for debugging */ b = tk->new_ballot; /* + unique number */ b += local->bitmask; /* + weight */ b += booth_conf->site_bits * tk->weight[ local->index ]; return b; } static inline void set_proposal_in_ticket(struct ticket_config *tk, struct booth_site *from, uint32_t ballot, struct booth_site *new_owner) { tk->proposer = from; tk->new_ballot = ballot; tk->proposed_owner = new_owner; tk->proposal_expires = 0; // TODO - needed? tk->proposal_acknowledges = from->bitmask | local->bitmask; /* We lose (?) */ tk->state = ST_STABLE; } int should_switch_state_p(struct ticket_config *tk) { if (all_agree(tk)) { log_debug("all agree"); return 1; } if (majority_agree(tk)) { /* Time passed, and more than half agree. */ if (timeval_in_past(tk->proposal_switch)) { log_debug("majority, and enough time passed"); return 2; } if (!tk->proposal_switch.tv_sec) { log_debug("majority, wait half a second"); /* Wait half a second before doing the state change. */ ticket_next_cron_in(tk, 0.5); tk->proposal_switch = tk->next_cron; } } return 0; } static int retries_exceeded(struct ticket_config *tk) { tk->retry_number ++; if (tk->retry_number > RETRIES) { log_info("ABORT %s for ticket \"%s\" - " "not enough answers after %d retries", tk->state == OP_PREPARING ? "prepare" : "propose", tk->name, tk->retry_number); abort_proposal(tk); } else { /* We ask others for a change; retry to get * consensus. * But don't ask again immediately after a * query, give the peers time to answer. */ if (timeval_in_past(tk->proposal_switch)) { ticket_broadcast_proposed_state(tk, tk->state); ticket_activate_timeout(tk); } } return 0; } static inline void change_ticket_owner(struct ticket_config *tk, uint32_t ballot, struct booth_site *new_owner) { - int next; - /* set "previous" value for next round */ tk->last_ack_ballot = tk->new_ballot = ballot; tk->owner = new_owner; tk->expires = time(NULL) + tk->expiry; tk->proposer = NULL; tk->state = ST_STABLE; - if (new_owner == local) { - next = tk->expiry / 2; - if (tk->timeout * RETRIES/2 < next) - next = tk->timeout; - ticket_next_cron_in(tk, next); - } - else - ticket_next_cron_in(tk, tk->expiry + tk->acquire_after); - + set_ticket_wakeup(tk); log_info("Now actively COMMITTED for \"%s\": new owner %s, ballot %d", tk->name, ticket_owner_string(tk->owner), ballot); ticket_write(tk); } void abort_proposal(struct ticket_config *tk) { log_info("ABORTing proposal."); tk->proposer = NULL; tk->proposed_owner = tk->owner; tk->retry_number = 0; /* Ask others (repeatedly) until we know the new owner. */ tk->state = ST_INIT; } int PROPOSE_to_COMMIT(struct ticket_config *tk) { if (should_switch_state_p(tk)) { change_ticket_owner(tk, tk->new_ballot, tk->proposed_owner); return ticket_broadcast_proposed_state(tk, OP_COMMITTED); } return retries_exceeded(tk); } int PREPARE_to_PROPOSE(struct ticket_config *tk) { if (should_switch_state_p(tk)) { return ticket_broadcast_proposed_state(tk, OP_PROPOSING); } return retries_exceeded(tk); } /** \defgroup msghdl Message handling functions. * * Not all use all arguments; but to keep the interface the same, * they're all just passed everything we have. * * See also enum \ref cmd_request_t. * @{ */ /** Start a PAXOS round, by sending out an OP_PREPARING. */ int paxos_start_round(struct ticket_config *tk, struct booth_site *new_owner) { if (tk->state != ST_STABLE) return RLT_BUSY; /* This may not be called repeatedly from cron, * because the ballot number would simply * get counted up without any benefit. * The message may get retransmitted, though. * Normal retry behaviour gets achieved during * state OP_PREPARING anyway. */ tk->proposer = local; tk->new_ballot = next_ballot_number(tk); tk->proposed_owner = new_owner; tk->retry_number = 0; ticket_activate_timeout(tk); /* TODO: shorten renew exchange by just sending * a new proposal? Ballot numbers should still be the * same everywhere, owner doesn't change. */ return ticket_broadcast_proposed_state(tk, OP_PREPARING); } /** Answering OP_PREPARING means sending out OP_PROMISING. */ inline static int answer_PREP( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { if (!(local->role & ACCEPTOR)) return 0; /* Ignore if packet is too late, and state is already active. */ if (tk->owner == new_owner && ballot == tk->last_ack_ballot) return 0; /* We have to be careful here. * Getting multiple copies of the same message must not trigger * rejections, but only repeated promises. */ if (from == tk->proposer && ballot == tk->new_ballot) goto promise; /* It doesn't matter whether it's the same or another host; * the only distinction is the ballot number. */ if (ballot > tk->new_ballot) { promise: msg->header.cmd = htonl(OP_PROMISING); msg->ticket.prev_ballot = htonl(tk->last_ack_ballot); set_proposal_in_ticket(tk, from, ballot, new_owner); log_info("PROMISING for ticket \"%s\" (by %s) for %d", tk->name, from->addr_string, ballot); } else { msg->header.cmd = htonl(OP_REJECTED); msg->ticket.ballot = htonl(tk->new_ballot); msg->ticket.prev_ballot = htonl(tk->last_ack_ballot); log_info("REJECTING (prep) for ticket \"%s\" from %s - have %d, wanted %d", tk->name, from->addr_string, tk->new_ballot, ballot); } init_header_bare(&msg->header); return booth_udp_send(from, msg, sizeof(*msg)); } /** Getting OP_REJECTED means abandoning the current operation. */ inline static int handle_REJ( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { if (tk->last_ack_ballot == ballot) { log_debug("got a late REJECTED; ignored, as " "ballot %d is already active.", tk->last_ack_ballot); return 0; } log_info("got REJECTED for ticket \"%s\", ballot %d (has %d), from %s", tk->name, tk->new_ballot, ballot, from->addr_string); abort_proposal(tk); /* TODO: should we check whether that sequence is increasing? */ tk->new_ballot = ballot_max2(tk->new_ballot, ballot); tk->last_ack_ballot = ballot_max2(tk->last_ack_ballot, ntohl(msg->ticket.prev_ballot)); /* No need to ask the others. */ tk->state = ST_STABLE; return 0; } /** After a few OP_PROMISING replies we can send out OP_PROPOSING. */ inline static int got_a_PROM( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { int had_that; if (tk->proposer == local && tk->state == OP_PREPARING && tk->new_ballot == ballot) { had_that = tk->proposal_acknowledges & from->bitmask; tk->proposal_acknowledges |= from->bitmask; log_info("Got PROMISE from %s for \"%s\", for %d, acks now 0x%" PRIx64, from->addr_string, tk->name, tk->new_ballot, tk->proposal_acknowledges); if (had_that) return 0; return PREPARE_to_PROPOSE(tk); } /* Packet just delayed? Silently ignore. */ if (ballot == tk->last_ack_ballot && (new_owner == tk->owner || new_owner == tk->proposed_owner)) return 0; /* Message sent to wrong host? */ log_debug("got unexpected PROMISE from %s for \"%s\"", from->addr_string, tk->name); return 0; } /** Answering OP_PROPOSING means sending out OP_ACCEPTING. */ inline static int answer_PROP( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { if (!(local->role & ACCEPTOR)) return 0; /* Repeated packet. */ if (new_owner == tk->owner && ballot == tk->new_ballot) goto accepting; /* If packet is late, ie. we already have that state, * just repeat the ack - perhaps it got lost. */ if (new_owner == tk->owner && ballot == tk->last_ack_ballot) goto accepting; /* We have to be careful here. * Getting multiple copies of the same message must not trigger * rejections, but only repeated OP_ACCEPTING messages. */ if (ballot > tk->last_ack_ballot && ballot == tk->new_ballot && ntohl(msg->ticket.prev_ballot) == tk->last_ack_ballot) { if (tk->proposer) { /* Send OP_REJECTED to previous proposer? */ log_info("new PROPOSAL for ticket \"%s\" overriding older one from %s", tk->name, from->addr_string); } tk->proposer = from; accepting: init_ticket_msg(msg, OP_ACCEPTING, RLT_SUCCESS, tk); log_info("sending ACCEPT for ticket \"%s\" (by %s) for %d - new owner %s", tk->name, from->addr_string, ballot, ticket_owner_string(new_owner)); change_ticket_owner(tk, ballot, new_owner); } else if (ballot == tk->last_ack_ballot && ballot == tk->new_ballot && ntohl(msg->ticket.prev_ballot) == tk->last_ack_ballot) { /* Silently ignore delayed messages. */ } else { msg->header.cmd = htonl(OP_REJECTED); msg->ticket.ballot = htonl(tk->new_ballot); msg->ticket.prev_ballot = htonl(tk->last_ack_ballot); log_info("REJECTING (prop) for ticket \"%s\" from %s - have %d, wanted %d", tk->name, from->addr_string, tk->new_ballot, ballot); } init_header_bare(&msg->header); return booth_udp_send(from, msg, sizeof(*msg)); } /** After enough OP_ACCEPTING we can do the change, and send an OP_COMMITTED. */ inline static int got_an_ACC( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { if (tk->proposer == local && tk->state == OP_PROPOSING) { tk->proposal_acknowledges |= from->bitmask; log_info("Got ACCEPTING from %s for \"%s\", acks now 0x%" PRIx64, from->addr_string, tk->name, tk->proposal_acknowledges); return PROPOSE_to_COMMIT(tk); } return 0; } /** An OP_COMMITTED gets no answer; just record the new state. */ inline static int answer_COMM( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { /* We cannot check whether the packet is from an expected proposer - * perhaps this is the _only_ message of the whole handshake? */ if (ballot > tk->new_ballot && ntohl(msg->ticket.prev_ballot) == tk->last_ack_ballot) { change_ticket_owner(tk, ballot, new_owner); } /* Send ack? */ return 0; } /** @} */ int paxos_answer( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner_p) { int cmd; cmd = ntohl(msg->header.cmd); /* These are in roughly chronological order. * What the first machine sends is an OP_PREPARING * (see paxos_start_round()), which gets received * (below) from the others ... */ switch (cmd) { case OP_PREPARING: return answer_PREP(tk, from, msg, ballot, new_owner_p); case OP_REJECTED: return handle_REJ(tk, from, msg, ballot, new_owner_p); case OP_PROMISING: return got_a_PROM(tk, from, msg, ballot, new_owner_p); case OP_PROPOSING: return answer_PROP(tk, from, msg, ballot, new_owner_p); case OP_ACCEPTING: return got_an_ACC(tk, from, msg, ballot, new_owner_p); case OP_COMMITTED: return answer_COMM(tk, from, msg, ballot, new_owner_p); default: log_error("unprocessed message, cmd %x", cmd); return -EINVAL; } } diff --git a/src/ticket.c b/src/ticket.c index d8bd1ab..ec84a80 100644 --- a/src/ticket.c +++ b/src/ticket.c @@ -1,638 +1,659 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include "ticket.h" #include "config.h" #include "pacemaker.h" #include "inline-fn.h" #include "log.h" #include "booth.h" #include "paxos.h" #define TK_LINE 256 /* Untrusted input, must fit (incl. \0) in a buffer of max chars. */ int check_max_len_valid(const char *s, int max) { int i; for(i=0; iticket_count; i++) { if (!strcmp(booth_conf->ticket[i].name, ticket)) { if (found) *found = booth_conf->ticket + i; return 1; } } return 0; } int check_ticket(char *ticket, struct ticket_config **found) { if (found) *found = NULL; if (!booth_conf) return 0; if (!check_max_len_valid(ticket, sizeof(booth_conf->ticket[0].name))) return 0; return find_ticket_by_name(ticket, found); } int check_site(char *site, int *is_local) { struct booth_site *node; if (!check_max_len_valid(site, sizeof(node->addr_string))) return 0; if (find_site_by_name(site, &node, 0)) { *is_local = node->local; return 1; } return 0; } /** Find out what others think about this ticket. * * If we're a SITE, we can ask (and have to tell) Pacemaker. * An ARBITRATOR can only ask others. */ static int ticket_send_catchup(struct ticket_config *tk) { int i, rv = 0; struct booth_site *site; struct boothc_ticket_msg msg; foreach_node(i, site) { if (!site->local) { init_ticket_msg(&msg, CMD_CATCHUP, RLT_SUCCESS, tk); log_debug("attempting catchup from %s", site->addr_string); rv = booth_udp_send(site, &msg, sizeof(msg)); } } ticket_activate_timeout(tk); return rv; } int ticket_write(struct ticket_config *tk) { if (local->type != SITE) return -EINVAL; disown_if_expired(tk); pcmk_handler.store_ticket(tk); if (tk->owner == local) { pcmk_handler.grant_ticket(tk); } else { pcmk_handler.revoke_ticket(tk); } return 0; } /** Try to get the ticket for the local site. * */ int do_grant_ticket(struct ticket_config *tk) { int rv; if (tk->owner == local) return RLT_SUCCESS; if (tk->owner) return RLT_OVERGRANT; rv = paxos_start_round(tk, local); return rv; } /** Start a PAXOS round for revoking. * That can be started from any site. */ int do_revoke_ticket(struct ticket_config *tk) { int rv; if (!tk->owner) return RLT_SUCCESS; rv = paxos_start_round(tk, NULL); return rv; } int list_ticket(char **pdata, unsigned int *len) { struct ticket_config *tk; char timeout_str[64]; char *data, *cp; int i, alloc; *pdata = NULL; *len = 0; alloc = 256 + booth_conf->ticket_count * (BOOTH_NAME_LEN * 2 + 128); data = malloc(alloc); if (!data) return -ENOMEM; cp = data; foreach_ticket(i, tk) { if (tk->expires != 0) strftime(timeout_str, sizeof(timeout_str), "%F %T", localtime(&tk->expires)); else strcpy(timeout_str, "INF"); cp += sprintf(cp, "ticket: %s, owner: %s, expires: %s, ballot: %d\n", tk->name, tk->owner ? tk->owner->addr_string : "None", timeout_str, tk->last_ack_ballot); *len = cp - data; assert(*len < alloc); } *pdata = data; return 0; } int setup_ticket(void) { struct ticket_config *tk; int i; /* TODO */ foreach_ticket(i, tk) { tk->owner = NULL; tk->expires = 0; abort_proposal(tk); if (local->role & PROPOSER) { pcmk_handler.load_ticket(tk); } } return 0; } int ticket_answer_list(int fd, struct boothc_ticket_msg *msg) { char *data; int olen, rv; struct boothc_header hdr; rv = list_ticket(&data, &olen); if (rv < 0) return rv; init_header(&hdr, CMR_LIST, RLT_SUCCESS, sizeof(hdr) + olen); return send_header_plus(fd, &hdr, data, olen); } int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg) { int rv; struct ticket_config *tk; if (!check_ticket(msg->ticket.id, &tk)) { log_error("Client asked to grant unknown ticket"); rv = RLT_INVALID_ARG; goto reply; } if (tk->owner) { log_error("client wants to get an granted ticket %s", msg->ticket.id); rv = RLT_OVERGRANT; goto reply; } rv = do_grant_ticket(tk); reply: init_header(&msg->header, CMR_GRANT, rv ?: RLT_ASYNC, sizeof(*msg)); return send_ticket_msg(fd, msg); } int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg) { int rv; struct ticket_config *tk; if (!check_ticket(msg->ticket.id, &tk)) { log_error("Client asked to grant unknown ticket"); rv = RLT_INVALID_ARG; goto reply; } if (!tk->owner) { log_info("client wants to revoke a free ticket \"%s\"", msg->ticket.id); rv = RLT_SUCCESS; goto reply; } rv = do_revoke_ticket(tk); reply: init_ticket_msg(msg, CMR_REVOKE, rv ?: RLT_ASYNC, tk); return send_ticket_msg(fd, msg); } /** Got a CMD_CATCHUP query. * In this file because it's mostly used during startup. */ static int ticket_answer_catchup( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { int rv; log_debug("got CATCHUP query for \"%s\" from %s", msg->ticket.id, from->addr_string); /* We do _always_ answer. * In case all booth daemons are restarted at the same time, nobody * would answer any questions, leading to timeouts and delays. * Just admit we don't know. */ rv = (tk->state == ST_INIT) ? RLT_PROBABLY_SUCCESS : RLT_SUCCESS; init_ticket_msg(msg, CMR_CATCHUP, rv, tk); return booth_udp_send(from, msg, sizeof(*msg)); } /** Got a CMR_CATCHUP message. * Gets handled here because it's not PAXOS per se, * but only needed during startup. */ static int ticket_process_catchup( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { int rv; log_info("got CATCHUP answer for \"%s\" from %s; says owner %s with ballot %d", tk->name, from->addr_string, ticket_owner_string(new_owner), ballot); rv = ntohl(msg->header.result); if (rv != RLT_SUCCESS && rv != RLT_PROBABLY_SUCCESS) { log_error("dropped because of wrong rv: 0x%x", rv); return -EINVAL; } if (ballot == tk->new_ballot && ballot == tk->last_ack_ballot && new_owner == tk->owner) { /* Peer says the same thing we're believing. */ tk->proposal_acknowledges |= from->bitmask; tk->expires = ntohl(msg->ticket.expiry) + time(NULL); if (should_switch_state_p(tk)) { if (tk->state == ST_INIT) tk->state = ST_STABLE; } disown_if_expired(tk); log_debug("catchup: peer ack 0x%" PRIx64 ", now state '%s'", tk->proposal_acknowledges, STATE_STRING(tk->state)); goto ex; } if (ticket_valid_for(tk) == 0 && !tk->owner) { /* We see the ticket as expired, and therefore don't know an owner. * So believe some other host. */ tk->state = ST_STABLE; log_debug("catchup: no owner locally, believe peer."); goto accept; } if (ballot >= tk->new_ballot && ballot >= tk->last_ack_ballot && rv == RLT_SUCCESS) { /* Peers seems to know better, but as yet we only have _her_ * word for that. */ log_debug("catchup: peer has higher ballot: %d >= %d/%d", ballot, tk->new_ballot, tk->last_ack_ballot); accept: tk->expires = ntohl(msg->ticket.expiry) + time(NULL); tk->new_ballot = ballot_max2(ballot, tk->new_ballot); tk->last_ack_ballot = ballot_max2(ballot, tk->last_ack_ballot); tk->owner = new_owner; tk->proposal_acknowledges = from->bitmask; /* We stay in ST_INIT and wait for confirmation. */ goto ex; } if (ballot >= tk->last_ack_ballot && rv == RLT_PROBABLY_SUCCESS && tk->state == ST_INIT && tk->retry_number > 3) { /* Peer seems to know better than us, and there's no * convincing other report. Just take it. */ tk->state = ST_STABLE; log_debug("catchup: exceeded retries, peer has higher ballot."); goto accept; } if (ballot < tk->new_ballot || ballot < tk->last_ack_ballot) { /* Peer seems outdated ... tell it to reload? */ log_debug("catchup: peer outdated?"); #if 0 init_ticket_msg(&msg, CMD_DO_CATCHUP, RLT_SUCCESS, tk, &tk->current_state); #endif goto ex; } log_debug("catchup: unhandled situation!"); ex: ticket_write(tk); return 0; } /** Send new state request to all sites. * Perhaps this should take a flag for ACCEPTOR etc.? * No need currently, as all nodes are more or less identical. */ int ticket_broadcast_proposed_state(struct ticket_config *tk, cmd_request_t state) { struct boothc_ticket_msg msg; if (state != tk->state) { tk->proposal_acknowledges = local->bitmask; tk->retry_number = 0; } tk->state = state; init_ticket_msg(&msg, state, RLT_SUCCESS, tk); msg.ticket.owner = htonl(get_node_id(tk->proposed_owner)); log_debug("broadcasting '%s' for ticket \"%s\"", STATE_STRING(state), tk->name); #include //sleep(1); /* Switch state after one second, if the majority says ok. */ gettimeofday(&tk->proposal_switch, NULL); tk->proposal_switch.tv_sec++; return transport()->broadcast(&msg, sizeof(msg)); } static void ticket_cron(struct ticket_config *tk) { time_t now; now = time(NULL); switch(tk->state) { case ST_INIT: /* Unknown state, ask others. */ ticket_send_catchup(tk); return; case OP_COMMITTED: case ST_STABLE: /* Has an owner, has an expiry date, and expiry date in the past? */ if (tk->expires && tk->owner && now > tk->expires) { log_info("LOST ticket: \"%s\" no longer at %s", tk->name, ticket_owner_string(tk->owner)); /* Couldn't renew in time - ticket lost. */ tk->owner = NULL; abort_proposal(tk); ticket_write(tk); if (tk->acquire_after) ticket_next_cron_in(tk, tk->acquire_after); } /* Do we need to refresh? */ if (tk->owner == local && ticket_valid_for(tk) < tk->expiry/2) { log_info("RENEW ticket \"%s\"", tk->name); paxos_start_round(tk, local); /* TODO: remember when we started, and restart afresh after some retries */ } break; case OP_PREPARING: PREPARE_to_PROPOSE(tk); break; case OP_PROPOSING: PROPOSE_to_COMMIT(tk); break; case OP_PROMISING: case OP_ACCEPTING: case OP_RECOVERY: case OP_REJECTED: break; default: break; } } void process_tickets(void) { struct ticket_config *tk; int i; struct timeval now; float sec_until; gettimeofday(&now, NULL); foreach_ticket(i, tk) { sec_until = timeval_to_float(tk->next_cron) - timeval_to_float(now); if (0) log_debug("ticket %s next cron %" PRIx64 ".%03d, " "now %" PRIx64 "%03d, in %f", tk->name, (uint64_t)tk->next_cron.tv_sec, timeval_msec(tk->next_cron), (uint64_t)now.tv_sec, timeval_msec(now), sec_until); if (sec_until > 0.0) continue; log_debug("ticket cron: doing %s", tk->name); - /* Set next value, handler may override. */ - tk->next_cron.tv_sec = INT_MAX; + + + /* Set next value, handler may override. + * This should already be handled via the state logic; + * but to be on the safe side the renew repetition is + * duplicated here, too. */ + set_ticket_wakeup(tk); + ticket_cron(tk); } } void tickets_log_info(void) { struct ticket_config *tk; int i; foreach_ticket(i, tk) { log_info("Ticket %s: state '%s' " "mask %" PRIx64 "/%" PRIx64 " " "ballot %d (current %d) " "expires %-24.24s", tk->name, STATE_STRING(tk->state), tk->proposal_acknowledges, booth_conf->site_bits, tk->last_ack_ballot, tk->new_ballot, ctime(&tk->expires)); } } /* UDP message receiver. */ int message_recv(struct boothc_ticket_msg *msg, int msglen) { int cmd, rv; uint32_t from; struct booth_site *dest; struct ticket_config *tk; struct booth_site *new_owner_p; uint32_t ballot, new_owner; if (check_boothc_header(&msg->header, sizeof(*msg)) < 0 || msglen != sizeof(*msg)) { log_error("message receive error"); return -1; } from = ntohl(msg->header.from); if (!find_site_by_id(from, &dest) || !dest) { log_error("unknown sender: %08x", from); return -1; } if (!check_ticket(msg->ticket.id, &tk)) { log_error("got invalid ticket name \"%s\" from %s", msg->ticket.id, dest->addr_string); return -EINVAL; } cmd = ntohl(msg->header.cmd); ballot = ntohl(msg->ticket.ballot); new_owner = ntohl(msg->ticket.owner); if (!find_site_by_id(new_owner, &new_owner_p)) { log_error("Message with unknown owner %x received", new_owner); return -EINVAL; } switch (cmd) { case CMD_CATCHUP: return ticket_answer_catchup(tk, dest, msg, ballot, new_owner_p); case CMR_CATCHUP: return ticket_process_catchup(tk, dest, msg, ballot, new_owner_p); default: /* only used in catchup, and not even really there ?? */ assert(ntohl(msg->header.result) == 0); rv = paxos_answer(tk, dest, msg, ballot, new_owner_p); assert((tk->proposal_acknowledges & ~booth_conf->site_bits) == 0); return rv; } return 0; } + + +void set_ticket_wakeup(struct ticket_config *tk) +{ + int next; + + if (tk->owner == local) { + next = tk->expiry / 2; + if (tk->timeout * RETRIES/2 < next) + next = tk->timeout; + ticket_next_cron_in(tk, next); + } + else + ticket_next_cron_in(tk, tk->expiry + tk->acquire_after); +} diff --git a/src/ticket.h b/src/ticket.h index 551f753..06b2b4b 100644 --- a/src/ticket.h +++ b/src/ticket.h @@ -1,91 +1,93 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _TICKET_H #define _TICKET_H #include #include #include #include "config.h" #define DEFAULT_TICKET_EXPIRY 600 #define DEFAULT_TICKET_TIMEOUT 10 #define foreach_ticket(i_,t_) for(i=0; (t_=booth_conf->ticket+i, iticket_count); i++) #define foreach_node(i_,n_) for(i=0; (n_=booth_conf->site+i, isite_count); i++) int check_ticket(char *ticket, struct ticket_config **tc); int check_site(char *site, int *local); int do_grant_ticket(struct ticket_config *ticket); int revoke_ticket(struct ticket_config *ticket); int list_ticket(char **pdata, unsigned int *len); int message_recv(struct boothc_ticket_msg *msg, int msglen); int setup_ticket(void); int check_max_len_valid(const char *s, int max); int do_grant_ticket(struct ticket_config *tk); int do_revoke_ticket(struct ticket_config *tk); int find_ticket_by_name(const char *ticket, struct ticket_config **found); +void set_ticket_wakeup(struct ticket_config *tk); + int ticket_answer_list(int fd, struct boothc_ticket_msg *msg); int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg); int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg); int ticket_broadcast_proposed_state(struct ticket_config *tk, cmd_request_t state); int ticket_write(struct ticket_config *tk); void process_tickets(void); void tickets_log_info(void); static inline void ticket_next_cron_at(struct ticket_config *tk, struct timeval when) { tk->next_cron = when; } static inline void ticket_next_cron_in(struct ticket_config *tk, float seconds) { struct timeval tv; gettimeofday(&tv, NULL); tv.tv_sec += trunc(seconds); tv.tv_usec += (seconds - trunc(seconds)) * 1e6; ticket_next_cron_at(tk, tv); } static inline void ticket_activate_timeout(struct ticket_config *tk) { /* TODO: increase timeout when no answers */ ticket_next_cron_in(tk, tk->timeout); tk->retry_number ++; } #endif /* _TICKET_H */