diff --git a/docs/boothd.8.txt b/docs/boothd.8.txt index b43283c..c3765c9 100644 --- a/docs/boothd.8.txt +++ b/docs/boothd.8.txt @@ -1,367 +1,379 @@ BOOTHD(8) =========== :doctype: manpage NAME ---- boothd - The Booth Cluster Ticket Manager. SYNOPSIS -------- *boothd* 'daemon' ['-D'] [-c 'config'] *booth* ['client'] {'list'} [-S 'site'] ['-D'] [-c 'config'] *booth* ['client'] {'grant'|'revoke'} [-S 'site'] ['-D'] [-t] 'ticket' [-c 'config'] *booth* 'status' ['-D'] [-c 'config'] DESCRIPTION ----------- Booth manages tickets which authorizes one of the cluster sites located in geographically dispersed distances to run certain resources. It is designed to be an add-on to Pacemaker, which extends Pacemaker to support geographically distributed clustering. It is based on the RAFT protocol, see eg. for details. SHORT EXAMPLES -------------- --------------------- # boothd daemon # boothd client list # boothd client grant -t ticket-nfs # boothd client revoke -t ticket-nfs --------------------- OPTIONS ------- *-c*:: Configuration to use. + Can be a full path to a configuration file, or a short name; in the latter case, the directory '/etc/booth' and suffix '.conf' are added. Per default 'booth' is used, which results in the path '/etc/booth/booth.conf'. + The configuration name also determines the name of the PID file - for the defaults, '/var/run/booth/booth.pid'. *-D*:: Debug output/don't daemonize. Increases the debug output level; for 'boothd daemon', keeps the process in the foreground. *-h*, *--help*:: Give a short usage output. *-s*:: Site address. *-t*:: Ticket name. *-v*, *--version*:: Report version information. *-S*:: 'systemd' mode: don't fork. This is like '-D' but without the debug output. COMMANDS -------- Whether the binary is called as 'boothd' or 'booth' doesn't matter; the first argument determines the mode of operation. *'daemon'*:: Tells 'boothd' to serve a site. The locally configured interfaces are searched for an IP address that got defined in the configuration, so that Booth can operate in /arbitrator/ resp. /site/ mode. *'client'*:: Allows to list the ticket information (see also 'crm_ticket -L'), and to revoke or (initially) grant tickets to a site. + In this mode the configuration file is searched for an IP address that is locally reachable, ie. matches a configured subnet. This allows to run the client commands on another node in the same cluster, as long as the config file and the service IP is locally reachable. + Example: If the booth service IP is 192.168.55.200, and the local node has 192.168.55.15 configured on an interface, it knows which site it belongs to. + The client can also ask another site; use '-s' to tell where to connect to. *'status'*:: 'boothd' looks for the (locked) PID file and the UDP socket, prints some output to stdout (for use in shell scripts) and returns a OCF-compatible return code. With '-D', a human-readable message is printed to STDERR as well. CONFIGURATION FILE ------------------ A basic file looks like this: ----------------------- site="192.168.201.100" site="192.168.202.100" arbitrator="192.168.203.100" ticket="I-want-a-pony" ----------------------- You can use comment lines, by starting them with a hash-sign (''#''). Whitespace at the start and end of the line, and around the ''='', are ignored. The following key/value pairs are defined: *'port'*:: The UDP/TCP port to use. Default is '9929'. *'transport'*:: - The transport protocol to use for PAXOS exchanges. + The transport protocol to use for Raft exchanges. Currently only UDP is available. + Please note that the client mode always uses TCP to talk to a daemon; Booth will always bind and listen to *both* UDP and TCP ports. *'site'*, *'arbitrator'*:: - Defines a PAXOS member with the given IP, which should be a service IP. + Defines a Raft member with the given IP, which should be a service IP. + You will need at least three members for normal operation; an odd number is preferred. *'ticket'*:: Registers a ticket. Multiple tickets can be handled in a single Booth instance. The next items modify per-ticket defaults. They are stored as defaults for further tickets, and are used as value for the last defined ticket (if any). *'expire'*:: - The lease time for a ticket, in seconds. After that time the ticket gets + The lease time for a ticket, in seconds. After that time the ticket can be revoked, and another site can get it. + Typically 'booth' will try to renew a held ticket after half the lease time. *'timeout'*:: After that time 'booth' will re-send packets if there was an insufficient number of replies. + The default is '3'. *'weights'*:: A comma-separated list of integers that define the weight of individual - PAXOS members, in the same order as the 'site' and 'arbitrator' lines. + Raft members, in the same order as the 'site' and 'arbitrator' lines. + Default is '0' for all; this means that the ordering within the configuration file defines a kind of priority for conflicting requests. *'acquire-after'*:: Setting this to a positive value will make 'booth' try to acquire a ticket that got lost. + Ie. if the site that _had_ the ticket is not reachable any more, then 'acquire-after' seconds after ticket expiration other sites will try to activate the ticket. (Only one will succeed, though.) + A typical delay might be 60 seconds. *'retries'*:: Defines how often broadcast packets are sent out before the current action (grant, revoke) is aborted. + Default is 10; values lower than 3 are forbidden, and high values won't make much sense, too. + Please note that this counts only for a single packet; if ticket *renewal* runs into this limit (because the network was temporarily down), but the ticket is still valid afterwards, a new renewal run will be started automatically. *'site-user'*, *'site-group'*, *'arbitrator-user'*, *'arbitrator-group'*:: These define the credentials 'boothd' will be running with. + On a (Pacemaker) site the booth process will have to call 'crm_ticket', so the default is to use 'hacluster':'haclient'; for an arbitrator this user and group might not exists, so that will default to 'nobody':'nobody'. -*'before-acquire-handler':: +*'before-acquire-handler'*:: If set, this script/program will be called before 'boothd' tries to acquire or renew a ticket. Only a clean exit will allow 'boothd' to proceed; any other return value will cancel the operation. + This makes it possible to check whether it makes sense to try to acquire the ticket; eg. if a service in the dependency-chain has a failcount of 'INFINITY' on all available nodes, the service will be unable to run - and so another cluster (and not this one!) should try to start it. + Please assume that 'boothd' will wait synchronously for the result of that call, so having that program return quickly would be an advantage. + Please see below for details about available environment variables. A more verbose example of a configuration file might be ----------------------- transport = udp port = 9930 # D-85774 site="192.168.201.100" # D-90409 site="::ffff:192.168.202.100" # A-1120 arbitrator="192.168.203.100" ticket="I-want-a-pony" expire = 600 acquire-after = 60 timeout = 10 retries = 5 ----------------------- NOTES ----- Please note that Booth tickets are not meant to be real-time - a reasonable 'expire' time might be 300 seconds (5 minutes). Due to possible delays on the WAN connections it makes no sense to expect detection of problems and failover within a few seconds. 'booth' works with IPv6 addresses, too. 'booth' will start to renew a ticket before it expires, to account for transmission delays. This will happen so that (the bigger one of) half the 'expire' time, or 'timeout'*'retries'/2 seconds, will be left for the renewal. Of course, that means that with bad configuration values (eg. 'expire' 60 seconds, 'timeout' 3 seconds, and 'retries' > 40) the ticket renewal process will be started just after the ticket got acquired. HANDLERS -------- Currently, there's only one external handler defined (see the 'before-acquire-handler' configuration item above). It gets the following data via the environment: *'BOOTH_TICKET':: The ticket name, as given in the configuration file. (See 'ticket' item above.) *'BOOTH_LOCAL':: The local site specification, as defined in 'site'. *'BOOTH_CONF_PATH':: The path to the active configuration file. *'BOOTH_CONF_NAME':: The configuration name, as used by the '-c' commandline argument. *'BOOTH_TICKET_EXPIRES':: Timestamp for the ticket expiration (seconds since 1.1.1970), or '0'. FILES ----- *'/etc/booth/booth.conf'*:: The default configuration file name. See also the '-c' argument. *'/var/run/booth/'*:: Directory that holds PID/lock files. See also the 'status' command. +RAFT IMPLEMENTATION +------------------- + +Basically, each Pacemaker ticket corresponds to a separate Raft cluster. + +A ticket is granted _only_ to the Raft _Leader_, but a Leader needs not grant the ticket to Pacemaker. +To move a ticket, the Leader withdraws, and votes for the new Leader instead. + +So, the Raft "log" consists of -- nothing, more or less; there's no history to keep. + + SYSTEMD INTEGRATION ------------------- The Booth sources (and, very likely, packages too) include a 'systemd' unit file for 'boothd'. So don't forget to install 'boothd' into 'systemd' after configuration! ----------- # systemctl enable booth@{configurationname}.service # systemctl start booth@{configurationname}.service ----------- EXIT STATUS ----------- *0*:: Success. For the 'status' command: Daemon running. *1* (PCMK_OCF_UNKNOWN_ERROR):: General error code. *7* (PCMK_OCF_NOT_RUNNING):: No daemon process for that configuration active. BUGS ---- Probably. Please report them on GitHub: AUTHOR ------ 'boothd' was originally written (mostly) by Jiaju Zhang. Many people have contributed to it. In 2013 Philipp Marek took over maintainership. RESOURCES --------- GitHub: Documentation: COPYING ------- Copyright (C) 2011 Jiaju Zhang Copyright (C) 2013-2014 Philipp Marek Free use of this software is granted under the terms of the GNU General Public License (GPL). +// vim: set ft=asciidoc : diff --git a/script/wireshark-dissector.lua b/script/wireshark-dissector.lua index f227ac0..d45b7a5 100644 --- a/script/wireshark-dissector.lua +++ b/script/wireshark-dissector.lua @@ -1,67 +1,68 @@ -- dofile("wireshark-dissector.lua") -- do booth_proto = Proto("Booth","Booth") function T32(tree, buffer, start, format) local b = buffer(start, 4) return tree:add(b, string.format(format, b:uint())) end function booth_proto.dissector(buffer, pinfo, tree) local endbuf = buffer:len() pinfo.cols.protocol = "Booth" if (endbuf < 24) then pinfo.cols.info = "Booth - too small" else local hdr = tree:add(booth_proto, buffer(0, 24), "Booth header") local cmd = buffer(28, 4) local tcmd = T32(hdr, cmd, 0, "Cmd %08x, \"" .. cmd:string() .. "\""); local from = buffer(20, 4) local tfrom = T32(hdr, from, 0, "From %08x"); if bit.band(from:uint(), 0x80000000) > 0 then tfrom:add_expert_info(PI_PROTOCOL, PI_WARN, "Highest bit set") end local len = buffer(24, 4) local tlen = T32(hdr, len, 0, "Length %8d"); if len:uint() > 1000 then tlen:add_expert_info(PI_PROTOCOL, PI_WARN, "Length too big?") end T32(hdr, buffer, 32, "Result %08x"); T32(hdr, buffer, 12, "Magic %08x"); T32(hdr, buffer, 16, "Version %08x"); T32(hdr, buffer, 0, "IV %08x"); T32(hdr, buffer, 4, "Auth1 %08x"); T32(hdr, buffer, 8, "Auth2 %08x"); if (endbuf > 36) then local tick = tree:add(booth_proto, buffer(36, endbuf-36), "Booth data") local name = buffer(36, 64) tick:add(name, "Ticket name: ", name:string()) - T32(tick, buffer, 36+64 + 0, "Owner: %08x") - T32(tick, buffer, 36+64 + 4, "Ballot: %08x") - T32(tick, buffer, 36+64 + 8, "Prev. Ballot: %08x") - T32(tick, buffer, 36+64 + 12, "Expiry: %8d") + T32(tick, buffer, 36+64 + 0, "Leader: %08x") + T32(tick, buffer, 36+64 + 4, "Term: %08x") + T32(tick, buffer, 36+64 + 8, "Term valid for: %08x") + T32(tick, buffer, 36+64 + 12, "last Log index: %8d") + T32(tick, buffer, 36+64 + 16, "Leader commit: %8d") end pinfo.cols.info = "Booth, cmd " .. cmd:string() end tree:add(booth_proto, buffer(0, endbuf), "data") end local tbl = DissectorTable.get("udp.port") tbl:add(9929, booth_proto) local tbl = DissectorTable.get("tcp.port") tbl:add(9929, booth_proto) end diff --git a/src/booth.h b/src/booth.h index 737645a..0bb3f7c 100644 --- a/src/booth.h +++ b/src/booth.h @@ -1,231 +1,236 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _BOOTH_H #define _BOOTH_H #include #include #include #include #include #define BOOTH_RUN_DIR "/var/run/booth/" #define BOOTH_LOG_DIR "/var/log" #define BOOTH_LOGFILE_NAME "booth.log" #define BOOTH_DEFAULT_CONF_DIR "/etc/booth/" #define BOOTH_DEFAULT_CONF_NAME "booth" #define BOOTH_DEFAULT_CONF_EXT ".conf" #define BOOTH_DEFAULT_CONF \ BOOTH_DEFAULT_CONF_DIR BOOTH_DEFAULT_CONF_NAME BOOTH_DEFAULT_CONF_EXT #define DAEMON_NAME "boothd" #define BOOTH_PATH_LEN 127 #define BOOTH_DEFAULT_PORT 9929 /* TODO: remove */ #define BOOTH_PROTO_FAMILY AF_INET #define BOOTHC_MAGIC 0x5F1BA08C #define BOOTHC_VERSION 0x00010002 /** Timeout value for poll(). * Determines frequency of periodic jobs, eg. when send-retries are done. * See process_tickets(). */ #define POLL_TIMEOUT 1000 /** @{ */ /** The on-network data structures and constants. */ #define BOOTH_NAME_LEN 64 -#define NO_OWNER (-1) +/* NONE wouldn't be specific enough. */ +#define NO_ONE (-1) typedef unsigned char boothc_site [BOOTH_NAME_LEN]; typedef unsigned char boothc_ticket[BOOTH_NAME_LEN]; struct boothc_header { /** Authentication data; not used now. */ uint32_t iv; uint32_t auth1; uint32_t auth2; /** BOOTHC_MAGIC */ uint32_t magic; /** BOOTHC_VERSION */ uint32_t version; /** Packet source; site_id. See add_site(). */ uint32_t from; /** Length including header */ uint32_t length; /** The command respectively protocol state. See cmd_request_t. */ uint32_t cmd; /** Result of operation. 0 == OK */ uint32_t result; char data[0]; } __attribute__((packed)); struct ticket_msg { /** Ticket name. */ boothc_ticket id; - /** Current leader. May be NO_OWNER. See add_site(). -* For a OP_REQ_VOTE this is */ + /** Current leader. May be NO_ONE. See add_site(). + * For a OP_REQ_VOTE this is */ uint32_t leader; /** Current term. */ uint32_t term; uint32_t term_valid_for; +#if 0 union { uint32_t prev_log_term; uint32_t last_log_term; }; +#endif union { uint32_t prev_log_index; uint32_t last_log_index; }; uint32_t leader_commit; } __attribute__((packed)); struct boothc_ticket_msg { struct boothc_header header; struct ticket_msg ticket; } __attribute__((packed)); #define CHAR2CONST(a,b,c,d) ((a << 24) | (b << 16) | (c << 8) | d) #define STG2CONST(X) ({ const char _ggg[4] = X; return (uint32_t*)_ggg; }) typedef enum { /* 0x43 = "C"ommands */ CMD_LIST = CHAR2CONST('C', 'L', 's', 't'), CMD_GRANT = CHAR2CONST('C', 'G', 'n', 't'), CMD_REVOKE = CHAR2CONST('C', 'R', 'v', 'k'), /* Replies */ CMR_GENERAL = CHAR2CONST('G', 'n', 'l', 'R'), // Increase distance to CMR_GRANT CMR_LIST = CHAR2CONST('R', 'L', 's', 't'), CMR_GRANT = CHAR2CONST('R', 'G', 'n', 't'), CMR_REVOKE = CHAR2CONST('R', 'R', 'v', 'k'), /* Raft */ OP_REQ_VOTE = CHAR2CONST('R', 'V', 'o', 't'), OP_VOTE_FOR = CHAR2CONST('V', 't', 'F', 'r'), - OP_APP_ENTRY= CHAR2CONST('A', 'p', 'p', 'E'), - OP_REJECTED = CHAR2CONST('R', 'J', 'C', '!'), + OP_HEARTBEAT= CHAR2CONST('H', 'r', 't', 'B'), /* AppendEntry in Raft */ + OP_MY_INDEX = CHAR2CONST('M', 'I', 'd', 'x'), /* Answer to Heartbeat */ + OP_REJECTED = CHAR2CONST('R', 'J', 'C', '!'), } cmd_request_t; /* TODO: make readable constants */ typedef enum { /* for compatibility with other functions */ RLT_SUCCESS = 0, RLT_ASYNC = CHAR2CONST('A', 's', 'y', 'n'), RLT_SYNC_SUCC = CHAR2CONST('S', 'c', 'c', 's'), RLT_SYNC_FAIL = CHAR2CONST('F', 'a', 'i', 'l'), RLT_INVALID_ARG = CHAR2CONST('I', 'A', 'r', 'g'), RLT_OVERGRANT = CHAR2CONST('O', 'v', 'e', 'r'), RLT_PROBABLY_SUCCESS = CHAR2CONST('S', 'u', 'c', '?'), RLT_BUSY = CHAR2CONST('B', 'u', 's', 'y'), + RLT_TERM_OUTDATED = CHAR2CONST('t', 'O', 'd', 'a'), } cmd_result_t; /** @} */ /** @{ */ struct booth_site { /** Calculated ID. See add_site(). */ int site_id; int type; int local; /** Roles, like ACCEPTOR, PROPOSER, or LEARNER. Not really used ATM. */ int role; char addr_string[BOOTH_NAME_LEN]; int tcp_fd; int udp_fd; /* 0-based, used for indexing into per-ticket weights */ int index; uint64_t bitmask; unsigned short family; union { struct sockaddr_in sa4; struct sockaddr_in6 sa6; }; int saddrlen; int addrlen; } __attribute__((packed)); extern struct booth_site *local; /** @} */ struct booth_transport; struct client { int fd; const struct booth_transport *transport; void (*workfn)(int); void (*deadfn)(int); }; extern struct client *clients; extern struct pollfd *pollfds; int client_add(int fd, const struct booth_transport *tpt, void (*workfn)(int ci), void (*deadfn)(int ci)); int do_read(int fd, void *buf, size_t count); int do_write(int fd, void *buf, size_t count); void process_connection(int ci); void safe_copy(char *dest, char *value, size_t buflen, const char *description); struct command_line { int type; /* ACT_ */ int op; /* OP_ */ char configfile[BOOTH_PATH_LEN]; char lockfile[BOOTH_PATH_LEN]; char site[BOOTH_NAME_LEN]; struct boothc_ticket_msg msg; }; extern struct command_line cl; #endif /* _BOOTH_H */ diff --git a/src/config.c b/src/config.c index 373003f..a88df22 100644 --- a/src/config.c +++ b/src/config.c @@ -1,711 +1,711 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include "booth.h" #include "config.h" #include "raft.h" #include "ticket.h" #include "log.h" static int ticket_size = 0; static int ticket_realloc(void) { const int added = 5; int had, want; void *p; had = booth_conf->ticket_allocated; want = had + added; p = realloc(booth_conf->ticket, sizeof(struct ticket_config) * want); if (!booth_conf) { log_error("can't alloc more tickets"); return -ENOMEM; } booth_conf->ticket = p; memset(booth_conf->ticket + had, 0, sizeof(struct ticket_config) * added); booth_conf->ticket_allocated = want; return 0; } int add_site(char *address, int type); int add_site(char *addr_string, int type) { int rv; struct booth_site *site; uLong nid; uint32_t mask; rv = 1; if (booth_conf->site_count == MAX_NODES) { log_error("too many nodes"); goto out; } if (strlen(addr_string)+1 >= sizeof(booth_conf->site[0].addr_string)) { log_error("site address \"%s\" too long", addr_string); goto out; } site = booth_conf->site + booth_conf->site_count; site->family = BOOTH_PROTO_FAMILY; site->type = type; /* Make site_id start at a non-zero point. * Perhaps use hash over string or address? */ strcpy(site->addr_string, addr_string); nid = crc32(0L, NULL, 0); /* booth_config() uses memset(), so sizeof() is guaranteed to give * the same result everywhere - no uninitialized bytes. */ site->site_id = crc32(nid, site->addr_string, sizeof(site->addr_string)); - /* Make sure we will never collide with NO_OWNER, + /* Make sure we will never collide with NO_ONE, * or be negative (to get "get_local_id() < 0" working). */ mask = 1 << (sizeof(site->site_id)*8 -1); - assert(NO_OWNER & mask); + assert(NO_ONE & mask); site->site_id &= ~mask; site->index = booth_conf->site_count; site->bitmask = 1 << booth_conf->site_count; /* Catch site overflow */ assert(site->bitmask); booth_conf->site_bits |= site->bitmask; site->tcp_fd = -1; booth_conf->site_count++; rv = 0; memset(&site->sa6, 0, sizeof(site->sa6)); if (inet_pton(AF_INET, site->addr_string, &site->sa4.sin_addr) > 0) { site->family = AF_INET; site->sa4.sin_family = site->family; site->sa4.sin_port = htons(booth_conf->port); site->saddrlen = sizeof(site->sa4); site->addrlen = sizeof(site->sa4.sin_addr); } else if (inet_pton(AF_INET6, site->addr_string, &site->sa6.sin6_addr) > 0) { site->family = AF_INET6; site->sa6.sin6_family = site->family; site->sa6.sin6_flowinfo = 0; site->sa6.sin6_port = htons(booth_conf->port); site->saddrlen = sizeof(site->sa6); site->addrlen = sizeof(site->sa6.sin6_addr); } else { log_error("Address string \"%s\" is bad", site->addr_string); rv = EINVAL; } out: return rv; } inline static char *skip_while_in(const char *cp, int (*fn)(int), const char *allowed) { /* strchr() returns a pointer to the terminator if *cp == 0. */ while (*cp && (fn(*cp) || strchr(allowed, *cp))) cp++; /* discard "const" qualifier */ return (char*)cp; } inline static char *skip_while(char *cp, int (*fn)(int)) { while (fn(*cp)) cp++; return cp; } inline static char *skip_until(char *cp, char expected) { while (*cp && *cp != expected) cp++; return cp; } static inline int is_end_of_line(char *cp) { char c = *cp; return c == '\n' || c == 0 || c == '#'; } static int add_ticket(const char *name, struct ticket_config **tkp, const struct ticket_config *def) { int rv; struct ticket_config *tk; if (booth_conf->ticket_count == booth_conf->ticket_allocated) { rv = ticket_realloc(); if (rv < 0) return rv; } tk = booth_conf->ticket + booth_conf->ticket_count; booth_conf->ticket_count++; if (!check_max_len_valid(name, sizeof(tk->name))) { log_error("ticket name \"%s\" too long.", name); return -EINVAL; } if (find_ticket_by_name(name, NULL)) { log_error("ticket name \"%s\" used again.", name); return -EINVAL; } if (* skip_while_in(name, isalnum, "-/")) { log_error("ticket name \"%s\" invalid; only alphanumeric names.", name); return -EINVAL; } strcpy(tk->name, name); tk->timeout = def->timeout; - tk->expiry = def->expiry; + tk->term_duration = def->term_duration; tk->retries = def->retries; memcpy(tk->weight, def->weight, sizeof(tk->weight)); tk->state = ST_INIT; if (tkp) *tkp = tk; return 0; } /* returns number of weights, or -1 on bad input. */ static int parse_weights(const char *input, int weights[MAX_NODES]) { int i, v; char *cp; for(i=0; iproto = UDP; booth_conf->port = BOOTH_DEFAULT_PORT; /* Provide safe defaults. -1 is reserved, though. */ booth_conf->uid = -2; booth_conf->gid = -2; strcpy(booth_conf->site_user, "hacluster"); strcpy(booth_conf->site_group, "haclient"); strcpy(booth_conf->arb_user, "nobody"); strcpy(booth_conf->arb_group, "nobody"); parse_weights("", defaults.weight); defaults.ext_verifier = NULL; - defaults.expiry = DEFAULT_TICKET_EXPIRY; + defaults.term_duration = DEFAULT_TICKET_EXPIRY; defaults.timeout = DEFAULT_TICKET_TIMEOUT; defaults.retries = DEFAULT_RETRIES; defaults.acquire_after = 0; error = ""; log_debug("reading config file %s", path); while (fgets(line, sizeof(line), fp)) { lineno++; s = skip_while(line, isspace); if (is_end_of_line(s)) continue; key = s; /* Key */ end_of_key = skip_while_in(key, isalnum, "-_"); if (end_of_key == key) { error = "No key"; goto err; } if (!*end_of_key) goto exp_equal; /* whitespace, and something else but nothing more? */ s = skip_while(end_of_key, isspace); if (*s != '=') { exp_equal: error = "Expected '=' after key"; goto err; } s++; /* It's my buffer, and I terminate if I want to. */ /* But not earlier than that, because we had to check for = */ *end_of_key = 0; /* Value tokenizing */ s = skip_while(s, isspace); switch (*s) { case '"': case '\'': val = s+1; s = skip_until(val, *s); /* Terminate value */ if (!*s) { error = "Unterminated quoted string"; goto err; } /* Remove and skip quote */ *s = 0; s++; if (* skip_while(s, isspace)) { error = "Surplus data after value"; goto err; } *s = 0; break; case 0: no_value: error = "No value"; goto err; break; default: val = s; /* Rest of line. */ i = strlen(s); /* i > 0 because of "case 0" above. */ while (i > 0 && isspace(s[i-1])) i--; s += i; *s = 0; } if (val == s) goto no_value; if (strlen(key) > BOOTH_NAME_LEN || strlen(val) > BOOTH_NAME_LEN) { error = "key/value too long"; goto err; } if (strcmp(key, "transport") == 0) { if (got_transport) { error = "config file has multiple transport lines"; goto err; } if (strcasecmp(val, "UDP") == 0) booth_conf->proto = UDP; else if (strcasecmp(val, "SCTP") == 0) booth_conf->proto = SCTP; else { error = "invalid transport protocol"; goto err; } got_transport = 1; continue; } if (strcmp(key, "port") == 0) { booth_conf->port = atoi(val); continue; } if (strcmp(key, "name") == 0) { safe_copy(booth_conf->name, val, BOOTH_NAME_LEN, "name"); continue; } if (strcmp(key, "site") == 0) { if (add_site(val, SITE)) goto out; continue; } if (strcmp(key, "arbitrator") == 0) { if (add_site(val, ARBITRATOR)) goto out; continue; } if (strcmp(key, "ticket") == 0) { if (add_ticket(val, &last_ticket, &defaults)) goto out; /* last_ticket is valid until another one is needed - * and then it already has the new address and * is valid again. */ continue; } if (strcmp(key, "expire") == 0) { - defaults.expiry = strtol(val, &s, 0); - if (*s || s == val || defaults.expiry<10) { + defaults.term_duration = strtol(val, &s, 0); + if (*s || s == val || defaults.term_duration<10) { error = "Expected plain integer value >=10 for expire"; goto err; } if (last_ticket) - last_ticket->expiry = defaults.expiry; + last_ticket->term_duration = defaults.term_duration; continue; } if (strcmp(key, "site-user") == 0) { safe_copy(booth_conf->site_user, optarg, BOOTH_NAME_LEN, "site-user"); continue; } if (strcmp(key, "site-group") == 0) { safe_copy(booth_conf->site_group, optarg, BOOTH_NAME_LEN, "site-group"); continue; } if (strcmp(key, "arbitrator-user") == 0) { safe_copy(booth_conf->arb_user, optarg, BOOTH_NAME_LEN, "arbitrator-user"); continue; } if (strcmp(key, "arbitrator-group") == 0) { safe_copy(booth_conf->arb_group, optarg, BOOTH_NAME_LEN, "arbitrator-group"); continue; } if (strcmp(key, "timeout") == 0) { defaults.timeout = strtol(val, &s, 0); if (*s || s == val || defaults.timeout<1) { error = "Expected plain integer value >=1 for timeout"; goto err; } if (last_ticket) last_ticket->timeout = defaults.timeout; continue; } if (strcmp(key, "retries") == 0) { defaults.retries = strtol(val, &s, 0); if (*s || s == val || defaults.retries<3 || defaults.retries > 100) { error = "Expected plain integer value in the range [3, 100] for retries"; goto err; } if (last_ticket) last_ticket->retries = defaults.retries; continue; } if (strcmp(key, "acquire-after") == 0) { defaults.acquire_after = strtol(val, &s, 0); if (*s || s == val || defaults.acquire_after<0) { error = "Expected plain integer value >=1 for acquire-after"; goto err; } if (last_ticket) last_ticket->acquire_after = defaults.acquire_after; continue; } if (strcmp(key, "before-acquire-handler") == 0) { defaults.ext_verifier = strdup(val); if (*s || s == val || defaults.timeout<1) { error = "Expected plain integer value >=1 for timeout"; goto err; } if (last_ticket) last_ticket->ext_verifier = defaults.ext_verifier; continue; } if (strcmp(key, "weights") == 0) { if (parse_weights(val, defaults.weight) < 0) goto out; if (last_ticket) memcpy(last_ticket->weight, defaults.weight, sizeof(last_ticket->weight)); continue; } error = "Unknown item"; goto out; } if ((booth_conf->site_count % 2) == 0) { log_warn("An odd number of nodes is strongly recommended!"); } /* Default: make config name match config filename. */ if (!booth_conf->name[0]) { cp = strrchr(path, '/'); if (!cp) cp = path; /* TODO: locale? */ /* NUL-termination by memset. */ for(i=0; iname[i] = *(cp++); /* Last resort. */ if (!booth_conf->name[0]) strcpy(booth_conf->name, "booth"); } return 0; err: out: log_error("%s in config file line %d", error, lineno); free(booth_conf); booth_conf = NULL; return -1; } int check_config(int type) { struct passwd *pw; struct group *gr; char *cp, *input; if (!booth_conf) return -1; input = (type == ARBITRATOR) ? booth_conf->arb_user : booth_conf->site_user; if (!*input) goto u_inval; if (isdigit(input[0])) { booth_conf->uid = strtol(input, &cp, 0); if (*cp != 0) { u_inval: log_error("User \"%s\" cannot be resolved into a UID.", input); return ENOENT; } } else { pw = getpwnam(input); if (!pw) goto u_inval; booth_conf->uid = pw->pw_uid; } input = (type == ARBITRATOR) ? booth_conf->arb_group : booth_conf->site_group; if (!*input) goto g_inval; if (isdigit(input[0])) { booth_conf->gid = strtol(input, &cp, 0); if (*cp != 0) { g_inval: log_error("Group \"%s\" cannot be resolved into a UID.", input); return ENOENT; } } else { gr = getgrnam(input); if (!gr) goto g_inval; booth_conf->gid = gr->gr_gid; } /* TODO: check whether uid or gid is 0 again? * The admin may shoot himself in the foot, though. */ return 0; } int find_site_by_name(unsigned char *site, struct booth_site **node, int any_type) { struct booth_site *n; int i; if (!booth_conf) return 0; for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if ((n->type == SITE || any_type) && strcmp(n->addr_string, site) == 0) { *node = n; return 1; } } return 0; } int find_site_by_id(uint32_t site_id, struct booth_site **node) { struct booth_site *n; int i; - if (site_id == NO_OWNER) { + if (site_id == NO_ONE) { *node = NULL; return 1; } if (!booth_conf) return 0; for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if (n->site_id == site_id) { *node = n; return 1; } } return 0; } const char *type_to_string(int type) { switch (type) { case ARBITRATOR: return "arbitrator"; case SITE: return "site"; case CLIENT: return "client"; } return "??invalid-type??"; } diff --git a/src/config.h b/src/config.h index 4f2505e..5806442 100644 --- a/src/config.h +++ b/src/config.h @@ -1,161 +1,174 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _CONFIG_H #define _CONFIG_H #include #include "booth.h" #include "raft.h" #include "transport.h" /** @{ */ /** Definitions for in-RAM data. */ #define MAX_NODES 16 #define TICKET_ALLOC 16 struct ticket_config { /** \name Configuration items. * @{ */ /** Name of ticket. */ boothc_ticket name; - /** How many seconds until expiration. */ - int expiry; + /** How many seconds a term lasts (if not refreshed). */ + int term_duration; /** Network related timeouts. */ int timeout; /** Retries before giving up. */ int retries; /** If >0, time to wait for a site to get fenced. * The ticket may be acquired after that timespan by * another site. */ - int acquire_after; + int acquire_after; /* TODO: needed? */ +#if 0 +#endif + /* Program to ask whether it makes sense to * acquire the ticket */ char *ext_verifier; /** Node weights. */ int weight[MAX_NODES]; /** @} */ /** \name Runtime values. * @{ */ /** Current state. */ server_state_e state; /** When something has to be done */ struct timeval next_cron; /** Current leader. This is effectively the log[] in Raft. */ struct booth_site *leader; - /** Timestamp of leadership expiration. */ + /** Timestamp of leadership expiration */ time_t term_expires; + /** End of election period */ + time_t election_end; + struct booth_site *voted_for; - /** Last ballot number that was agreed on. */ + + /** Who the various sites vote for. + * NO_OWNER = no vote yet. */ + struct booth_site *votes_for[MAX_NODES]; + /* bitmap */ + uint64_t votes_received; + + /** Last voting round that was seen. */ uint32_t current_term; /** @} */ /** */ uint32_t commit_index; /** */ uint32_t last_applied; uint32_t next_index[MAX_NODES]; uint32_t match_index[MAX_NODES]; /** \name Needed while proposals are being done. * @{ */ /** Whom to vote for the next time. * Needed to push a ticket to someone else. */ - struct booth_site *vote_for; + #if 0 /** Bitmap of sites that acknowledge that state. */ uint64_t proposal_acknowledges; /** When an incompletely acknowledged proposal gets done. * If all peers agree, that happens sooner. * See switch_state_to(). */ struct timeval proposal_switch; /** Timestamp of proposal expiration. */ time_t proposal_expires; #endif /** Number of send retries left. * Used on the new owner. * Starts at 0, counts up. */ int retry_number; /** @} */ }; struct booth_config { char name[BOOTH_NAME_LEN]; transport_layer_t proto; uint16_t port; /** Stores the OR of the individual host bitmasks. */ uint64_t site_bits; char site_user[BOOTH_NAME_LEN]; char site_group[BOOTH_NAME_LEN]; char arb_user[BOOTH_NAME_LEN]; char arb_group[BOOTH_NAME_LEN]; uid_t uid; gid_t gid; int site_count; struct booth_site site[MAX_NODES]; int ticket_count; int ticket_allocated; struct ticket_config *ticket; }; extern struct booth_config *booth_conf; int read_config(const char *path); int check_config(int type); int find_site_by_name(unsigned char *site, struct booth_site **node, int any_type); int find_site_by_id(uint32_t site_id, struct booth_site **node); const char *type_to_string(int type); #endif /* _CONFIG_H */ diff --git a/src/handler.c b/src/handler.c index 8670385..fe0435b 100644 --- a/src/handler.c +++ b/src/handler.c @@ -1,68 +1,68 @@ /* * Copyright (C) 2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include "ticket.h" #include "config.h" #include "inline-fn.h" #include "log.h" #include "pacemaker.h" #include "booth.h" #include "handler.h" /** Runs an external handler. * See eg. 'before-acquire-handler'. * TODO: timeout, async operation?. */ int run_handler(struct ticket_config *tk, const char *cmd, int synchronous) { int rv; char expires[16]; assert(synchronous); - sprintf(expires, "%" PRId64, tk->expires); + sprintf(expires, "%" PRId64, tk->term_expires); rv = setenv("BOOTH_TICKET", tk->name, 1) || setenv("BOOTH_LOCAL", local->addr_string, 1) || setenv("BOOTH_CONF_NAME", booth_conf->name, 1) || setenv("BOOTH_CONF_PATH", cl.configfile, 1) || setenv("BOOTH_TICKET_EXPIRES", expires, 1); if (rv) { log_error("Cannot set environment: %d", errno); } else { rv = system(cmd); if (rv) log_error("Error calling \"%s\": %s", cmd, interpret_rv(rv)); else log_info("Ran \"%s\" successfully.", cmd); } return rv; } diff --git a/src/inline-fn.h b/src/inline-fn.h index e443b10..f28020c 100644 --- a/src/inline-fn.h +++ b/src/inline-fn.h @@ -1,281 +1,288 @@ /* * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _INLINE_FN_H #define _INLINE_FN_H #include #include #include #include #include "config.h" +#include "ticket.h" #include "transport.h" inline static uint32_t get_local_id(void) { return local ? local->site_id : -1; } inline static uint32_t get_node_id(struct booth_site *node) { - return node ? node->site_id : NO_OWNER; + return node ? node->site_id : NO_ONE; } -inline static int ticket_valid_for(const struct ticket_config *tk) +inline static int term_valid_for(const struct ticket_config *tk) { int left; - left = tk->expires - time(NULL); + left = tk->term_expires - time(NULL); return (left < 0) ? 0 : left; } /** Returns number of seconds left, if any. */ -inline static int owner_and_valid(const struct ticket_config *tk) +inline static int leader_and_valid(const struct ticket_config *tk) { - if (tk->owner != local) + if (tk->leader != local) return 0; - return ticket_valid_for(tk); + return term_valid_for(tk); } + static inline void init_header_bare(struct boothc_header *h) { h->magic = htonl(BOOTHC_MAGIC); h->version = htonl(BOOTHC_VERSION); h->from = htonl(local->site_id); h->iv = htonl(0); h->auth1 = htonl(0); h->auth2 = htonl(0); } static inline void init_header(struct boothc_header *h, int cmd, int result, int data_len) { init_header_bare(h); h->length = htonl(data_len); h->cmd = htonl(cmd); h->result = htonl(result); } static inline void init_ticket_site_header(struct boothc_ticket_msg *msg, int cmd) { init_header(&msg->header, cmd, 0, sizeof(*msg)); } static inline void init_ticket_msg(struct boothc_ticket_msg *msg, int cmd, int rv, struct ticket_config *tk) { assert(sizeof(msg->ticket.id) == sizeof(tk->name)); init_header(&msg->header, cmd, rv, sizeof(*msg)); if (!tk) { memset(&msg->ticket, 0, sizeof(msg->ticket)); } else { memcpy(msg->ticket.id, tk->name, sizeof(msg->ticket.id)); - msg->ticket.expiry = htonl(ticket_valid_for(tk)); - msg->ticket.owner = htonl(get_node_id(tk->owner)); - msg->ticket.ballot = htonl(tk->new_ballot); - msg->ticket.prev_ballot = htonl(tk->last_ack_ballot); + msg->ticket.leader = htonl(get_node_id(tk->leader ?: tk->voted_for)); + msg->ticket.term = htonl(tk->current_term); + msg->ticket.term_valid_for = htonl(term_valid_for(tk)); + + msg->ticket.prev_log_index = htonl(tk->last_applied); + msg->ticket.leader_commit = htonl(tk->commit_index); } } static inline struct booth_transport const *transport(void) { return booth_transport + booth_conf->proto; } -static inline const char *ticket_owner_string(struct booth_site *site) +static inline const char *site_string(struct booth_site *site) { return site ? site->addr_string : "NONE"; } -static inline void disown_ticket(struct ticket_config *tk) +static inline const char *ticket_leader_string(struct ticket_config *tk) { - /* ONLY the "current state" is changed; - * current paxos rounds should not be affected. - * tk->proposed_owner = NULL; - */ - tk->owner = NULL; - time(&tk->expires); + return site_string(tk->leader); } -static inline void disown_if_expired(struct ticket_config *tk) -{ - if (time(NULL) >= tk->expires || - (!tk->proposed_owner && !tk->owner)) - disown_ticket(tk); -} - -static inline int all_agree(struct ticket_config *tk) +static inline void disown_ticket(struct ticket_config *tk) { - return tk->proposal_acknowledges == booth_conf->site_bits; + tk->leader = NULL; + time(&tk->term_expires); } -static inline int majority_agree(struct ticket_config *tk) +static inline int disown_if_expired(struct ticket_config *tk) { - /* Use ">" to get majority decision, even for an even number - * of participants. */ - return __builtin_popcount(tk->proposal_acknowledges) * 2 > - booth_conf->site_count; -} + if (time(NULL) >= tk->term_expires || + !tk->leader) { + disown_ticket(tk); + return 1; + } + return 0; +} /* We allow half of the uint32_t to be used; * half of that below, half of that above the current known "good" value. * 0 UINT32_MAX * |--------------------------+----------------+------------| * | | | * |--------+-------| allowed range * | - * current ballot + * current commit index * * So, on overflow it looks like that: * UINT32_MAX 0 * |--------------------------+-----------||---+------------| * | | | * |--------+-------| allowed range * | - * current ballot + * current commit index * * This should be possible by using the same datatype and relying * on the under/overflow semantics. * * * Having 30 bits available, and assuming an expire time of - * one minute and a (high) ballot step of 64 == 2^6 (because + * one minute and a (high) commit index step of 64 == 2^6 (because * of weights), we get 2^24 minutes of range - which is ~750 * years. "Should be enough for everybody." */ -static inline int ballot_is_higher_than(uint32_t b_high, uint32_t b_low) +static inline int index_is_higher_than(uint32_t c_high, uint32_t c_low) { uint32_t diff; - if (b_high == b_low) + if (c_high == c_low) return 0; - diff = b_high - b_low; + diff = c_high - c_low; if (diff < UINT32_MAX/4) return 1; - diff = b_low - b_high; + diff = c_low - c_high; if (diff < UINT32_MAX/4) return 0; - assert(!"ballot out of range - invalid"); + assert(!"commit index out of range - invalid"); } -static inline uint32_t ballot_max2(uint32_t a, uint32_t b) +static inline uint32_t index_max2(uint32_t a, uint32_t b) { - return ballot_is_higher_than(a, b) ? a : b; + return index_is_higher_than(a, b) ? a : b; } -static inline uint32_t ballot_max3(uint32_t a, uint32_t b, uint32_t c) +static inline uint32_t index_max3(uint32_t a, uint32_t b, uint32_t c) { - return ballot_max2( ballot_max2(a, b), c); + return index_max2( index_max2(a, b), c); } static inline double timeval_to_float(struct timeval tv) { return tv.tv_sec + tv.tv_usec*(double)1.0e-6; } static inline int timeval_msec(struct timeval tv) { int m; m = tv.tv_usec / 1000; if (m >= 1000) m = 999; return m; } static inline int timeval_compare(struct timeval tv1, struct timeval tv2) { if (tv1.tv_sec < tv2.tv_sec) return -1; if (tv1.tv_sec > tv2.tv_sec) return +1; if (tv1.tv_usec < tv2.tv_usec) return -1; if (tv1.tv_usec > tv2.tv_usec) return +1; return 0; } static inline int timeval_in_past(struct timeval which) { struct timeval tv; gettimeofday(&tv, NULL); return timeval_compare(tv, which) > 0; } -static inline time_t next_renewal_starts_at(struct ticket_config *tk) +static inline time_t next_vote_starts_at(struct ticket_config *tk) { time_t half_exp, retries_needed; /* If not owner, don't renew. */ - if (tk->owner != local) + if (tk->leader != local) return 0; /* Try to renew at half of expiry time. */ - half_exp = tk->expires - tk->expiry/2; + half_exp = tk->term_expires - tk->term_duration/2; /* Also start renewal if we couldn't get * a few message retransmission in the alloted * expiry time. */ - retries_needed = tk->expires - tk->timeout * tk->retries/2; + retries_needed = tk->term_expires - tk->timeout * tk->retries/2; /* Return earlier timestamp. */ return half_exp < retries_needed ? half_exp : retries_needed; } static inline int should_start_renewal(struct ticket_config *tk) { time_t now, when; - when = next_renewal_starts_at(tk); + when = next_vote_starts_at(tk); if (!when) return 0; time(&now); return when <= now; } +static inline int send_heartbeat(struct ticket_config *tk) +{ + return ticket_broadcast(tk, OP_HEARTBEAT, RLT_SUCCESS); +} + +static inline struct booth_site *my_vote(struct ticket_config *tk) +{ + return tk->votes_for[ local->index ]; +} + + + #endif diff --git a/src/pacemaker.c b/src/pacemaker.c index c371744..27d42d2 100644 --- a/src/pacemaker.c +++ b/src/pacemaker.c @@ -1,326 +1,326 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include "log.h" #include "pacemaker.h" #include "inline-fn.h" enum atomic_ticket_supported { YES=0, NO, FILENOTFOUND, /* Ie. UNKNOWN */ UNKNOWN = FILENOTFOUND, }; /* http://thedailywtf.com/Articles/What_Is_Truth_0x3f_.aspx */ enum atomic_ticket_supported atomicity = UNKNOWN; #define COMMAND_MAX 1024 /** Determines whether the installed crm_ticket can do atomic ticket grants, * _including_ multiple attribute changes. * * See * https://bugzilla.novell.com/show_bug.cgi?id=855099 * * Run "crm_ticket" without "--force"; * - the old version asks for "Y/N" via STDIN, and returns 0 * when reading "no"; * - the new version just reports an error without asking. */ static void test_atomicity(void) { int rv; if (atomicity != UNKNOWN) return; rv = system("echo n | crm_ticket -g -t any-ticket-name > /dev/null 2> /dev/null"); if (rv == -1) { log_error("Cannot run \"crm_ticket\"!"); /* BIG problem. Abort. */ exit(1); } if (WIFSIGNALED(rv)) { log_error("\"crm_ticket\" terminated by a signal!"); /* Problem. Abort. */ exit(1); } switch (WEXITSTATUS(rv)) { case 0: atomicity = NO; log_info("Old \"crm_ticket\" found, using non-atomic ticket updates."); break; case 1: atomicity = YES; log_info("New \"crm_ticket\" found, using atomic ticket updates."); break; default: log_error("Unexpected return value from \"crm_ticket\" (%d), " "falling back to non-atomic ticket updates.", rv); atomicity = NO; } assert(atomicity == YES || atomicity == NO); } const char * interpret_rv(int rv) { static char text[64]; int p; if (rv == 0) return "0"; p = sprintf(text, "rv %d", WEXITSTATUS(rv)); if (WIFSIGNALED(rv)) sprintf(text + p, " signal %d", WTERMSIG(rv)); return text; } static int pcmk_write_ticket_atomic(struct ticket_config *tk, int grant) { char cmd[COMMAND_MAX]; int rv; - /* The values are appended to "-v", so that NO_OWNER + /* The values are appended to "-v", so that NO_ONE * (which is -1) isn't seen as another option. */ snprintf(cmd, COMMAND_MAX, "crm_ticket -t '%s' " "%s --force " "-S owner -v%" PRIi32 " " "-S expires -v%" PRIi64 " " - "-S ballot -v%" PRIi64, + "-S term -v%" PRIi64, tk->name, (grant > 0 ? "-g" : grant < 0 ? "-r" : ""), - (int32_t)get_node_id(tk->owner), - (int64_t)tk->expires, - (int64_t)tk->last_ack_ballot); + (int32_t)get_node_id(tk->leader), + (int64_t)tk->term_expires, + (int64_t)tk->current_term); rv = system(cmd); log_info("command: '%s' was executed", cmd); if (rv != 0) log_error("error: \"%s\" failed, %s", cmd, interpret_rv(rv)); return rv; } static int pcmk_store_ticket_nonatomic(struct ticket_config *tk); static int pcmk_grant_ticket(struct ticket_config *tk) { char cmd[COMMAND_MAX]; int rv; test_atomicity(); if (atomicity == YES) return pcmk_write_ticket_atomic(tk, +1); rv = pcmk_store_ticket_nonatomic(tk); if (rv) return rv; snprintf(cmd, COMMAND_MAX, "crm_ticket -t %s -g --force", tk->name); log_info("command: '%s' was executed", cmd); rv = system(cmd); if (rv != 0) log_error("error: \"%s\" failed, %s", cmd, interpret_rv(rv)); return rv; } static int pcmk_revoke_ticket(struct ticket_config *tk) { char cmd[COMMAND_MAX]; int rv; test_atomicity(); if (atomicity == YES) return pcmk_write_ticket_atomic(tk, -1); rv = pcmk_store_ticket_nonatomic(tk); if (rv) return rv; snprintf(cmd, COMMAND_MAX, "crm_ticket -t %s -r --force", tk->name); log_info("command: '%s' was executed", cmd); rv = system(cmd); if (rv != 0) log_error("error: \"%s\" failed, %s", cmd, interpret_rv(rv)); return rv; } static int crm_ticket_set(const struct ticket_config *tk, const char *attr, int64_t val) { char cmd[COMMAND_MAX]; int i, rv; snprintf(cmd, COMMAND_MAX, "crm_ticket -t '%s' -S '%s' -v %" PRIi64, tk->name, attr, val); /* If there are errors, there's not much we can do but retry ... */ for (i=0; i<3 && (rv = system(cmd)); i++) ; log_debug("'%s' gave result %s", cmd, interpret_rv(rv)); return rv; } static int pcmk_store_ticket_nonatomic(struct ticket_config *tk) { int rv; /* Always try to store *each* attribute, even if there's an error * for one of them. */ - rv = crm_ticket_set(tk, "owner", (int32_t)get_node_id(tk->owner)); - rv = crm_ticket_set(tk, "expires", tk->expires) || rv; - rv = crm_ticket_set(tk, "ballot", tk->last_ack_ballot) || rv; + rv = crm_ticket_set(tk, "owner", (int32_t)get_node_id(tk->leader)); + rv = crm_ticket_set(tk, "expires", tk->term_expires) || rv; + rv = crm_ticket_set(tk, "term", tk->current_term) || rv; if (rv) log_error("setting crm_ticket attributes failed; %s", interpret_rv(rv)); else log_info("setting crm_ticket attributes successful"); return rv; } static int crm_ticket_get(struct ticket_config *tk, const char *attr, int64_t *data) { char cmd[COMMAND_MAX]; char line[256]; int rv; int64_t v; FILE *p; *data = -1; v = 0; snprintf(cmd, COMMAND_MAX, "crm_ticket -t '%s' -G '%s' --quiet", tk->name, attr); p = popen(cmd, "r"); if (p == NULL) { rv = errno; log_error("popen error %d (%s) for \"%s\"", rv, strerror(rv), cmd); return rv || -EINVAL; } if (fgets(line, sizeof(line) - 1, p) == NULL) { rv = ENODATA; goto out; } rv = EINVAL; if (sscanf(line, "%" PRIi64, &v) == 1) rv = 0; *data = v; out: rv = pclose(p); log_debug("command \"%s\" returned %s, value %" PRIi64, cmd, interpret_rv(rv), v); return rv; } static int pcmk_load_ticket(struct ticket_config *tk) { int rv; int64_t v; /* This here gets run during startup; testing that here means that * normal operation won't be interrupted with that test. */ test_atomicity(); rv = crm_ticket_get(tk, "expires", &v); if (!rv) { - tk->expires = v; + tk->term_expires = v; } - rv = crm_ticket_get(tk, "ballot", &v); + rv = crm_ticket_get(tk, "term", &v); if (!rv) { - tk->new_ballot = - tk->last_ack_ballot = v; + tk->current_term = v; } rv = crm_ticket_get(tk, "owner", &v); if (!rv) { /* No check, node could have been deconfigured. */ - find_site_by_id(v, &tk->proposed_owner); + find_site_by_id(v, &tk->leader); } - disown_if_expired(tk); + if (disown_if_expired(tk)) + pcmk_revoke_ticket(tk); - tk->proposal_acknowledges = local->bitmask; +// tk->proposal_acknowledges = local->bitmask; /* We load only when the state is completely unknown. */ tk->state = ST_INIT; return rv; } struct ticket_handler pcmk_handler = { .grant_ticket = pcmk_grant_ticket, .revoke_ticket = pcmk_revoke_ticket, .load_ticket = pcmk_load_ticket, }; diff --git a/src/raft.c b/src/raft.c new file mode 100644 index 0000000..3d29e1a --- /dev/null +++ b/src/raft.c @@ -0,0 +1,290 @@ +/* + * Copyright (C) 2014 Philipp Marek + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include +#include +#include +#include +#include +#include "booth.h" +#include "transport.h" +#include "inline-fn.h" +#include "config.h" +#include "raft.h" +#include "ticket.h" +#include "log.h" + + + +inline static void clear_election(struct ticket_config *tk) +{ + int i; + struct booth_site *site; + + log_info("clear election"); + tk->votes_received = 0; + foreach_node(i, site) + tk->votes_for[site->index] = NULL; +} + + +inline static void site_voted_for(struct ticket_config *tk, + struct booth_site *who, + struct booth_site *vote) +{ + log_info("site \"%s\" votes for \"%s\"", + who->addr_string, + vote->addr_string); + + if (!tk->votes_for[who->index]) { + tk->votes_for[who->index] = vote; + tk->votes_received |= who->bitmask; + } else { + if (tk->votes_for[who->index] != vote) + log_error("voted previously (but in same term!) for \"%s\"...", + tk->votes_for[who->index]->addr_string); + } +} + + +static struct booth_site *majority_votes(struct ticket_config *tk) +{ + int i, n; + struct booth_site *v; + int count[MAX_NODES] = { 0, }; + + + for(i=0; isite_count; i++) { + v = tk->votes_for[i]; + if (!v) + continue; + + n = v->index; + count[n]++; + log_info("Majority: %d \"%s\" wants %d \"%s\" => %d", + i, booth_conf->site[i].addr_string, + n, v->addr_string, + count[n]); + + if (count[n]*2 <= booth_conf->site_count) + continue; + + + log_info("Majority reached: %d of %d for \"%s\"", + count[n], booth_conf->site_count, + v->addr_string); + return v; + } + + return NULL; +} + +static int answer_HEARTBEAT ( + struct ticket_config *tk, + struct booth_site *sender, + struct booth_site *leader, + struct boothc_ticket_msg *msg + ) +{ + uint32_t term; + uint32_t index; + + term = ntohl(msg->ticket.term); + log_debug("leader: %s, have %s; term %d vs %d", + site_string(leader), ticket_leader_string(tk), + term, tk->current_term); + if (term < tk->current_term) + return 0; //send_reject(sender, tk, RLT_TERM_OUTDATED); + + /* § 5.3 */ + index = ntohl(msg->ticket.leader_commit); + if (index > tk->commit_index) + tk->commit_index = index; + + assert(tk->leader == leader); + + + return 0; +} + + +static int process_VOTE_FOR( + struct ticket_config *tk, + struct booth_site *sender, + struct booth_site *leader, + struct boothc_ticket_msg *msg + ) +{ + uint32_t term; + struct booth_site *new_leader; + + + term = ntohl(msg->ticket.term); + if (term < tk->current_term) + return send_reject(sender, tk, RLT_TERM_OUTDATED); + + if (term > tk->current_term) + clear_election(tk); + + site_voted_for(tk, sender, leader); + + + /* §5.2 */ + new_leader = majority_votes(tk); + if (new_leader) { + tk->leader = new_leader; + + if ( new_leader == local) { + tk->current_term++; + tk->state = ST_LEADER; + send_heartbeat(tk); + tk->voted_for = NULL; + } + else + tk->state = ST_FOLLOWER; + + } + + set_ticket_wakeup(tk); + return 0; +} + + +static int process_REJECTED( + struct ticket_config *tk, + struct booth_site *sender, + struct booth_site *leader, + struct boothc_ticket_msg *msg + ) +{ + return 0; +} + + +/* §5.2 */ +static int answer_REQ_VOTE( + struct ticket_config *tk, + struct booth_site *sender, + struct booth_site *leader, + struct boothc_ticket_msg *msg + ) +{ + uint32_t term; + struct boothc_ticket_msg omsg; + + + term = ntohl(msg->ticket.term); + + /* §5.1 */ + if (term < tk->current_term) + return send_reject(sender, tk, RLT_TERM_OUTDATED); + + /* §5.2, §5.4 */ + if (!tk->voted_for && + ntohl(msg->ticket.last_log_index) >= tk->last_applied) { + tk->voted_for = sender; + site_voted_for(tk, sender, leader); + goto yes_you_can; + } + + +yes_you_can: + init_ticket_msg(&omsg, OP_VOTE_FOR, RLT_SUCCESS, tk); + omsg.ticket.leader = htonl(get_node_id(tk->voted_for)); + + return transport()->broadcast(&omsg, sizeof(omsg)); +} + + +int new_election(struct ticket_config *tk, struct booth_site *preference) +{ + struct booth_site *new_leader; + time_t now; + + + time(&now); + log_debug("start new election?, now=%" PRIi64 ", end %" PRIi64, + now, tk->election_end); + if (now <= tk->election_end) + return 0; + + + /* §5.2 */ + tk->current_term++; + tk->election_end = now + tk->term_duration; + + log_debug("start new election! term=%d, until %" PRIi64, + tk->current_term, tk->election_end); + clear_election(tk); + + if(preference) + new_leader = preference; + else + new_leader = (local->type == SITE) ? local : NULL; + site_voted_for(tk, local, new_leader); + tk->voted_for = new_leader; + + tk->state = ST_CANDIDATE; + + ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS); + return 0; +} + + +int raft_answer( + struct ticket_config *tk, + struct booth_site *from, + struct booth_site *leader, + struct boothc_ticket_msg *msg + ) +{ + int cmd; + uint32_t term; + + cmd = ntohl(msg->header.cmd); + term = ntohl(msg->ticket.term); + + log_debug("got message %s from \"%s\", term %d vs. %d", + state_to_string(cmd), + from->addr_string, + term, tk->current_term); + + /* §5.1 */ + if (term > tk->current_term) { + tk->state = ST_FOLLOWER; + tk->current_term = term; + tk->leader = leader; + log_info("higher term %d vs. %d, following \"%s\"", + term, tk->current_term, + ticket_leader_string(tk)); + } + + + switch (cmd) { + case OP_REQ_VOTE: + return answer_REQ_VOTE (tk, from, leader, msg); + case OP_VOTE_FOR: + return process_VOTE_FOR(tk, from, leader, msg); + case OP_HEARTBEAT: + return answer_HEARTBEAT(tk, from, leader, msg); + case OP_REJECTED: + return process_REJECTED(tk, from, leader, msg); + } + log_error("unprocessed message, cmd %x", cmd); + return -EINVAL; +} diff --git a/src/raft.h b/src/raft.h new file mode 100644 index 0000000..3f03196 --- /dev/null +++ b/src/raft.h @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2014 Philipp Marek + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#ifndef _RAFT_H +#define _RAFT_H + +#include "booth.h" + + +typedef enum { + ST_INIT = CHAR2CONST('I', 'n', 'i', 't'), + ST_FOLLOWER = CHAR2CONST('F', 'l', 'l', 'w'), + ST_CANDIDATE = CHAR2CONST('C', 'n', 'd', 'i'), + ST_LEADER = CHAR2CONST('L', 'e', 'a', 'd'), +} server_state_e; + + +struct ticket_config; + +int raft_answer(struct ticket_config *tk, + struct booth_site *from, + struct booth_site *leader, + struct boothc_ticket_msg *msg); + +int new_election(struct ticket_config *tk, struct booth_site *new_leader); +int start_election(struct ticket_config *tk, struct booth_site *new_leader); + + +#endif /* _RAFT_H */ diff --git a/src/ticket.c b/src/ticket.c index 1ff01aa..16aaf72 100644 --- a/src/ticket.c +++ b/src/ticket.c @@ -1,799 +1,818 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include "ticket.h" #include "config.h" #include "pacemaker.h" #include "inline-fn.h" #include "log.h" #include "booth.h" #include "raft.h" #include "handler.h" #define TK_LINE 256 /* Untrusted input, must fit (incl. \0) in a buffer of max chars. */ int check_max_len_valid(const char *s, int max) { int i; for(i=0; iticket_count; i++) { if (!strcmp(booth_conf->ticket[i].name, ticket)) { if (found) *found = booth_conf->ticket + i; return 1; } } return 0; } int check_ticket(char *ticket, struct ticket_config **found) { if (found) *found = NULL; if (!booth_conf) return 0; if (!check_max_len_valid(ticket, sizeof(booth_conf->ticket[0].name))) return 0; return find_ticket_by_name(ticket, found); } int check_site(char *site, int *is_local) { struct booth_site *node; if (!check_max_len_valid(site, sizeof(node->addr_string))) return 0; if (find_site_by_name(site, &node, 0)) { *is_local = node->local; return 1; } return 0; } +#if 0 /** Find out what others think about this ticket. * * If we're a SITE, we can ask (and have to tell) Pacemaker. * An ARBITRATOR can only ask others. */ static int ticket_send_catchup(struct ticket_config *tk) { int i, rv = 0; struct booth_site *site; struct boothc_ticket_msg msg; foreach_node(i, site) { if (!site->local) { init_ticket_msg(&msg, CMD_CATCHUP, RLT_SUCCESS, tk); log_debug("attempting catchup from %s", site->addr_string); rv = booth_udp_send(site, &msg, sizeof(msg)); } } ticket_activate_timeout(tk); return rv; } +#endif int ticket_write(struct ticket_config *tk) { if (local->type != SITE) return -EINVAL; disown_if_expired(tk); - if (tk->owner == local) { + if (tk->leader == local) { pcmk_handler.grant_ticket(tk); } else { pcmk_handler.revoke_ticket(tk); } return 0; } /* Ask an external program whether getting the ticket * makes sense. * Eg. if the services have a failcount of INFINITY, * we can't serve here anyway. */ int get_ticket_locally_if_allowed(struct ticket_config *tk) { int rv; if (!tk->ext_verifier) goto get_it; rv = run_handler(tk, tk->ext_verifier, 1); if (rv) { log_error("May not acquire ticket."); /* Give it to somebody else. * Just send a commit message, as the * others couldn't help anyway. */ - if (owner_and_valid(tk)) { + if (leader_and_valid(tk)) { disown_ticket(tk); +#if 0 tk->proposed_owner = NULL; /* Just go one further - others may easily override. */ tk->new_ballot++; ticket_broadcast_proposed_state(tk, OP_COMMITTED); tk->state = ST_STABLE; +#endif + ticket_broadcast(tk, OP_VOTE_FOR, RLT_SUCCESS); } return rv; } else { log_info("May keep ticket."); } get_it: + if (leader_and_valid(tk)) { + return send_heartbeat(tk); + } + else { + new_election(tk, local); + return ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS); + } +#if 0 return paxos_start_round(tk, local); +#endif } /** Try to get the ticket for the local site. * */ int do_grant_ticket(struct ticket_config *tk) { int rv; - if (tk->owner == local) + if (tk->leader == local) return RLT_SUCCESS; - if (tk->owner) + if (tk->leader) return RLT_OVERGRANT; rv = get_ticket_locally_if_allowed(tk); return rv; } /** Start a PAXOS round for revoking. * That can be started from any site. */ int do_revoke_ticket(struct ticket_config *tk) { int rv; - if (!tk->owner) + if (!tk->leader) return RLT_SUCCESS; + disown_ticket(tk); + ticket_write(tk); + return ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS); +#if 0 rv = paxos_start_round(tk, NULL); +#endif return rv; } int list_ticket(char **pdata, unsigned int *len) { struct ticket_config *tk; char timeout_str[64]; char *data, *cp; int i, alloc; *pdata = NULL; *len = 0; alloc = 256 + booth_conf->ticket_count * (BOOTH_NAME_LEN * 2 + 128); data = malloc(alloc); if (!data) return -ENOMEM; cp = data; foreach_ticket(i, tk) { - if (tk->expires != 0) + if (tk->term_expires != 0) strftime(timeout_str, sizeof(timeout_str), "%F %T", - localtime(&tk->expires)); + localtime(&tk->term_expires)); else strcpy(timeout_str, "INF"); cp += sprintf(cp, - "ticket: %s, owner: %s, expires: %s, ballot: %d\n", + "ticket: %s, leader: %s, expires: %s, commit: %d\n", tk->name, - tk->owner ? tk->owner->addr_string : "None", + ticket_leader_string(tk), timeout_str, - tk->last_ack_ballot); + tk->commit_index); *len = cp - data; assert(*len < alloc); } *pdata = data; return 0; } int setup_ticket(void) { struct ticket_config *tk; int i; /* TODO */ foreach_ticket(i, tk) { - tk->owner = NULL; - tk->expires = 0; + tk->leader = NULL; + tk->term_expires = 0; - abort_proposal(tk); +// abort_proposal(tk); - if (local->role & PROPOSER) { + if (local->type == SITE) { pcmk_handler.load_ticket(tk); } } return 0; } int ticket_answer_list(int fd, struct boothc_ticket_msg *msg) { char *data; int olen, rv; struct boothc_header hdr; rv = list_ticket(&data, &olen); if (rv < 0) return rv; init_header(&hdr, CMR_LIST, RLT_SUCCESS, sizeof(hdr) + olen); return send_header_plus(fd, &hdr, data, olen); } int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg) { int rv; struct ticket_config *tk; if (!check_ticket(msg->ticket.id, &tk)) { log_error("Client asked to grant unknown ticket"); rv = RLT_INVALID_ARG; goto reply; } - if (tk->owner) { + if (tk->leader) { log_error("client wants to get an (already granted!) ticket \"%s\"", msg->ticket.id); rv = RLT_OVERGRANT; goto reply; } rv = do_grant_ticket(tk); reply: init_header(&msg->header, CMR_GRANT, rv ?: RLT_ASYNC, sizeof(*msg)); return send_ticket_msg(fd, msg); } int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg) { int rv; struct ticket_config *tk; if (!check_ticket(msg->ticket.id, &tk)) { log_error("Client asked to grant unknown ticket"); rv = RLT_INVALID_ARG; goto reply; } - if (!tk->owner) { + if (!tk->leader) { log_info("client wants to revoke a free ticket \"%s\"", msg->ticket.id); /* Return a different result code? */ rv = RLT_SUCCESS; goto reply; } rv = do_revoke_ticket(tk); if (rv == 0) rv = RLT_ASYNC; reply: init_ticket_msg(msg, CMR_REVOKE, rv, tk); return send_ticket_msg(fd, msg); } +#if 0 /** Got a CMD_CATCHUP query. * In this file because it's mostly used during startup. */ static int ticket_answer_catchup( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { int rv; log_debug("got CATCHUP query for \"%s\" from %s", msg->ticket.id, from->addr_string); /* We do _always_ answer. * In case all booth daemons are restarted at the same time, nobody * would answer any questions, leading to timeouts and delays. * Just admit we don't know. */ rv = (tk->state == ST_INIT) ? RLT_PROBABLY_SUCCESS : RLT_SUCCESS; init_ticket_msg(msg, CMR_CATCHUP, rv, tk); /* On catchup, don't tell about ongoing proposals; * if we did, the other site might believe that the * ballot numbers have already been used. * Send the known ballot number, so that a PREPARE * gets accepted. */ msg->ticket.ballot = msg->ticket.prev_ballot; return booth_udp_send(from, msg, sizeof(*msg)); } /** Got a CMR_CATCHUP message. * Gets handled here because it's not PAXOS per se, * but only needed during startup. */ static int ticket_process_catchup( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { int rv; uint32_t prev_ballot; time_t peer_expiry; log_info("got CATCHUP answer for \"%s\" from %s; says owner %s with ballot %d", tk->name, from->addr_string, - ticket_owner_string(new_owner), ballot); + ticket_leader_string(new_owner), ballot); prev_ballot = ntohl(msg->ticket.prev_ballot); rv = ntohl(msg->header.result); if (rv != RLT_SUCCESS && rv != RLT_PROBABLY_SUCCESS) { log_error("dropped because of wrong rv: 0x%x", rv); return -EINVAL; } if (ballot == tk->new_ballot && ballot == tk->last_ack_ballot && new_owner == tk->owner) { /* Peer says the same thing we're believing. */ tk->proposal_acknowledges |= from->bitmask | local->bitmask; tk->expires = ntohl(msg->ticket.expiry) + time(NULL); if (should_switch_state_p(tk)) { if (tk->state == ST_INIT) tk->state = ST_STABLE; } disown_if_expired(tk); log_debug("catchup: peer ack 0x%" PRIx64 ", now state '%s'", tk->proposal_acknowledges, state_to_string(tk->state)); goto ex; } if (ticket_valid_for(tk) == 0 && !tk->owner) { /* We see the ticket as expired, and therefore don't know an owner. * So believe some other host. */ tk->state = ST_STABLE; log_debug("catchup: no owner locally, believe peer."); goto accept; } if (ballot >= tk->new_ballot && ballot >= tk->last_ack_ballot && rv == RLT_SUCCESS) { /* Peers seems to know better, but as yet we only have _her_ * word for that. */ log_debug("catchup: peer has higher ballot: %d >= %d/%d", ballot, tk->new_ballot, tk->last_ack_ballot); accept: peer_expiry = ntohl(msg->ticket.expiry) + time(NULL); tk->expires = (tk->expires > peer_expiry) ? tk->expires : peer_expiry; tk->new_ballot = ballot_max2(ballot, tk->new_ballot); tk->last_ack_ballot = ballot_max2(prev_ballot, tk->last_ack_ballot); tk->owner = new_owner; tk->proposal_acknowledges = from->bitmask; /* We stay in ST_INIT and wait for confirmation. */ goto ex; } if (ballot >= tk->last_ack_ballot && rv == RLT_PROBABLY_SUCCESS && tk->state == ST_INIT && tk->retry_number > 3) { /* Peer seems to know better than us, and there's no * convincing other report. Just take it. */ tk->state = ST_STABLE; log_debug("catchup: exceeded retries, peer has higher ballot."); goto accept; } if (ballot < tk->new_ballot || ballot < tk->last_ack_ballot) { /* Peer seems outdated ... tell it to reload? */ log_debug("catchup: peer outdated?"); #if 0 init_ticket_msg(&msg, CMD_DO_CATCHUP, RLT_SUCCESS, tk, &tk->current_state); #endif goto ex; } if (ballot >= tk->last_ack_ballot && local->type == SITE && new_owner == tk->owner) { /* We've got some information (local Pacemaker?), and a peer * says same owner, with same or higher ballot number. */ log_debug("catchup: peer agrees about owner."); goto ex; } log_debug("catchup: unhandled situation!"); ex: ticket_write(tk); if (tk->state == ST_STABLE) { /* If we believe to have enough information, we can try to * acquire the ticket (again). */ time(&tk->expires); } /* Allow further actions. */ ticket_activate_timeout(tk); return 0; } +#endif + + +int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd, cmd_result_t res) +{ + struct boothc_ticket_msg msg; + + init_ticket_msg(&msg, cmd, res, tk); + log_debug("broadcasting '%s' for ticket \"%s\"", + state_to_string(cmd), tk->name); + + return transport()->broadcast(&msg, sizeof(msg)); +} +#if 0 /** Send new state request to all sites. * Perhaps this should take a flag for ACCEPTOR etc.? * No need currently, as all nodes are more or less identical. */ int ticket_broadcast_proposed_state(struct ticket_config *tk, cmd_request_t state) { struct boothc_ticket_msg msg; - if (state != tk->state) { - tk->proposal_acknowledges = local->bitmask; - tk->retry_number = 0; - } - tk->state = state; init_ticket_msg(&msg, state, RLT_SUCCESS, tk); - msg.ticket.owner = htonl(get_node_id(tk->proposed_owner)); + msg.ticket.leader = htonl(get_node_id(tk->proposed_owner)); log_debug("broadcasting '%s' for ticket \"%s\"", state_to_string(state), tk->name); /* Switch state after one second, if the majority says ok. */ gettimeofday(&tk->proposal_switch, NULL); tk->proposal_switch.tv_sec++; return transport()->broadcast(&msg, sizeof(msg)); } +#endif static void ticket_cron(struct ticket_config *tk) { time_t now; now = time(NULL); /* Has an owner, has an expiry date, and expiry date in the past? * Losing the ticket must happen in _every_ state. */ - if (tk->expires && - tk->owner && - now > tk->expires) { + if (tk->term_expires && + tk->leader && + now > tk->term_expires) { log_info("LOST ticket: \"%s\" no longer at %s", tk->name, - ticket_owner_string(tk->owner)); + ticket_leader_string(tk)); /* Couldn't renew in time - ticket lost. */ - tk->owner = NULL; disown_ticket(tk); - /* This gets us into ST_INIT again; we couldn't - * talk to a majority of sites, so we don't know - * whether somebody else has the ticket now. - * Keep asking until we know. */ - abort_proposal(tk); + + /* New vote round; §5.2 */ + if (local->type == SITE) + new_election(tk, NULL); +/* should be "always" that way + else + tk->state = ST_FOLLOWER; + */ +// abort_proposal(tk); TODO ticket_write(tk); ticket_activate_timeout(tk); /* May not try to re-acquire now, need to find out * what others think. */ return; } switch(tk->state) { case ST_INIT: /* Unknown state, ask others. */ - ticket_send_catchup(tk); +// ticket_send_catchup(tk); return; - case OP_COMMITTED: - case ST_STABLE: - - /* No matter whether the ticket just got lost by someone, - * or whether is wasn't active anywhere - if automatic - * acquiration is configured, try to get it active. - * Condition: - * - no owner, - * - no active proposal, - * - acquire_after has passed, - * - could activate locally. - * Now the sites can try to trump each other. */ - if (!tk->owner && - !tk->proposed_owner && - !tk->proposer && - tk->expires && - tk->acquire_after && - tk->expires + tk->acquire_after >= now && - local->type == SITE) { - if (!get_ticket_locally_if_allowed(tk)) - log_info("ACQUIRE ticket \"%s\" after timeout; ac=%d", tk->name, tk->acquire_after); - break; - } - - - /* Are we the current owner, and do we need to refresh? - * This is not the same as above. */ - if (should_start_renewal(tk)) { - if (!get_ticket_locally_if_allowed(tk)) - log_info("RENEW ticket \"%s\"", tk->name); - - /* TODO: remember when we started, and restart afresh after some retries */ - } + case ST_FOLLOWER: - break; - - case OP_PREPARING: - PREPARE_to_PROPOSE(tk); - break; - - case OP_PROPOSING: - PROPOSE_to_COMMIT(tk); - break; + case ST_CANDIDATE: + /* §5.2 */ + if (now > tk->election_end) + new_election(tk, NULL); + return; - case OP_PROMISING: - case OP_ACCEPTING: - case OP_RECOVERY: - case OP_REJECTED: - break; + case ST_LEADER: + tk->term_expires = now + tk->term_duration; + ticket_broadcast(tk, OP_HEARTBEAT, RLT_SUCCESS); default: break; } } void process_tickets(void) { struct ticket_config *tk; int i; struct timeval now; float sec_until; gettimeofday(&now, NULL); foreach_ticket(i, tk) { sec_until = timeval_to_float(tk->next_cron) - timeval_to_float(now); if (0) log_debug("ticket %s next cron %" PRIx64 ".%03d, " "now %" PRIx64 "%03d, in %f", tk->name, (uint64_t)tk->next_cron.tv_sec, timeval_msec(tk->next_cron), (uint64_t)now.tv_sec, timeval_msec(now), sec_until); if (sec_until > 0.0) continue; log_debug("ticket cron: doing %s", tk->name); /* Set next value, handler may override. * This should already be handled via the state logic; * but to be on the safe side the renew repetition is * duplicated here, too. */ set_ticket_wakeup(tk); ticket_cron(tk); } } void tickets_log_info(void) { struct ticket_config *tk; int i; foreach_ticket(i, tk) { log_info("Ticket %s: state '%s' " - "mask %" PRIx64 "/%" PRIx64 " " - "ballot %d (current %d) " + "commit index %d " + "leader \"%s\" " "expires %-24.24s", tk->name, state_to_string(tk->state), - tk->proposal_acknowledges, - booth_conf->site_bits, - tk->last_ack_ballot, tk->new_ballot, - ctime(&tk->expires)); + tk->commit_index, + ticket_leader_string(tk), + ctime(&tk->term_expires)); } } /* UDP message receiver. */ int message_recv(struct boothc_ticket_msg *msg, int msglen) { - int cmd, rv; + int rv; uint32_t from; - struct booth_site *dest; + struct booth_site *source; struct ticket_config *tk; - struct booth_site *new_owner_p; - uint32_t ballot, new_owner; + struct booth_site *leader; + uint32_t leader_u; if (check_boothc_header(&msg->header, sizeof(*msg)) < 0 || msglen != sizeof(*msg)) { log_error("message receive error"); return -1; } from = ntohl(msg->header.from); - if (!find_site_by_id(from, &dest) || !dest) { + if (!find_site_by_id(from, &source) || !source) { log_error("unknown sender: %08x", from); return -1; } if (!check_ticket(msg->ticket.id, &tk)) { log_error("got invalid ticket name \"%s\" from %s", - msg->ticket.id, dest->addr_string); + msg->ticket.id, source->addr_string); return -EINVAL; } - cmd = ntohl(msg->header.cmd); - ballot = ntohl(msg->ticket.ballot); - - new_owner = ntohl(msg->ticket.owner); - if (!find_site_by_id(new_owner, &new_owner_p)) { - log_error("Message with unknown owner %x received", new_owner); + leader_u = ntohl(msg->ticket.leader); + if (!find_site_by_id(leader_u, &leader)) { + log_error("Message with unknown owner %x received", leader_u); return -EINVAL; } + rv = raft_answer(tk, source, leader, msg); +#if 0 + cmd = ntohl(msg->header.cmd); switch (cmd) { case CMD_CATCHUP: - return ticket_answer_catchup(tk, dest, msg, ballot, new_owner_p); + return ticket_answer_catchup(tk, source, msg, ballot, new_owner_p); case CMR_CATCHUP: - return ticket_process_catchup(tk, dest, msg, ballot, new_owner_p); + return ticket_process_catchup(tk, source, msg, ballot, new_owner_p); default: /* only used in catchup, and not even really there ?? */ assert(ntohl(msg->header.result) == 0); - - rv = paxos_answer(tk, dest, msg, ballot, new_owner_p); - assert((tk->proposal_acknowledges & ~booth_conf->site_bits) == 0); + rv = raft_answer(tk, source, msg); +// TODO assert((tk->proposal_acknowledges & ~booth_conf->site_bits) == 0); return rv; } - return 0; +#endif + return rv; } void set_ticket_wakeup(struct ticket_config *tk) { struct timeval tv, now; - if (tk->owner == local) { + /* At least every hour, perhaps sooner. */ + ticket_next_cron_in(tk, 3600); + + switch (tk->state) { + case ST_LEADER: + assert(tk->leader == local); gettimeofday(&now, NULL); tv = now; - tv.tv_sec = next_renewal_starts_at(tk); + tv.tv_sec = next_vote_starts_at(tk); /* If timestamp is in the past, look again in one second. */ if (timeval_compare(tv, now) <= 0) tv.tv_sec = now.tv_sec + 1; ticket_next_cron_at(tk, tv); - } else { + break; + + case ST_CANDIDATE: + assert(tk->election_end); + ticket_next_cron_at_coarse(tk, tk->election_end); + break; + + case ST_FOLLOWER: /* If there is (or should be) some owner, check on her later on. * If no one is interested - don't care. */ - if ((tk->owner || tk->acquire_after) && + if ((tk->leader || tk->acquire_after) && (local->type == SITE)) - ticket_next_cron_in(tk, tk->expiry + tk->acquire_after); - else - ticket_next_cron_in(tk, 3600); + ticket_next_cron_at_coarse(tk, + tk->term_expires + tk->acquire_after); + break; + + default: + log_error("why here?"); } } /* Given a state (in host byte order), return a human-readable (char*). * An array is used so that multiple states can be printed in a single printf(). */ char *state_to_string(uint32_t state_ho) { union mu { cmd_request_t s; char c[5]; }; static union mu cache[6] = { { 0 } }, *cur; static int current = 0; current ++; if (current >= sizeof(cache)/sizeof(cache[0])) current = 0; cur = cache + current; cur->s = htonl(state_ho); /* Shouldn't be necessary, union array is initialized with zeroes, and * these bytes never get written. */ cur->c[4] = 0; return cur->c; } + + +int send_reject(struct booth_site *dest, struct ticket_config *tk, cmd_result_t code) +{ + struct boothc_ticket_msg msg; + + + init_ticket_msg(&msg, OP_REJECTED, code, tk); + return booth_udp_send(dest, &msg, sizeof(msg)); +} diff --git a/src/ticket.h b/src/ticket.h index 7f535bd..e0af8b2 100644 --- a/src/ticket.h +++ b/src/ticket.h @@ -1,96 +1,104 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _TICKET_H #define _TICKET_H #include #include #include #include "config.h" #define DEFAULT_TICKET_EXPIRY 600 #define DEFAULT_TICKET_TIMEOUT 10 #define DEFAULT_RETRIES 10 #define foreach_ticket(i_,t_) for(i=0; (t_=booth_conf->ticket+i, iticket_count); i++) #define foreach_node(i_,n_) for(i=0; (n_=booth_conf->site+i, isite_count); i++) int check_ticket(char *ticket, struct ticket_config **tc); int check_site(char *site, int *local); int do_grant_ticket(struct ticket_config *ticket); int revoke_ticket(struct ticket_config *ticket); int list_ticket(char **pdata, unsigned int *len); int message_recv(struct boothc_ticket_msg *msg, int msglen); int setup_ticket(void); int check_max_len_valid(const char *s, int max); int do_grant_ticket(struct ticket_config *tk); int do_revoke_ticket(struct ticket_config *tk); int find_ticket_by_name(const char *ticket, struct ticket_config **found); void set_ticket_wakeup(struct ticket_config *tk); int get_ticket_locally_if_allowed(struct ticket_config *tk); int ticket_answer_list(int fd, struct boothc_ticket_msg *msg); int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg); int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg); int ticket_broadcast_proposed_state(struct ticket_config *tk, cmd_request_t state); int ticket_write(struct ticket_config *tk); void process_tickets(void); void tickets_log_info(void); char *state_to_string(uint32_t state_ho); +int send_reject(struct booth_site *dest, struct ticket_config *tk, cmd_result_t code); +int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd, cmd_result_t res); static inline void ticket_next_cron_at(struct ticket_config *tk, struct timeval when) { tk->next_cron = when; } +static inline void ticket_next_cron_at_coarse(struct ticket_config *tk, time_t when) +{ + tk->next_cron.tv_sec = when; + tk->next_cron.tv_usec = 0; +} + static inline void ticket_next_cron_in(struct ticket_config *tk, float seconds) { struct timeval tv; gettimeofday(&tv, NULL); tv.tv_sec += trunc(seconds); tv.tv_usec += (seconds - trunc(seconds)) * 1e6; ticket_next_cron_at(tk, tv); } static inline void ticket_activate_timeout(struct ticket_config *tk) { /* TODO: increase timeout when no answers */ ticket_next_cron_in(tk, tk->timeout); tk->retry_number ++; } #endif /* _TICKET_H */