diff --git a/src/config.h b/src/config.h index 241e5ac..9530006 100644 --- a/src/config.h +++ b/src/config.h @@ -1,191 +1,191 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _CONFIG_H #define _CONFIG_H #include #include "booth.h" #include "raft.h" #include "transport.h" /** @{ */ /** Definitions for in-RAM data. */ #define MAX_NODES 16 #define TICKET_ALLOC 16 struct ticket_config { /** \name Configuration items. * @{ */ /** Name of ticket. */ boothc_ticket name; /** How many seconds a term lasts (if not refreshed). */ int term_duration; /** Network related timeouts. */ int timeout; /** Retries before giving up. */ int retries; /** If >0, time to wait for a site to get fenced. * The ticket may be acquired after that timespan by * another site. */ int acquire_after; /* TODO: needed? */ /* Program to ask whether it makes sense to * acquire the ticket */ char *ext_verifier; /** Node weights. */ int weight[MAX_NODES]; /** @} */ /** \name Runtime values. * @{ */ /** Current state. */ server_state_e state; /** When something has to be done */ struct timeval next_cron; /** Current leader. This is effectively the log[] in Raft. */ struct booth_site *leader; /** Is the ticket granted? */ int is_granted; /** Timestamp of leadership expiration */ time_t term_expires; /** End of election period */ time_t election_end; struct booth_site *voted_for; /** Who the various sites vote for. * NO_OWNER = no vote yet. */ struct booth_site *votes_for[MAX_NODES]; /* bitmap */ uint64_t votes_received; /** Last voting round that was seen. */ uint32_t current_term; /** Do ticket updates whenever we get enough heartbeats. * But do that only once. * This is reset to 0 whenever we broadcast heartbeat and set * to 1 once enough acks are received. */ uint32_t majority_acks_received; /** @} */ /** */ uint32_t commit_index; /** */ uint32_t last_applied; uint32_t next_index[MAX_NODES]; uint32_t match_index[MAX_NODES]; - uint64_t hb_received; + uint64_t acks_received; time_t hb_sent_at; /** \name Needed while proposals are being done. * @{ */ /** Whom to vote for the next time. * Needed to push a ticket to someone else. */ #if 0 /** Bitmap of sites that acknowledge that state. */ uint64_t proposal_acknowledges; /** When an incompletely acknowledged proposal gets done. * If all peers agree, that happens sooner. * See switch_state_to(). */ struct timeval proposal_switch; /** Timestamp of proposal expiration. */ time_t proposal_expires; #endif /** Number of send retries left. * Used on the new owner. * Starts at 0, counts up. */ int retry_number; /** @} */ }; struct booth_config { char name[BOOTH_NAME_LEN]; transport_layer_t proto; uint16_t port; /** Stores the OR of the individual host bitmasks. */ uint64_t site_bits; char site_user[BOOTH_NAME_LEN]; char site_group[BOOTH_NAME_LEN]; char arb_user[BOOTH_NAME_LEN]; char arb_group[BOOTH_NAME_LEN]; uid_t uid; gid_t gid; int site_count; struct booth_site site[MAX_NODES]; int ticket_count; int ticket_allocated; struct ticket_config *ticket; }; extern struct booth_config *booth_conf; int read_config(const char *path); int check_config(int type); int find_site_by_name(unsigned char *site, struct booth_site **node, int any_type); int find_site_by_id(uint32_t site_id, struct booth_site **node); const char *type_to_string(int type); #include #define R(tk_) printf("## %12s:%3d state %s, %d:%d, " \ "leader %s, exp %s", __FILE__, __LINE__, \ state_to_string(tk_->state), tk_->current_term, \ tk_->commit_index, site_string(tk_->leader), ctime(&tk_->term_expires)) #endif /* _CONFIG_H */ diff --git a/src/handler.c b/src/handler.c index 00c4492..b51fca8 100644 --- a/src/handler.c +++ b/src/handler.c @@ -1,70 +1,70 @@ /* * Copyright (C) 2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include "ticket.h" #include "config.h" #include "inline-fn.h" #include "log.h" #include "pacemaker.h" #include "booth.h" #include "handler.h" /** Runs an external handler. * See eg. 'before-acquire-handler'. * TODO: timeout, async operation?. */ int run_handler(struct ticket_config *tk, const char *cmd, int synchronous) { int rv; char expires[16]; if (!cmd) return 0; assert(synchronous); sprintf(expires, "%" PRId64, (int64_t)(tk->term_expires)); rv = setenv("BOOTH_TICKET", tk->name, 1) || setenv("BOOTH_LOCAL", local->addr_string, 1) || setenv("BOOTH_CONF_NAME", booth_conf->name, 1) || setenv("BOOTH_CONF_PATH", cl.configfile, 1) || setenv("BOOTH_TICKET_EXPIRES", expires, 1); if (rv) { log_error("Cannot set environment: %d", errno); } else { rv = system(cmd); if (rv) - log_error("Error calling \"%s\": %s", + log_warn("handler \"%s\" exited with error %s", cmd, interpret_rv(rv)); else - log_info("Ran \"%s\" successfully.", cmd); + log_info("handler \"%s\" exited with success", cmd); } return rv; } diff --git a/src/inline-fn.h b/src/inline-fn.h index 62f30bc..aa6578b 100644 --- a/src/inline-fn.h +++ b/src/inline-fn.h @@ -1,314 +1,314 @@ /* * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _INLINE_FN_H #define _INLINE_FN_H #include #include #include #include #include "config.h" #include "ticket.h" #include "transport.h" inline static uint32_t get_local_id(void) { return local ? local->site_id : -1; } inline static uint32_t get_node_id(struct booth_site *node) { return node ? node->site_id : NO_ONE; } inline static int term_time_left(const struct ticket_config *tk) { int left; left = tk->term_expires - time(NULL); return (left < 0) ? 0 : left; } /** Returns number of seconds left, if any. */ inline static int leader_and_valid(const struct ticket_config *tk) { if (tk->leader != local) return 0; return term_time_left(tk); } /** Is this some leader? */ inline static int is_owned(const struct ticket_config *tk) { return (tk->leader && tk->leader != no_leader); } static inline void init_header_bare(struct boothc_header *h) { h->magic = htonl(BOOTHC_MAGIC); h->version = htonl(BOOTHC_VERSION); h->from = htonl(local->site_id); h->iv = htonl(0); h->auth1 = htonl(0); h->auth2 = htonl(0); } static inline void init_header(struct boothc_header *h, int cmd, int result, int data_len) { init_header_bare(h); h->length = htonl(data_len); h->cmd = htonl(cmd); h->result = htonl(result); } static inline void init_ticket_site_header(struct boothc_ticket_msg *msg, int cmd) { init_header(&msg->header, cmd, 0, sizeof(*msg)); } static inline void init_ticket_msg(struct boothc_ticket_msg *msg, int cmd, int rv, struct ticket_config *tk) { assert(sizeof(msg->ticket.id) == sizeof(tk->name)); init_header(&msg->header, cmd, rv, sizeof(*msg)); if (!tk) { memset(&msg->ticket, 0, sizeof(msg->ticket)); } else { memcpy(msg->ticket.id, tk->name, sizeof(msg->ticket.id)); msg->ticket.leader = htonl(get_node_id( (tk->leader && tk->leader != no_leader) ? tk->leader : tk->voted_for)); msg->ticket.term = htonl(tk->current_term); msg->ticket.term_valid_for = htonl(term_time_left(tk)); msg->ticket.leader_commit = htonl(tk->commit_index); } } static inline struct booth_transport const *transport(void) { return booth_transport + booth_conf->proto; } static inline const char *site_string(struct booth_site *site) { return site ? site->addr_string : "NONE"; } static inline const char *ticket_leader_string(struct ticket_config *tk) { return site_string(tk->leader); } static inline void disown_ticket(struct ticket_config *tk) { tk->leader = no_leader; tk->is_granted = 0; time(&tk->term_expires); } static inline int disown_if_expired(struct ticket_config *tk) { if (time(NULL) >= tk->term_expires || !tk->leader) { disown_ticket(tk); return 1; } return 0; } /* We allow half of the uint32_t to be used; * half of that below, half of that above the current known "good" value. * 0 UINT32_MAX * |--------------------------+----------------+------------| * | | | * |--------+-------| allowed range * | * current commit index * * So, on overflow it looks like that: * UINT32_MAX 0 * |--------------------------+-----------||---+------------| * | | | * |--------+-------| allowed range * | * current commit index * * This should be possible by using the same datatype and relying * on the under/overflow semantics. * * * Having 30 bits available, and assuming an expire time of * one minute and a (high) commit index step of 64 == 2^6 (because * of weights), we get 2^24 minutes of range - which is ~750 * years. "Should be enough for everybody." */ static inline int index_is_higher_than(uint32_t c_high, uint32_t c_low) { uint32_t diff; if (c_high == c_low) return 0; diff = c_high - c_low; if (diff < UINT32_MAX/4) return 1; diff = c_low - c_high; if (diff < UINT32_MAX/4) return 0; assert(!"commit index out of range - invalid"); } static inline uint32_t index_max2(uint32_t a, uint32_t b) { return index_is_higher_than(a, b) ? a : b; } static inline uint32_t index_max3(uint32_t a, uint32_t b, uint32_t c) { return index_max2( index_max2(a, b), c); } static inline double timeval_to_float(struct timeval tv) { return tv.tv_sec + tv.tv_usec*(double)1.0e-6; } static inline int timeval_msec(struct timeval tv) { int m; m = tv.tv_usec / 1000; if (m >= 1000) m = 999; return m; } static inline int timeval_compare(struct timeval tv1, struct timeval tv2) { if (tv1.tv_sec < tv2.tv_sec) return -1; if (tv1.tv_sec > tv2.tv_sec) return +1; if (tv1.tv_usec < tv2.tv_usec) return -1; if (tv1.tv_usec > tv2.tv_usec) return +1; return 0; } static inline int timeval_in_past(struct timeval which) { struct timeval tv; gettimeofday(&tv, NULL); return timeval_compare(tv, which) > 0; } static inline time_t next_vote_starts_at(struct ticket_config *tk) { time_t half_exp, retries_needed; /* If not owner, don't renew. */ if (tk->leader != local) return 0; /* Try to renew at half of expiry time. */ half_exp = tk->term_expires - tk->term_duration/2; /* Also start renewal if we couldn't get * a few message retransmission in the alloted * expiry time. */ retries_needed = tk->term_expires - tk->timeout * tk->retries/2; /* Return earlier timestamp. */ return half_exp < retries_needed ? half_exp : retries_needed; } static inline int should_start_renewal(struct ticket_config *tk) { time_t now, when; when = next_vote_starts_at(tk); if (!when) return 0; time(&now); return when <= now; } static inline int send_heartbeat(struct ticket_config *tk) { - tk->hb_received = local->bitmask; + tk->acks_received = local->bitmask; tk->hb_sent_at = time(NULL); tk->majority_acks_received = 0; return ticket_broadcast(tk, OP_HEARTBEAT, RLT_SUCCESS); } static inline struct booth_site *my_vote(struct ticket_config *tk) { return tk->votes_for[ local->index ]; } static inline int count_bits(uint64_t val) { return __builtin_popcount(val); } static inline int majority_of_bits(struct ticket_config *tk, uint64_t val) { /* Use ">" to get majority decision, even for an even number * of participants. */ return count_bits(val) * 2 > booth_conf->site_count; } #endif diff --git a/src/pacemaker.c b/src/pacemaker.c index d986747..1f12cf1 100644 --- a/src/pacemaker.c +++ b/src/pacemaker.c @@ -1,329 +1,329 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include "log.h" #include "pacemaker.h" #include "inline-fn.h" enum atomic_ticket_supported { YES=0, NO, FILENOTFOUND, /* Ie. UNKNOWN */ UNKNOWN = FILENOTFOUND, }; /* http://thedailywtf.com/Articles/What_Is_Truth_0x3f_.aspx */ enum atomic_ticket_supported atomicity = UNKNOWN; #define COMMAND_MAX 1024 /** Determines whether the installed crm_ticket can do atomic ticket grants, * _including_ multiple attribute changes. * * See * https://bugzilla.novell.com/show_bug.cgi?id=855099 * * Run "crm_ticket" without "--force"; * - the old version asks for "Y/N" via STDIN, and returns 0 * when reading "no"; * - the new version just reports an error without asking. */ static void test_atomicity(void) { int rv; if (atomicity != UNKNOWN) return; rv = system("echo n | crm_ticket -g -t any-ticket-name > /dev/null 2> /dev/null"); if (rv == -1) { log_error("Cannot run \"crm_ticket\"!"); /* BIG problem. Abort. */ exit(1); } if (WIFSIGNALED(rv)) { log_error("\"crm_ticket\" terminated by a signal!"); /* Problem. Abort. */ exit(1); } switch (WEXITSTATUS(rv)) { case 0: atomicity = NO; log_info("Old \"crm_ticket\" found, using non-atomic ticket updates."); break; case 1: atomicity = YES; log_info("New \"crm_ticket\" found, using atomic ticket updates."); break; default: log_error("Unexpected return value from \"crm_ticket\" (%d), " "falling back to non-atomic ticket updates.", rv); atomicity = NO; } assert(atomicity == YES || atomicity == NO); } const char * interpret_rv(int rv) { static char text[64]; int p; if (rv == 0) return "0"; p = sprintf(text, "rv %d", WEXITSTATUS(rv)); if (WIFSIGNALED(rv)) sprintf(text + p, " signal %d", WTERMSIG(rv)); return text; } static int pcmk_write_ticket_atomic(struct ticket_config *tk, int grant) { char cmd[COMMAND_MAX]; int rv; /* The values are appended to "-v", so that NO_ONE * (which is -1) isn't seen as another option. */ snprintf(cmd, COMMAND_MAX, "crm_ticket -t '%s' " "%s --force " "-S owner -v%" PRIi32 " " "-S expires -v%" PRIi64 " " "-S term -v%" PRIi64, tk->name, (grant > 0 ? "-g" : grant < 0 ? "-r" : ""), (int32_t)get_node_id(tk->leader), (int64_t)tk->term_expires, (int64_t)tk->current_term); rv = system(cmd); - log_info("command: '%s' was executed", cmd); + log_debug("command: '%s' was executed", cmd); if (rv != 0) log_error("error: \"%s\" failed, %s", cmd, interpret_rv(rv)); return rv; } static int pcmk_store_ticket_nonatomic(struct ticket_config *tk); static int pcmk_grant_ticket(struct ticket_config *tk) { char cmd[COMMAND_MAX]; int rv; test_atomicity(); if (atomicity == YES) return pcmk_write_ticket_atomic(tk, +1); rv = pcmk_store_ticket_nonatomic(tk); if (rv) return rv; snprintf(cmd, COMMAND_MAX, "crm_ticket -t %s -g --force", tk->name); - log_info("command: '%s' was executed", cmd); + log_debug("command: '%s' was executed", cmd); rv = system(cmd); if (rv != 0) log_error("error: \"%s\" failed, %s", cmd, interpret_rv(rv)); return rv; } static int pcmk_revoke_ticket(struct ticket_config *tk) { char cmd[COMMAND_MAX]; int rv; test_atomicity(); if (atomicity == YES) return pcmk_write_ticket_atomic(tk, -1); rv = pcmk_store_ticket_nonatomic(tk); if (rv) return rv; snprintf(cmd, COMMAND_MAX, "crm_ticket -t %s -r --force", tk->name); - log_info("command: '%s' was executed", cmd); + log_debug("command: '%s' was executed", cmd); rv = system(cmd); if (rv != 0) log_error("error: \"%s\" failed, %s", cmd, interpret_rv(rv)); return rv; } static int crm_ticket_set(const struct ticket_config *tk, const char *attr, int64_t val) { char cmd[COMMAND_MAX]; int i, rv; snprintf(cmd, COMMAND_MAX, "crm_ticket -t '%s' -S '%s' -v %" PRIi64, tk->name, attr, val); /* If there are errors, there's not much we can do but retry ... */ for (i=0; i<3 && (rv = system(cmd)); i++) ; log_debug("'%s' gave result %s", cmd, interpret_rv(rv)); return rv; } static int pcmk_store_ticket_nonatomic(struct ticket_config *tk) { int rv; /* Always try to store *each* attribute, even if there's an error * for one of them. */ rv = crm_ticket_set(tk, "owner", (int32_t)get_node_id(tk->leader)); rv = crm_ticket_set(tk, "expires", tk->term_expires) || rv; rv = crm_ticket_set(tk, "term", tk->current_term) || rv; if (rv) log_error("setting crm_ticket attributes failed; %s", interpret_rv(rv)); else log_info("setting crm_ticket attributes successful"); return rv; } static int crm_ticket_get(struct ticket_config *tk, const char *attr, int64_t *data) { char cmd[COMMAND_MAX]; char line[256]; int rv; int64_t v; FILE *p; *data = -1; v = 0; snprintf(cmd, COMMAND_MAX, "crm_ticket -t '%s' -G '%s' --quiet", tk->name, attr); p = popen(cmd, "r"); if (p == NULL) { rv = errno; log_error("popen error %d (%s) for \"%s\"", rv, strerror(rv), cmd); return rv || -EINVAL; } if (fgets(line, sizeof(line) - 1, p) == NULL) { rv = ENODATA; goto out; } rv = EINVAL; if (!strncmp(line, "false", 5)) { v = 0; rv = 0; } else if (!strncmp(line, "true", 4)) { v = 1; rv = 0; } else if (sscanf(line, "%" PRIi64, &v) == 1) { rv = 0; } *data = v; out: rv = pclose(p); log_debug("command \"%s\" returned %s, value %" PRIi64, cmd, interpret_rv(rv), v); return rv; } static int pcmk_load_ticket(struct ticket_config *tk) { int rv; int64_t v; /* This here gets run during startup; testing that here means that * normal operation won't be interrupted with that test. */ test_atomicity(); rv = crm_ticket_get(tk, "expires", &v); if (!rv) { tk->term_expires = v; } rv = crm_ticket_get(tk, "term", &v); if (!rv) { tk->current_term = v; } rv = crm_ticket_get(tk, "owner", &v); if (!rv) { /* No check, node could have been deconfigured. */ find_site_by_id(v, &tk->leader); } rv = crm_ticket_get(tk, "granted", &v); if (!rv) { tk->is_granted = v; } return rv; } struct ticket_handler pcmk_handler = { .grant_ticket = pcmk_grant_ticket, .revoke_ticket = pcmk_revoke_ticket, .load_ticket = pcmk_load_ticket, }; diff --git a/src/raft.c b/src/raft.c index a98f4ad..a267d54 100644 --- a/src/raft.c +++ b/src/raft.c @@ -1,772 +1,776 @@ /* * Copyright (C) 2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include "booth.h" #include "transport.h" #include "inline-fn.h" #include "config.h" #include "raft.h" #include "ticket.h" #include "log.h" inline static void clear_election(struct ticket_config *tk) { int i; struct booth_site *site; log_debug("clear election"); tk->votes_received = 0; foreach_node(i, site) tk->votes_for[site->index] = NULL; } inline static void record_vote(struct ticket_config *tk, struct booth_site *who, struct booth_site *vote) { - log_info("site \"%s\" votes for \"%s\"", + log_debug("site %s votes for %s", site_string(who), site_string(vote)); if (!tk->votes_for[who->index]) { tk->votes_for[who->index] = vote; tk->votes_received |= who->bitmask; } else { if (tk->votes_for[who->index] != vote) - log_error("voted previously (but in same term!) for \"%s\"...", - tk->votes_for[who->index]->addr_string); + log_warn("voted previously (but in same term!) for %s...", + site_string(tk->votes_for[who->index])); } } static int cmp_msg_ticket(struct ticket_config *tk, struct boothc_ticket_msg *msg) { if (tk->current_term != ntohl(msg->ticket.term)) { return tk->current_term - ntohl(msg->ticket.term); } return tk->commit_index - ntohl(msg->ticket.leader_commit); } static void update_term_from_msg(struct ticket_config *tk, struct boothc_ticket_msg *msg) { uint32_t i; i = ntohl(msg->ticket.term); /* if we failed to start the election, then accept the term * from the leader * */ if (tk->state == ST_CANDIDATE) { tk->current_term = i; } else { tk->current_term = max(i, tk->current_term); } /* § 5.3 */ i = ntohl(msg->ticket.leader_commit); tk->commit_index = max(i, tk->commit_index); } static void update_ticket_from_msg(struct ticket_config *tk, struct boothc_ticket_msg *msg) { int duration; duration = tk->term_duration; if (msg) duration = min(duration, ntohl(msg->ticket.term_valid_for)); tk->term_expires = time(NULL) + duration; if (msg) { update_term_from_msg(tk, msg); } } static void become_follower(struct ticket_config *tk, struct boothc_ticket_msg *msg) { update_ticket_from_msg(tk, msg); tk->state = ST_FOLLOWER; } struct booth_site *majority_votes(struct ticket_config *tk) { int i, n; struct booth_site *v; int count[MAX_NODES] = { 0, }; for(i=0; isite_count; i++) { v = tk->votes_for[i]; if (!v) continue; n = v->index; count[n]++; - log_info("Majority: %d \"%s\" wants %d \"%s\" => %d", - i, booth_conf->site[i].addr_string, - n, v->addr_string, + log_debug("Majority: %d %s wants %d %s => %d", + i, site_string(&booth_conf->site[i]), + n, site_string(v), count[n]); if (count[n]*2 <= booth_conf->site_count) continue; - log_info("Majority reached: %d of %d for \"%s\"", + log_debug("Majority reached: %d of %d for %s", count[n], booth_conf->site_count, - v->addr_string); + site_string(v)); return v; } return NULL; } static int all_voted(struct ticket_config *tk) { int i, cnt = 0; for(i=0; isite_count; i++) { if (tk->votes_for[i]) { cnt++; } } return (cnt == booth_conf->site_count); } static int newer_term(struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg, int in_election) { uint32_t term; term = ntohl(msg->ticket.term); /* §5.1 */ if (term > tk->current_term) { tk->state = ST_FOLLOWER; if (!in_election) { tk->leader = leader; - log_debug("from %s: higher term %d vs. %d, following \"%s\"", - sender->addr_string, + log_debug("from %s: higher term %d vs. %d, following %s", + site_string(sender), term, tk->current_term, ticket_leader_string(tk)); } else { tk->leader = no_leader; log_debug("from %s: higher term %d vs. %d (election)", - sender->addr_string, + site_string(sender), term, tk->current_term); } tk->term_expires = time(NULL) + tk->term_duration; tk->current_term = term; return 1; } return 0; } static int term_too_low(struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg) { uint32_t term; term = ntohl(msg->ticket.term); /* §5.1 */ - if (term < tk->current_term) - { - log_info("sending REJECT, term too low."); + if (term < tk->current_term) { + log_info("sending reject to %s, its term too low " + "(%d vs. %d)", site_string(leader), + term, tk->current_term + ); send_reject(sender, tk, RLT_TERM_OUTDATED); return 1; } return 0; } /* For follower. */ static int answer_HEARTBEAT ( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; struct boothc_ticket_msg omsg; term = ntohl(msg->ticket.term); log_debug("leader: %s, have %s; term %d vs %d", site_string(leader), ticket_leader_string(tk), term, tk->current_term); + /* if we're candidate, it may be that we got a heartbeat from + * a legitimate leader, so don't ignore a lower term + */ if (tk->state != ST_CANDIDATE && term < tk->current_term) { - log_info("ignoring lower term %d vs. %d, from \"%s\"", + log_info("ignoring lower term %d vs. %d, from %s", term, tk->current_term, ticket_leader_string(tk)); return 0; } /* Needed? */ newer_term(tk, sender, leader, msg, 0); become_follower(tk, msg); /* Racy??? */ assert(sender == leader || !leader); tk->leader = leader; /* Ack the heartbeat (we comply). */ init_ticket_msg(&omsg, OP_HEARTBEAT, RLT_SUCCESS, tk); return booth_udp_send(sender, &omsg, sizeof(omsg)); } static int process_UPDATE ( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; term = ntohl(msg->ticket.term); log_debug("leader: %s, have %s; term %d vs %d", site_string(leader), ticket_leader_string(tk), term, tk->current_term); /* No reject. (?) */ if (term < tk->current_term) { - log_warn("ignoring lower term %d vs. %d, from \"%s\"", + log_info("ignoring lower term %d vs. %d, from %s", term, tk->current_term, ticket_leader_string(tk)); return 0; } update_ticket_from_msg(tk, msg); ticket_write(tk); /* run ticket_cron if the ticket expires */ set_ticket_wakeup(tk); return 0; } static int process_REVOKE ( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { if (tk->leader != sender) { log_error("from %s: non-leader wants to revoke ticket %s (ignoring)", - sender->addr_string, tk->name); + site_string(sender), tk->name); return 1; } else if (tk->state != ST_FOLLOWER) { log_error("from %s: unexpected ticket %s revoke in state %s (ignoring)", - sender->addr_string, + site_string(sender), state_to_string(tk->state), tk->name); return 1; } else { log_info("from %s: leader revokes ticket %s", - sender->addr_string, tk->name); + site_string(sender), tk->name); reset_ticket(tk); ticket_write(tk); } return 0; } /* update the ticket on the leader, write it to the CIB, and send out the update message to others with the new expiry time */ static int leader_update_ticket(struct ticket_config *tk) { struct boothc_ticket_msg msg; tk->term_expires = time(NULL) + tk->term_duration; tk->retry_number = 0; ticket_write(tk); set_ticket_wakeup(tk); init_ticket_msg(&msg, OP_UPDATE, RLT_SUCCESS, tk); return transport()->broadcast(&msg, sizeof(msg)); } /* For leader. */ static int process_HEARTBEAT( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; + term = ntohl(msg->ticket.term); if (newer_term(tk, sender, leader, msg, 0)) { - /* Uh oh. Higher term?? Should we simply believe that? */ - log_error("Got higher term number from"); + /* unexpected higher term */ + log_warn("got higher term from %s (%d vs. %d)", + site_string(sender), + term, tk->current_term); return 0; } - - term = ntohl(msg->ticket.term); - /* Don't send a reject. */ if (term < tk->current_term) { /* Doesn't know what he's talking about - perhaps * doesn't receive our packets? */ - log_error("Stale/wrong heartbeat from \"%s\": " - "term %d instead of %d", + log_warn("from %s: unexpected " + "term %d instead of %d (ignoring)", site_string(sender), term, tk->current_term); return 0; } if (term == tk->current_term && leader == tk->leader) { - /* Hooray, an ACK! */ - /* So at least _someone_ is listening. */ - tk->hb_received |= sender->bitmask; + /* got an ack! */ + tk->acks_received |= sender->bitmask; - log_debug("Got heartbeat ACK from \"%s\", %d/%d agree.", + log_debug("got heartbeat ACK from %s, %d/%d agree.", site_string(sender), - count_bits(tk->hb_received), + count_bits(tk->acks_received), booth_conf->site_count); - if (majority_of_bits(tk, tk->hb_received)) { + if (majority_of_bits(tk, tk->acks_received)) { /* OK, at least half of the nodes are reachable; * Update the ticket and send update messages out */ if( !tk->majority_acks_received ) { /* Write the ticket to the CIB and set the next * wakeup time (but do that only once) */ tk->majority_acks_received = 1; return leader_update_ticket(tk); } } } return 0; } void leader_elected( struct ticket_config *tk, struct booth_site *new_leader ) { if (new_leader) { tk->leader = new_leader; tk->term_expires = time(NULL) + tk->term_duration; tk->election_end = 0; tk->voted_for = NULL; if (new_leader == local) { tk->commit_index++; tk->state = ST_LEADER; send_heartbeat(tk); } else become_follower(tk, NULL); } } static int process_VOTE_FOR( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { if (term_too_low(tk, sender, leader, msg)) return 0; if (newer_term(tk, sender, leader, msg, 0)) { clear_election(tk); } record_vote(tk, sender, leader); if (tk->state != ST_CANDIDATE) { /* lost candidate status, somebody rejected our proposal */ return 0; } /* only if all voted can we take the ticket now, otherwise * wait for timeout in ticket_cron */ if (all_voted(tk)) { /* §5.2 */ leader_elected(tk, majority_votes(tk)); set_ticket_wakeup(tk); } return 0; } static int process_REJECTED( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t rv; rv = ntohl(msg->header.result); if (tk->state == ST_CANDIDATE && rv == RLT_TERM_OUTDATED) { log_warn("from %s: ticket %s outdated (term %d), following %s", site_string(sender), tk->name, ntohl(msg->ticket.term), site_string(leader) ); tk->leader = leader; become_follower(tk, msg); return 0; } if (tk->state == ST_CANDIDATE && rv == RLT_TERM_STILL_VALID) { - log_warn("from %s: there's a leader that I don't see: \"%s\"", + log_warn("from %s: there's a leader I didn't see: %s, following", site_string(sender), site_string(leader)); tk->leader = leader; become_follower(tk, msg); return 0; } log_warn("from %s: in state %s, got %s (unexpected reject)", site_string(sender), state_to_string(tk->state), state_to_string(rv)); tk->leader = leader; become_follower(tk, msg); return 0; } static int send_ticket ( int cmd, struct ticket_config *tk, struct booth_site *to_site ) { struct boothc_ticket_msg omsg; init_ticket_msg(&omsg, cmd, RLT_SUCCESS, tk); return booth_udp_send(to_site, &omsg, sizeof(omsg)); } /* §5.2 */ static int answer_REQ_VOTE( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; int valid; struct boothc_ticket_msg omsg; term = ntohl(msg->ticket.term); /* Important: Ignore duplicated packets! */ valid = term_time_left(tk); if (valid && term == tk->current_term && sender == tk->leader) { log_debug("Duplicate OP_VOTE_FOR ignored."); return 0; } if (valid) { - log_info("no election allowed, term valid for %d??", valid); + log_warn("no election allowed for %s, term still valid for %d", + tk->name, valid); return send_reject(sender, tk, RLT_TERM_STILL_VALID); } if (term_too_low(tk, sender, leader, msg)) return 0; + /* if it's a newer term or ... */ if (newer_term(tk, sender, leader, msg, 1)) { clear_election(tk); goto vote_for_sender; } + /* ... we didn't vote yet, then vote for the sender */ /* §5.2, §5.4 */ if (!tk->voted_for) { vote_for_sender: tk->voted_for = sender; record_vote(tk, sender, leader); - goto yes_you_can; } -yes_you_can: init_ticket_msg(&omsg, OP_VOTE_FOR, RLT_SUCCESS, tk); omsg.ticket.leader = htonl(get_node_id(tk->voted_for)); return booth_udp_send(sender, &omsg, sizeof(omsg)); } int new_election(struct ticket_config *tk, struct booth_site *preference, int update_term) { struct booth_site *new_leader; time_t now; time(&now); log_debug("start new election?, now=%" PRIi64 ", end %" PRIi64, (int64_t)now, (int64_t)(tk->election_end)); if (now <= tk->election_end) return 0; /* §5.2 */ /* If there was _no_ answer, don't keep incrementing the term number * indefinitely. If there was no peer, there'll probably be no one * listening now either. However, we don't know if we were * invoked due to a timeout (caller does). */ if (update_term) tk->current_term++; tk->term_expires = 0; tk->election_end = now + tk->term_duration; log_debug("start new election! term=%d, until %" PRIi64, tk->current_term, (int64_t)tk->election_end); clear_election(tk); if(preference) new_leader = preference; else new_leader = (local->type == SITE) ? local : NULL; record_vote(tk, local, new_leader); tk->voted_for = new_leader; tk->state = ST_CANDIDATE; ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS); ticket_activate_timeout(tk); return 0; } /* we were a leader and somebody says that they have a more up * to date ticket * there was probably connectivity loss * tricky */ static int leader_handle_newer_ticket( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { if (leader == no_leader || !leader || leader == local) { /* at least nobody else owns the ticket */ /* it is not kosher to update from their copy, but since * they don't own the ticket, nothing bad can happen */ update_term_from_msg(tk, msg); /* get the ticket again, if we can */ return acquire_ticket(tk); } /* eek, two leaders, split brain */ /* normally shouldn't happen; run election */ log_error("from %s: ticket %s at %s! (disowning ticket)", site_string(sender), tk->name, site_string(leader) ); disown_ticket(tk); ticket_write(tk); log_error("Two ticket owners! Possible bug. Please report at https://github.com/ClusterLabs/booth/issues/new."); return acquire_ticket(tk); } /* reply to STATUS */ static int process_MY_INDEX ( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { int i; if (!msg->ticket.term_valid_for) { /* ticket not valid */ return 0; } i = cmp_msg_ticket(tk, msg); if (i > 0) { /* let them know about our newer ticket */ send_ticket(OP_MY_INDEX, tk, sender); if (tk->state == ST_LEADER) return send_ticket(OP_UPDATE, tk, sender); } if (i == 0) { return 0; } /* they have a newer ticket, trouble if we're already leader * for it */ if (tk->state == ST_LEADER) { log_warn("from %s: more uptodate ticket %s at %s", site_string(sender), tk->name, site_string(leader) ); return leader_handle_newer_ticket(tk, sender, leader, msg); } update_ticket_from_msg(tk, msg); if (leader == local) { /* if we were the leader but we rebooted in the * meantime; try to get the ticket again */ return acquire_ticket(tk); } else { tk->leader = leader; tk->state = (!leader || leader == no_leader) ? ST_INIT : ST_FOLLOWER; } return 0; } int raft_answer( struct ticket_config *tk, struct booth_site *from, struct booth_site *leader, struct boothc_ticket_msg *msg ) { int cmd; int rv; rv = 0; cmd = ntohl(msg->header.cmd); R(tk); - log_debug("got message %s from \"%s\"", + log_debug("got message %s from %s", state_to_string(cmd), - from->addr_string); + site_string(from)); switch (cmd) { case OP_REQ_VOTE: rv = answer_REQ_VOTE(tk, from, leader, msg); break; case OP_VOTE_FOR: rv = process_VOTE_FOR(tk, from, leader, msg); break; case OP_HEARTBEAT: if (tk->leader == local && tk->state == ST_LEADER) rv = process_HEARTBEAT(tk, from, leader, msg); else if (tk->leader != local && (tk->state == ST_FOLLOWER || tk->state == ST_CANDIDATE)) rv = answer_HEARTBEAT(tk, from, leader, msg); else { - log_error("unexpected message, cmd %s, from %s", + log_warn("unexpected message %s, from %s", state_to_string(cmd), - from->addr_string); + site_string(from)); rv = -EINVAL; } break; case OP_UPDATE: if (tk->leader != local && tk->state == ST_FOLLOWER) { rv = process_UPDATE(tk, from, leader, msg); } else { - log_error("unexpected message, cmd %s, from %s", + log_warn("unexpected message %s, from %s", state_to_string(cmd), - from->addr_string); + site_string(from)); rv = -EINVAL; } break; case OP_REJECTED: rv = process_REJECTED(tk, from, leader, msg); break; case OP_REVOKE: rv = process_REVOKE(tk, from, leader, msg); break; case OP_MY_INDEX: rv = process_MY_INDEX(tk, from, leader, msg); break; case OP_STATUS: rv = send_ticket(OP_MY_INDEX, tk, from); break; default: - log_error("unprocessed message, cmd %s, from %s", - state_to_string(cmd), - from->addr_string); + log_error("unknown message %s, from %s", + state_to_string(cmd), site_string(from)); rv = -EINVAL; } R(tk); return rv; } diff --git a/src/ticket.c b/src/ticket.c index ea631de..18efa3e 100644 --- a/src/ticket.c +++ b/src/ticket.c @@ -1,665 +1,661 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include "ticket.h" #include "config.h" #include "pacemaker.h" #include "inline-fn.h" #include "log.h" #include "booth.h" #include "raft.h" #include "handler.h" #define TK_LINE 256 /* Untrusted input, must fit (incl. \0) in a buffer of max chars. */ int check_max_len_valid(const char *s, int max) { int i; for(i=0; iticket_count; i++) { if (!strcmp(booth_conf->ticket[i].name, ticket)) { if (found) *found = booth_conf->ticket + i; return 1; } } return 0; } int check_ticket(char *ticket, struct ticket_config **found) { if (found) *found = NULL; if (!booth_conf) return 0; if (!check_max_len_valid(ticket, sizeof(booth_conf->ticket[0].name))) return 0; return find_ticket_by_name(ticket, found); } int check_site(char *site, int *is_local) { struct booth_site *node; if (!check_max_len_valid(site, sizeof(node->addr_string))) return 0; if (find_site_by_name(site, &node, 0)) { *is_local = node->local; return 1; } return 0; } #if 0 /** Find out what others think about this ticket. * * If we're a SITE, we can ask (and have to tell) Pacemaker. * An ARBITRATOR can only ask others. */ static int ticket_send_catchup(struct ticket_config *tk) { int i, rv = 0; struct booth_site *site; struct boothc_ticket_msg msg; foreach_node(i, site) { if (!site->local) { init_ticket_msg(&msg, CMD_CATCHUP, RLT_SUCCESS, tk); log_debug("attempting catchup from %s", site->addr_string); rv = booth_udp_send(site, &msg, sizeof(msg)); } } ticket_activate_timeout(tk); return rv; } #endif int ticket_write(struct ticket_config *tk) { if (local->type != SITE) return -EINVAL; disown_if_expired(tk); if (tk->leader == local) { pcmk_handler.grant_ticket(tk); } else { pcmk_handler.revoke_ticket(tk); } return 0; } /* Ask an external program whether getting the ticket * makes sense. * Eg. if the services have a failcount of INFINITY, * we can't serve here anyway. */ int acquire_ticket(struct ticket_config *tk) { int rv; if (!tk->ext_verifier) goto get_it; rv = run_handler(tk, tk->ext_verifier, 1); if (rv) { - log_error("May not acquire ticket."); + log_warn("we are not allowed to acquire ticket %s", + tk->name); /* Give it to somebody else. * Just send a commit message, as the * others couldn't help anyway. */ if (leader_and_valid(tk)) { disown_ticket(tk); #if 0 tk->proposed_owner = NULL; /* Just go one further - others may easily override. */ tk->new_ballot++; ticket_broadcast_proposed_state(tk, OP_COMMITTED); tk->state = ST_STABLE; #endif ticket_broadcast(tk, OP_VOTE_FOR, RLT_SUCCESS); } return rv; } get_it: rv = new_election(tk, local, 1); return rv; } /** Try to get the ticket for the local site. * */ int do_grant_ticket(struct ticket_config *tk) { int rv; if (tk->leader == local) return RLT_SUCCESS; if (is_owned(tk)) return RLT_OVERGRANT; rv = acquire_ticket(tk); return rv; } /** Ticket revoke. * Only to be started from the leader. */ int do_revoke_ticket(struct ticket_config *tk) { log_info("revoking ticket %s", tk->name); reset_ticket(tk); ticket_write(tk); return ticket_broadcast(tk, OP_REVOKE, RLT_SUCCESS); } int list_ticket(char **pdata, unsigned int *len) { struct ticket_config *tk; char timeout_str[64]; char *data, *cp; int i, alloc; *pdata = NULL; *len = 0; alloc = 256 + booth_conf->ticket_count * (BOOTH_NAME_LEN * 2 + 128); data = malloc(alloc); if (!data) return -ENOMEM; cp = data; foreach_ticket(i, tk) { if (tk->term_expires != 0) strftime(timeout_str, sizeof(timeout_str), "%F %T", localtime(&tk->term_expires)); else strcpy(timeout_str, "INF"); cp += sprintf(cp, "ticket: %s, leader: %s, expires: %s, commit: %d\n", tk->name, ticket_leader_string(tk), timeout_str, tk->commit_index); *len = cp - data; assert(*len < alloc); } *pdata = data; return 0; } void reset_ticket(struct ticket_config *tk) { disown_ticket(tk); tk->current_term = 0; tk->commit_index = 0; tk->state = ST_INIT; } int setup_ticket(void) { struct ticket_config *tk; int i; foreach_ticket(i, tk) { reset_ticket(tk); if (local->type == SITE) { pcmk_handler.load_ticket(tk); if (time(NULL) >= tk->term_expires) { reset_ticket(tk); ticket_write(tk); } } /* we'll start election if the ticket seems to be * uptodate */ if (tk->is_granted && tk->leader == local) { tk->state = ST_FOLLOWER; } else { /* otherwise, query status */ ticket_broadcast(tk, OP_STATUS, RLT_SUCCESS); } } return 0; } int ticket_answer_list(int fd, struct boothc_ticket_msg *msg) { char *data; int olen, rv; struct boothc_header hdr; rv = list_ticket(&data, &olen); if (rv < 0) return rv; init_header(&hdr, CMR_LIST, RLT_SUCCESS, sizeof(hdr) + olen); return send_header_plus(fd, &hdr, data, olen); } int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg) { int rv; struct ticket_config *tk; if (!check_ticket(msg->ticket.id, &tk)) { - log_error("Client asked to grant unknown ticket"); + log_warn("client asked to grant unknown ticket %s", + msg->ticket.id); rv = RLT_INVALID_ARG; goto reply; } if (is_owned(tk)) { - log_error("client wants to get an (already granted!) ticket \"%s\"", + log_warn("client wants to grant an (already granted!) ticket %s", msg->ticket.id); rv = RLT_OVERGRANT; goto reply; } rv = do_grant_ticket(tk); reply: init_header(&msg->header, CMR_GRANT, rv ?: RLT_ASYNC, sizeof(*msg)); return send_ticket_msg(fd, msg); } int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg) { int rv; struct ticket_config *tk; if (!check_ticket(msg->ticket.id, &tk)) { - log_error("Client wants to revoke an unknown ticket %s", + log_warn("client wants to revoke an unknown ticket %s", msg->ticket.id); rv = RLT_INVALID_ARG; goto reply; } if (!is_owned(tk)) { - log_info("client wants to revoke a free ticket \"%s\"", + log_info("client wants to revoke a free ticket %s", msg->ticket.id); /* Return a different result code? */ rv = RLT_SUCCESS; goto reply; } if (tk->leader != local) { - log_info("we do not own the ticket \"%s\", " + log_info("we do not own the ticket %s, " "redirect to leader %s", msg->ticket.id, ticket_leader_string(tk)); - /* Return a different result code? */ rv = RLT_REDIRECT; goto reply; } rv = do_revoke_ticket(tk); if (rv == 0) rv = RLT_ASYNC; reply: init_ticket_msg(msg, CMR_REVOKE, rv, tk); return send_ticket_msg(fd, msg); } int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd, cmd_result_t res) { struct boothc_ticket_msg msg; init_ticket_msg(&msg, cmd, res, tk); - log_debug("broadcasting '%s' for ticket \"%s\" (term=%d, valid=%d)", + log_debug("broadcasting '%s' for ticket %s (term=%d, valid=%d)", state_to_string(cmd), tk->name, ntohl(msg.ticket.term), ntohl(msg.ticket.term_valid_for)); return transport()->broadcast(&msg, sizeof(msg)); } +static void ticket_lost(struct ticket_config *tk) +{ + log_warn("LOST ticket: %s no longer at %s", + tk->name, + ticket_leader_string(tk)); + + /* Couldn't renew in time - ticket lost. */ + disown_ticket(tk); + + /* New vote round; §5.2 */ + if (local->type == SITE) { + new_election(tk, NULL, 1); + } + + ticket_write(tk); +} + + static void ticket_cron(struct ticket_config *tk) { time_t now; - int rv; int vote_cnt; struct booth_site *new_leader; now = time(NULL); R(tk); /* Has an owner, has an expiry date, and expiry date in the past? * Losing the ticket must happen in _every_ state. */ if (tk->term_expires && is_owned(tk) && now >= tk->term_expires) { - log_info("LOST ticket: \"%s\" no longer at %s", - tk->name, - ticket_leader_string(tk)); - - /* Couldn't renew in time - ticket lost. */ - disown_ticket(tk); - - /* New vote round; §5.2 */ - if (local->type == SITE) { - new_election(tk, NULL, 1); - } - - ticket_write(tk); - - /* May not try to re-acquire now, need to find out - * what others think. */ + ticket_lost(tk); return; } - R(tk); switch(tk->state) { case ST_INIT: /* init state, nothing to do */ break; case ST_FOLLOWER: /* in case we got restarted and this ticket belongs to * us */ if (tk->is_granted && tk->leader == local) { acquire_ticket(tk); } break; case ST_CANDIDATE: /* §5.2 */ /* not everybody answered, but if we have majority... */ new_leader = majority_votes(tk); if (new_leader) { leader_elected(tk, new_leader); set_ticket_wakeup(tk); } else if (now > tk->election_end) { /* This is previous election timed out */ new_election(tk, NULL, 0); } break; case ST_LEADER: /* we get here after we broadcasted a heartbeat; * by this time all sites should've acked the heartbeat */ - vote_cnt = count_bits(tk->hb_received) - 1; + vote_cnt = count_bits(tk->acks_received) - 1; if ((vote_cnt+1) < booth_conf->site_count) { - if (!majority_of_bits(tk, tk->hb_received)) { + if (!majority_of_bits(tk, tk->acks_received)) { tk->retry_number ++; if (!vote_cnt) { log_warn("no answers to heartbeat for ticket %s on try #%d, " - "we are most probably alone!", + "we are alone", tk->name, tk->retry_number); } else { log_warn("not enough answers to heartbeat for ticket %s on try #%d: " - "only got %d answers (mask 0x%" PRIx64 ")!", + "only got %d answers", tk->name, tk->retry_number, - vote_cnt, - tk->hb_received); + vote_cnt); } /* Don't give up, though - there's still some time until leadership is lost. */ } } - rv = run_handler(tk, tk->ext_verifier, 1); - if (rv) { - tk->state = ST_FOLLOWER; - disown_ticket(tk); - // resp. no owner anymore, new takers? - ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS); - ticket_write(tk); - } else { - send_heartbeat(tk); + send_heartbeat(tk); + if (tk->retry_number < tk->retries) { ticket_activate_timeout(tk); + } else { + set_ticket_wakeup(tk); } break; default: break; } R(tk); } void process_tickets(void) { struct ticket_config *tk; int i; struct timeval now; float sec_until; gettimeofday(&now, NULL); foreach_ticket(i, tk) { sec_until = timeval_to_float(tk->next_cron) - timeval_to_float(now); if (0) log_debug("ticket %s next cron %" PRIx64 ".%03d, " "now %" PRIx64 "%03d, in %f", tk->name, (uint64_t)tk->next_cron.tv_sec, timeval_msec(tk->next_cron), (uint64_t)now.tv_sec, timeval_msec(now), sec_until); if (sec_until > 0.0) continue; log_debug("ticket cron: doing %s", tk->name); /* Set next value, handler may override. * This should already be handled via the state logic; * but to be on the safe side the renew repetition is * duplicated here, too. */ set_ticket_wakeup(tk); ticket_cron(tk); } } void tickets_log_info(void) { struct ticket_config *tk; int i; foreach_ticket(i, tk) { log_info("Ticket %s: state '%s' " "commit index %d " - "leader \"%s\" " + "leader %s " "expires %-24.24s", tk->name, state_to_string(tk->state), tk->commit_index, ticket_leader_string(tk), ctime(&tk->term_expires)); } } /* UDP message receiver. */ int message_recv(struct boothc_ticket_msg *msg, int msglen) { uint32_t from; struct booth_site *source; struct ticket_config *tk; struct booth_site *leader; uint32_t leader_u; if (check_boothc_header(&msg->header, sizeof(*msg)) < 0 || msglen != sizeof(*msg)) { log_error("message receive error"); return -1; } from = ntohl(msg->header.from); if (!find_site_by_id(from, &source) || !source) { log_error("unknown sender: %08x", from); return -1; } if (!check_ticket(msg->ticket.id, &tk)) { - log_error("got invalid ticket name \"%s\" from %s", - msg->ticket.id, source->addr_string); + log_warn("got invalid ticket name %s from %s", + msg->ticket.id, site_string(source)); return -EINVAL; } leader_u = ntohl(msg->ticket.leader); if (!find_site_by_id(leader_u, &leader)) { - log_error("Message with unknown owner %x received", leader_u); + log_error("Message with unknown owner %u received", leader_u); return -EINVAL; } return raft_answer(tk, source, leader, msg); } void set_ticket_wakeup(struct ticket_config *tk) { struct timeval tv, now; /* At least every hour, perhaps sooner. */ ticket_next_cron_in(tk, 3600); switch (tk->state) { case ST_LEADER: assert(tk->leader == local); gettimeofday(&now, NULL); tv = now; tv.tv_sec = next_vote_starts_at(tk); /* If timestamp is in the past, look again in one second. */ if (timeval_compare(tv, now) <= 0) tv.tv_sec = now.tv_sec + 1; ticket_next_cron_at(tk, tv); break; case ST_CANDIDATE: assert(tk->election_end); ticket_next_cron_at_coarse(tk, tk->election_end); break; case ST_INIT: case ST_FOLLOWER: /* If there is (or should be) some owner, check on it later on. * If no one is interested - don't care. */ if (is_owned(tk) && (local->type == SITE)) ticket_next_cron_at_coarse(tk, tk->term_expires + tk->acquire_after); break; default: log_error("unknown ticket state: %d", tk->state); } } /* Given a state (in host byte order), return a human-readable (char*). * An array is used so that multiple states can be printed in a single printf(). */ char *state_to_string(uint32_t state_ho) { union mu { cmd_request_t s; char c[5]; }; static union mu cache[6] = { { 0 } }, *cur; static int current = 0; current ++; if (current >= sizeof(cache)/sizeof(cache[0])) current = 0; cur = cache + current; cur->s = htonl(state_ho); /* Shouldn't be necessary, union array is initialized with zeroes, and * these bytes never get written. */ cur->c[4] = 0; return cur->c; } int send_reject(struct booth_site *dest, struct ticket_config *tk, cmd_result_t code) { struct boothc_ticket_msg msg; init_ticket_msg(&msg, OP_REJECTED, code, tk); return booth_udp_send(dest, &msg, sizeof(msg)); } diff --git a/src/transport.c b/src/transport.c index fe96f34..884ae5a 100644 --- a/src/transport.c +++ b/src/transport.c @@ -1,733 +1,733 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include "booth.h" #include "inline-fn.h" #include "log.h" #include "config.h" #include "ticket.h" #include "transport.h" #define BOOTH_IPADDR_LEN (sizeof(struct in6_addr)) #define NETLINK_BUFSIZE 16384 #define SOCKET_BUFFER_SIZE 160000 #define FRAME_SIZE_MAX 10000 struct booth_site *local = NULL; static int (*deliver_fn) (void *msg, int msglen); static void parse_rtattr(struct rtattr *tb[], int max, struct rtattr *rta, int len) { while (RTA_OK(rta, len)) { if (rta->rta_type <= max) tb[rta->rta_type] = rta; rta = RTA_NEXT(rta,len); } } static int find_address(unsigned char ipaddr[BOOTH_IPADDR_LEN], int family, int prefixlen, int fuzzy_allowed, struct booth_site **me, int *address_bits_matched) { int i; struct booth_site *node; int bytes, bits_left, mask; unsigned char node_bits, ip_bits; uint8_t *n_a; int matched, did_match; bytes = prefixlen / 8; bits_left = prefixlen % 8; /* One bit left to check means ignore 7 lowest bits. */ mask = ~( (1 << (8 - bits_left)) -1); did_match = 0; for (i = 0; i < booth_conf->site_count; i++) { node = booth_conf->site + i; if (family != node->family) continue; n_a = node_to_addr_pointer(node); for(matched = 0; matched < node->addrlen; matched++) if (ipaddr[matched] != n_a[matched]) break; if (matched == node->addrlen) { /* Full match. */ *address_bits_matched = matched * 8; found: *me = node; did_match = 1; continue; } if (!fuzzy_allowed) continue; /* Check prefix, whole bytes */ if (matched < bytes) continue; if (matched * 8 < *address_bits_matched) continue; if (!bits_left) goto found; node_bits = n_a[bytes]; ip_bits = ipaddr[bytes]; if (((node_bits ^ ip_bits) & mask) == 0) { /* _At_least_ prefixlen bits matched. */ *address_bits_matched = prefixlen; goto found; } } return did_match; } int _find_myself(int family, struct booth_site **mep, int fuzzy_allowed); int _find_myself(int family, struct booth_site **mep, int fuzzy_allowed) { int fd; struct sockaddr_nl nladdr; struct booth_site *me; unsigned char ipaddr[BOOTH_IPADDR_LEN]; static char rcvbuf[NETLINK_BUFSIZE]; struct { struct nlmsghdr nlh; struct rtgenmsg g; } req; int address_bits_matched; if (local) goto found; me = NULL; address_bits_matched = 0; if (mep) *mep = NULL; fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE); if (fd < 0) { log_error("failed to create netlink socket"); return 0; } setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); memset(&nladdr, 0, sizeof(nladdr)); nladdr.nl_family = AF_NETLINK; memset(&req, 0, sizeof(req)); req.nlh.nlmsg_len = sizeof(req); req.nlh.nlmsg_type = RTM_GETADDR; req.nlh.nlmsg_flags = NLM_F_ROOT|NLM_F_MATCH|NLM_F_REQUEST; req.nlh.nlmsg_pid = 0; req.nlh.nlmsg_seq = 1; req.g.rtgen_family = family; if (sendto(fd, (void *)&req, sizeof(req), 0, (struct sockaddr*)&nladdr, sizeof(nladdr)) < 0) { close(fd); log_error("failed to send data to netlink socket"); return 0; } while (1) { int status; struct nlmsghdr *h; struct iovec iov = { rcvbuf, sizeof(rcvbuf) }; struct msghdr msg = { (void *)&nladdr, sizeof(nladdr), &iov, 1, NULL, 0, 0 }; status = recvmsg(fd, &msg, 0); if (!status) { close(fd); log_error("failed to recvmsg from netlink socket"); return 0; } h = (struct nlmsghdr *)rcvbuf; if (h->nlmsg_type == NLMSG_DONE) break; if (h->nlmsg_type == NLMSG_ERROR) { close(fd); log_error("netlink socket recvmsg error"); return 0; } while (NLMSG_OK(h, status)) { if (h->nlmsg_type == RTM_NEWADDR) { struct ifaddrmsg *ifa = NLMSG_DATA(h); struct rtattr *tb[IFA_MAX+1]; int len = h->nlmsg_len - NLMSG_LENGTH(sizeof(*ifa)); memset(tb, 0, sizeof(tb)); parse_rtattr(tb, IFA_MAX, IFA_RTA(ifa), len); memset(ipaddr, 0, BOOTH_IPADDR_LEN); /* prefer IFA_LOCAL if it exists, for p-t-p * interfaces, otherwise use IFA_ADDRESS */ if (tb[IFA_LOCAL]) { memcpy(ipaddr, RTA_DATA(tb[IFA_LOCAL]), BOOTH_IPADDR_LEN); } else { memcpy(ipaddr, RTA_DATA(tb[IFA_ADDRESS]), BOOTH_IPADDR_LEN); } /* First try with exact addresses, then optionally with subnet matching. */ if (ifa->ifa_prefixlen > address_bits_matched) find_address(ipaddr, ifa->ifa_family, ifa->ifa_prefixlen, fuzzy_allowed, &me, &address_bits_matched); } h = NLMSG_NEXT(h, status); } } close(fd); if (!me) return 0; me->local = 1; local = me; found: if (mep) *mep = local; return 1; } int find_myself(struct booth_site **mep, int fuzzy_allowed) { return _find_myself(AF_INET6, mep, fuzzy_allowed) || _find_myself(AF_INET, mep, fuzzy_allowed); } /** Checks the header fields for validity. * cf. init_header(). * For @len_incl_data < 0 the length is not checked. * Return <0 if error, else bytes read. */ int check_boothc_header(struct boothc_header *h, int len_incl_data) { int l; if (h->magic != htonl(BOOTHC_MAGIC)) { log_error("magic error %x", ntohl(h->magic)); return -EINVAL; } if (h->version != htonl(BOOTHC_VERSION)) { log_error("version error %x", ntohl(h->version)); return -EINVAL; } l = ntohl(h->length); if (l < sizeof(*h)) { log_error("length %d out of range", l); return -EINVAL; } if (len_incl_data < 0) return 0; if (l != len_incl_data) { log_error("length error - got %d, wanted %d", l, len_incl_data); return -EINVAL; } return len_incl_data; } static void process_tcp_listener(int ci) { int fd, i, one = 1; socklen_t addrlen = sizeof(struct sockaddr); struct sockaddr addr; fd = accept(clients[ci].fd, &addr, &addrlen); if (fd < 0) { log_error("process_tcp_listener: accept error %d %d", fd, errno); return; } setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof(one)); i = client_add(fd, clients[ci].transport, process_connection, NULL); log_debug("client connection %d fd %d", i, fd); } int setup_tcp_listener(int test_only) { int s, rv; int one = 1; s = socket(local->family, SOCK_STREAM, 0); if (s == -1) { log_error("failed to create tcp socket %s", strerror(errno)); return s; } rv = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)); if (rv == -1) { log_error("failed to set the SO_REUSEADDR option"); return rv; } rv = bind(s, &local->sa6, local->saddrlen); if (test_only) { rv = (rv == -1) ? errno : 0; close(s); return rv; } if (rv == -1) { log_error("failed to bind socket %s", strerror(errno)); return rv; } rv = listen(s, 5); if (rv == -1) { log_error("failed to listen on socket %s", strerror(errno)); return rv; } return s; } static int booth_tcp_init(void * unused __attribute__((unused))) { int rv; if (get_local_id() < 0) return -1; rv = setup_tcp_listener(0); if (rv < 0) return rv; client_add(rv, booth_transport + TCP, process_tcp_listener, NULL); return 0; } static int connect_nonb(int sockfd, const struct sockaddr *saptr, socklen_t salen, int sec) { int flags, n, error; socklen_t len; fd_set rset, wset; struct timeval tval; flags = fcntl(sockfd, F_GETFL, 0); fcntl(sockfd, F_SETFL, flags | O_NONBLOCK); error = 0; if ( (n = connect(sockfd, saptr, salen)) < 0) if (errno != EINPROGRESS) return -1; if (n == 0) goto done; /* connect completed immediately */ FD_ZERO(&rset); FD_SET(sockfd, &rset); wset = rset; tval.tv_sec = sec; tval.tv_usec = 0; if ((n = select(sockfd + 1, &rset, &wset, NULL, sec ? &tval : NULL)) == 0) { /* leave outside function to close */ /* timeout */ /* close(sockfd); */ errno = ETIMEDOUT; return -1; } if (FD_ISSET(sockfd, &rset) || FD_ISSET(sockfd, &wset)) { len = sizeof(error); if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) return -1; /* Solaris pending error */ } else { log_error("select error: sockfd not set"); return -1; } done: fcntl(sockfd, F_SETFL, flags); /* restore file status flags */ if (error) { /* leave outside function to close */ /* close(sockfd); */ errno = error; return -1; } return 0; } int booth_tcp_open(struct booth_site *to) { int s, rv; if (to->tcp_fd >= STDERR_FILENO) goto found; s = socket(to->family, SOCK_STREAM, 0); if (s == -1) { log_error("cannot create socket of family %d", to->family); return -1; } rv = connect_nonb(s, (struct sockaddr *)&to->sa6, to->saddrlen, 10); if (rv == -1) { if( errno == ETIMEDOUT) - log_error("connect to \"%s\" got a timeout", to->addr_string); + log_error("connect to %s got a timeout", site_string(to)); else - log_error("connect to \"%s\" got an error: %s", to->addr_string, + log_error("connect to %s got an error: %s", site_string(to), strerror(errno)); goto error; } to->tcp_fd = s; found: return 1; error: if (s >= 0) close(s); return -1; } int booth_tcp_send(struct booth_site *to, void *buf, int len) { return do_write(to->tcp_fd, buf, len); } static int booth_tcp_recv(struct booth_site *from, void *buf, int len) { int got; /* Needs timeouts! */ got = do_read(from->tcp_fd, buf, len); if (got < 0) return got; return len; } static int booth_tcp_close(struct booth_site *to) { if (to) { if (to->tcp_fd > STDERR_FILENO) close(to->tcp_fd); to->tcp_fd = -1; } return 0; } static int booth_tcp_exit(void) { return 0; } static int setup_udp_server(void) { int rv, fd; int one = 1; unsigned int recvbuf_size; fd = socket(local->family, SOCK_DGRAM, 0); if (fd == -1) { log_error("failed to create UDP socket %s", strerror(errno)); goto ex; } rv = fcntl(fd, F_SETFL, O_NONBLOCK); if (rv == -1) { log_error("failed to set non-blocking operation " "on UDP socket: %s", strerror(errno)); goto ex; } rv = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)); if (rv == -1) { log_error("failed to set the SO_REUSEADDR option"); goto ex; } rv = bind(fd, (struct sockaddr *)&local->sa6, local->saddrlen); if (rv == -1) { log_error("failed to bind UDP socket to [%s]:%d: %s", - local->addr_string, booth_conf->port, + site_string(local), booth_conf->port, strerror(errno)); goto ex; } recvbuf_size = SOCKET_BUFFER_SIZE; rv = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, sizeof(recvbuf_size)); if (rv == -1) { log_error("failed to set recvbuf size"); goto ex; } local->udp_fd = fd; return 0; ex: if (fd >= 0) close(fd); return -1; } /* Receive/process callback for UDP */ static void process_recv(int ci) { struct sockaddr_storage sa; int rv; socklen_t sa_len; char buffer[256]; /* Used for unit tests */ struct boothc_ticket_msg *msg; sa_len = sizeof(sa); msg = (void*)buffer; rv = recvfrom(clients[ci].fd, buffer, sizeof(buffer), MSG_NOSIGNAL | MSG_DONTWAIT, (struct sockaddr *)&sa, &sa_len); if (rv == -1) return; deliver_fn(msg, rv); } static int booth_udp_init(void *f) { int rv; rv = setup_udp_server(); if (rv < 0) return rv; deliver_fn = f; client_add(local->udp_fd, booth_transport + UDP, process_recv, NULL); return 0; } int booth_udp_send(struct booth_site *to, void *buf, int len) { int rv; rv = sendto(local->udp_fd, buf, len, MSG_NOSIGNAL, (struct sockaddr *)&to->sa6, to->saddrlen); if (rv == len) { rv = 0; } else if (rv < 0) { rv = errno; - log_error("Cannot send to \"%s\": %d %s", - to->addr_string, + log_error("Cannot send to %s: %d %s", + site_string(to), errno, strerror(errno)); } else { rv = EBUSY; - log_error("Packet sent to \"%s\" got truncated", - to->addr_string); + log_error("Packet sent to %s got truncated", + site_string(to)); } return rv; } static int booth_udp_broadcast(void *buf, int len) { int i, rv, rvs; struct booth_site *site; if (!booth_conf || !booth_conf->site_count) return -1; rvs = 0; foreach_node(i, site) { if (site != local) { rv = booth_udp_send(site, buf, len); if (!rvs) rvs = rv; } } return rvs; } static int booth_udp_exit(void) { return 0; } /* SCTP transport layer has not been developed yet */ static int booth_sctp_init(void *f __attribute__((unused))) { return 0; } static int booth_sctp_send(struct booth_site * to __attribute__((unused)), void *buf __attribute__((unused)), int len __attribute__((unused))) { return 0; } static int booth_sctp_broadcast(void *buf __attribute__((unused)), int len __attribute__((unused))) { return 0; } static int return_0_booth_site(struct booth_site *v __attribute((unused))) { return 0; } static int return_0(void) { return 0; } const struct booth_transport booth_transport[TRANSPORT_ENTRIES] = { [TCP] = { .name = "TCP", .init = booth_tcp_init, .open = booth_tcp_open, .send = booth_tcp_send, .recv = booth_tcp_recv, .close = booth_tcp_close, .exit = booth_tcp_exit }, [UDP] = { .name = "UDP", .init = booth_udp_init, .open = return_0_booth_site, .send = booth_udp_send, .close = return_0_booth_site, .broadcast = booth_udp_broadcast, .exit = booth_udp_exit }, [SCTP] = { .name = "SCTP", .init = booth_sctp_init, .open = return_0_booth_site, .send = booth_sctp_send, .broadcast = booth_sctp_broadcast, .exit = return_0, } }; const struct booth_transport *local_transport = booth_transport+TCP; int send_header_only(int fd, struct boothc_header *hdr) { int rv; rv = do_write(fd, hdr, sizeof(*hdr)); return rv; } int send_ticket_msg(int fd, struct boothc_ticket_msg *msg) { int rv; rv = do_write(fd, msg, sizeof(*msg)); return rv; } int send_header_plus(int fd, struct boothc_header *hdr, void *data, int len) { int rv; int l; if (data == hdr->data) { l = sizeof(*hdr) + len; assert(l == ntohl(hdr->length)); /* One struct */ rv = do_write(fd, hdr, l); } else { /* Header and data in two locations */ rv = send_header_only(fd, hdr); if (rv >= 0 && len) rv = do_write(fd, data, len); } return rv; }