diff --git a/src/paxos.c b/src/paxos.c index 256ef87..f85ac2a 100644 --- a/src/paxos.c +++ b/src/paxos.c @@ -1,392 +1,393 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include "booth.h" #include "transport.h" #include "inline-fn.h" #include "config.h" #include "paxos.h" #include "log.h" static uint32_t next_ballot_number(struct ticket_config *tk) { uint32_t b; /* TODO: getenv() for debugging */ b = tk->new_ballot; /* + unique number */ b += local->bitmask; /* + weight */ b += booth_conf->site_bits * tk->weight[ local->index ]; return b; } static inline void set_proposal_in_ticket(struct ticket_config *tk, struct booth_site *from, uint32_t ballot, struct booth_site *new_owner) { tk->proposer = from; tk->new_ballot = ballot; tk->proposed_owner = new_owner; tk->proposal_expires = 0; // TODO - needed? tk->proposal_acknowledges = from->bitmask | local->bitmask; /* We lose (?) */ tk->state = ST_STABLE; } static inline void change_ticket_owner(struct ticket_config *tk, uint32_t ballot, struct booth_site *new_owner) { int next; /* set "previous" value for next round */ tk->last_ack_ballot = tk->new_ballot = ballot; tk->owner = new_owner; tk->expires = time(NULL) + tk->expiry; tk->proposer = NULL; tk->state = ST_STABLE; if (new_owner == local) { next = tk->expiry / 2; if (tk->timeout * RETRIES/2 < next) next = tk->timeout; ticket_next_cron_in(tk, next); } else ticket_next_cron_in(tk, tk->expiry + tk->acquire_after); log_info("Now actively COMMITTED for \"%s\": new owner %s, ballot %d", tk->name, ticket_owner_string(tk->owner), ballot); ticket_write(tk); } void abort_proposal(struct ticket_config *tk) { tk->proposer = NULL; tk->proposed_owner = tk->owner; tk->retry_number = 0; /* Ask others (repeatedly) until we know the new owner. */ tk->state = ST_INIT; } /** \defgroup msghdl Message handling functions. * * Not all use all arguments; but to keep the interface the same, * they're all just passed everything we have. * * See also enum \ref cmd_request_t. * @{ */ /** Start a PAXOS round, by sending out an OP_PREPARING. */ int paxos_start_round(struct ticket_config *tk, struct booth_site *new_owner) { if (tk->state != ST_STABLE) return RLT_BUSY; /* This may not be done from cron, because the ballot number would simply * get counted up without any benefit. * The message may get retransmitted, though. */ tk->proposer = local; tk->new_ballot = next_ballot_number(tk); tk->proposed_owner = new_owner; tk->retry_number = 0; ticket_activate_timeout(tk); return ticket_broadcast_proposed_state(tk, OP_PREPARING); } /** Answering OP_PREPARING means sending out OP_PROMISING. */ inline static int answer_PREP( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { if (!(local->role & ACCEPTOR)) return 0; /* We have to be careful here. * Getting multiple copies of the same message must not trigger * rejections, but only repeated promises. */ if (from == tk->proposer && ballot == tk->new_ballot) goto promise; /* It doesn't matter whether it's the same or another host; * the only distinction is the ballot number. */ if (ballot > tk->new_ballot) { promise: msg->header.cmd = htonl(OP_PROMISING); msg->ticket.prev_ballot = htonl(tk->last_ack_ballot); set_proposal_in_ticket(tk, from, ballot, new_owner); log_info("PROMISING for ticket \"%s\" (by %s) for %d", tk->name, from->addr_string, ballot); } else { msg->header.cmd = htonl(OP_REJECTED); msg->ticket.ballot = htonl(tk->new_ballot); msg->ticket.prev_ballot = htonl(tk->last_ack_ballot); log_info("REJECTING (prep) for ticket \"%s\" from %s - have %d, wanted %d", tk->name, from->addr_string, tk->new_ballot, ballot); } init_header_bare(&msg->header); return booth_udp_send(from, msg, sizeof(*msg)); } /** Getting OP_REJECTED means abandoning the current operation. */ inline static int answer_REJ( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { log_info("got REJECTED for ticket \"%s\", ballot %d (has %d), from %s", tk->name, tk->new_ballot, ballot, from->addr_string); abort_proposal(tk); /* TODO: should we check whether that sequence is increasing? */ tk->new_ballot = ballot; tk->last_ack_ballot = ntohl(msg->ticket.prev_ballot); /* No need to ask the others. */ tk->state = ST_STABLE; return 0; } /** After a few OP_PROMISING replies we can send out OP_PROPOSING. */ inline static int got_a_PROM( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { if (tk->proposer == local || tk->state == OP_PREPARING) { tk->proposal_acknowledges |= from->bitmask; log_info("Got PROMISE from %s for \"%s\", now %" PRIx64, from->addr_string, tk->name, tk->proposal_acknowledges); /* TODO: only check for count? */ if (promote_ticket_state(tk)) { ticket_activate_timeout(tk); return ticket_broadcast_proposed_state(tk, OP_PROPOSING); } /* Wait for further data */ return 0; } /* Packet just delayed? Silently ignore. */ if (ballot == tk->last_ack_ballot && (new_owner == tk->owner || new_owner == tk->proposed_owner)) return 0; /* Message sent to wrong host? */ log_debug("got unexpected PROMISE from %s for \"%s\"", from->addr_string, tk->name); return 0; } /** Answering OP_PROPOSING means sending out OP_ACCEPTING. */ inline static int answer_PROP( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { if (!(local->role & ACCEPTOR)) return 0; if (from == tk->proposer && ballot == tk->new_ballot) goto accepting; /* We have to be careful here. * Getting multiple copies of the same message must not trigger * rejections, but only repeated OP_ACCEPTING messages. */ if (ballot > tk->last_ack_ballot && ballot == tk->new_ballot && ntohl(msg->ticket.prev_ballot) == tk->last_ack_ballot) { if (tk->proposer) { /* Send OP_REJECTED to previous proposer? */ log_info("new PROPOSAL for ticket \"%s\" overriding older one from %s", tk->name, from->addr_string); } tk->proposer = from; accepting: init_ticket_msg(msg, OP_ACCEPTING, RLT_SUCCESS, tk); - log_info("ACCEPTING for ticket \"%s\" (by %s) for %d - new owner %s", + log_info("sending ACCEPT for ticket \"%s\" (by %s) for %d - new owner %s", tk->name, from->addr_string, ballot, ticket_owner_string(new_owner)); + change_ticket_owner(tk, ballot, new_owner); } else { msg->header.cmd = htonl(OP_REJECTED); msg->ticket.ballot = htonl(tk->new_ballot); msg->ticket.prev_ballot = htonl(tk->last_ack_ballot); log_info("REJECTING (prop) for ticket \"%s\" from %s - have %d, wanted %d", tk->name, from->addr_string, tk->new_ballot, ballot); } init_header_bare(&msg->header); return booth_udp_send(from, msg, sizeof(*msg)); } /** After enough OP_ACCEPTING we can do the change, and send an OP_COMMITTED. */ inline static int got_an_ACC( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { int rv; if (tk->proposer == local && tk->state == OP_PROPOSING) { tk->proposal_acknowledges |= from->bitmask; log_info("Got ACCEPTING from %s for \"%s\", now %" PRIx64, from->addr_string, tk->name, tk->proposal_acknowledges); /* TODO: only check for count? */ if (promote_ticket_state(tk)) { change_ticket_owner(tk, tk->new_ballot, tk->proposed_owner); rv = ticket_broadcast_proposed_state(tk, OP_COMMITTED); return rv; } } return 0; } /** An OP_COMMITTED gets no answer; just record the new state. */ inline static int answer_COMM( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { /* We cannot check whether the packet is from an expected proposer - * perhaps this is the _only_ message of the whole handshake? */ if (ballot > tk->new_ballot && ntohl(msg->ticket.prev_ballot) == tk->last_ack_ballot) { change_ticket_owner(tk, ballot, new_owner); } /* Send ack? */ return 0; } /** @} */ int paxos_answer( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner_p) { int cmd; cmd = ntohl(msg->header.cmd); /* These are in roughly chronological order. * What the first machine sends is an OP_PREPARING * (see paxos_start_round()), which gets received * (below) from the others ... */ switch (cmd) { case OP_PREPARING: return answer_PREP(tk, from, msg, ballot, new_owner_p); case OP_REJECTED: return answer_REJ(tk, from, msg, ballot, new_owner_p); case OP_PROMISING: return got_a_PROM(tk, from, msg, ballot, new_owner_p); case OP_PROPOSING: return answer_PROP(tk, from, msg, ballot, new_owner_p); case OP_ACCEPTING: return got_an_ACC(tk, from, msg, ballot, new_owner_p); case OP_COMMITTED: return answer_COMM(tk, from, msg, ballot, new_owner_p); default: log_error("unprocessed message, cmd %x", cmd); return -EINVAL; } }