diff --git a/src/booth.h b/src/booth.h index 95eec6f..336a8a5 100644 --- a/src/booth.h +++ b/src/booth.h @@ -1,244 +1,244 @@ /* * Copyright (C) 2011 Jiaju Zhang - * Copyright (C) 2013 Philipp Marek + * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _BOOTH_H #define _BOOTH_H #include #include #include #include #include #define BOOTH_RUN_DIR "/var/run/booth/" #define BOOTH_LOG_DIR "/var/log" #define BOOTH_LOGFILE_NAME "booth.log" #define BOOTH_DEFAULT_CONF_DIR "/etc/booth/" #define BOOTH_DEFAULT_CONF_NAME "booth" #define BOOTH_DEFAULT_CONF_EXT ".conf" #define BOOTH_DEFAULT_CONF \ BOOTH_DEFAULT_CONF_DIR BOOTH_DEFAULT_CONF_NAME BOOTH_DEFAULT_CONF_EXT #define DAEMON_NAME "boothd" #define BOOTH_PATH_LEN 127 #define BOOTH_DEFAULT_PORT 9929 /* TODO: remove */ #define BOOTH_PROTO_FAMILY AF_INET #define BOOTHC_MAGIC 0x5F1BA08C #define BOOTHC_VERSION 0x00010002 /** Timeout value for poll(). * Determines frequency of periodic jobs, eg. when send-retries are done. * See process_tickets(). */ #define POLL_TIMEOUT 1000 /** @{ */ /** The on-network data structures and constants. */ #define BOOTH_NAME_LEN 64 #define NO_OWNER (-1) typedef unsigned char boothc_site [BOOTH_NAME_LEN]; typedef unsigned char boothc_ticket[BOOTH_NAME_LEN]; struct boothc_header { /** BOOTHC_MAGIC */ uint32_t magic; /** BOOTHC_VERSION */ uint32_t version; /** Packet source; site_id. See add_site(). */ uint32_t from; /** Length including header */ uint32_t length; /** The command respectively protocol state. See cmd_request_t. */ uint32_t cmd; /** Result of operation. 0 == OK */ uint32_t result; char data[0]; } __attribute__((packed)); struct ticket_msg { /** Ticket name. */ boothc_ticket id; /** Owner. May be NO_OWNER. See add_site(). */ uint32_t owner; /** Current ballot number. Might be < prev_ballot if overflown. */ uint32_t ballot; /** Previous ballot. */ uint32_t prev_ballot; /* Would we want to say _whose_ proposal is more important * when sending OP_REJECTED ? */ /** Seconds until expiration. */ uint32_t expiry; } __attribute__((packed)); struct boothc_ticket_msg { struct boothc_header header; struct ticket_msg ticket; } __attribute__((packed)); /** State and message IDs. * * These numbers are unlikely to conflict with other enums. * All have to be swabbed to network order before sending. * * \dot * digraph states { * node [shape=box]; * ST_INIT [label="ST_INIT"]; * * subgraph messages { // messages * rank=same; * node [shape=point, rank=same]; * edge [style=tapered, penwidth=3, arrowtail=none, arrowhead=none, dir=forward]; * * ST_INIT:e -> ST_INITs [label="sends out CMD_CATCHUP"]; * } * * ST_INIT -> ST_STABLE [label="recv CMR_CATCHUP"]; * ST_STABLE; * * ST_STABLE -> OP_PROPOSING [label="booth call to assign ticket"]; * } * \enddot * * */ #define CHAR2CONST(a,b,c,d) ((a << 24) | (b << 16) | (c << 8) | d) #define STG2CONST(X) ({ const char _ggg[4] = X; return (uint32_t*)_ggg; }) typedef enum { /* 0x43 = "C"ommands */ CMD_LIST = CHAR2CONST('C', 'L', 's', 't'), CMD_GRANT = CHAR2CONST('C', 'G', 'n', 't'), CMD_REVOKE = CHAR2CONST('C', 'R', 'v', 'k'), CMD_CATCHUP = CHAR2CONST('C', 'C', 't', 'p'), /* Replies */ CMR_GENERAL = CHAR2CONST('G', 'n', 'l', 'R'), // Increase distance to CMR_GRANT CMR_LIST = CHAR2CONST('R', 'L', 's', 't'), CMR_GRANT = CHAR2CONST('R', 'G', 'n', 't'), CMR_REVOKE = CHAR2CONST('R', 'R', 'v', 'k'), CMR_CATCHUP = CHAR2CONST('R', 'C', 't', 'p'), /* Paxos */ OP_PREPARING = CHAR2CONST('P', 'r', 'e', 'p'), OP_PROMISING = CHAR2CONST('P', 'r', 'o', 'm'), OP_PROPOSING = CHAR2CONST('P', 'r', 'o', 'p'), OP_ACCEPTING = CHAR2CONST('A', 'c', 'p', 't'), OP_RECOVERY = CHAR2CONST('R', 'c', 'v', 'y'), OP_COMMITTED = CHAR2CONST('C', 'm', 'm', 't'), OP_REJECTED = CHAR2CONST('R', 'J', 'C', '!'), /* These are not used over the wire */ ST_INIT = CHAR2CONST('I', 'n', 'i', 't'), ST_STABLE = CHAR2CONST('S', 't', 'b', 'l'), } cmd_request_t; /* TODO: make readable constants */ typedef enum { /* for compatibility with other functions */ RLT_SUCCESS = 0, RLT_ASYNC = CHAR2CONST('A', 's', 'y', 'n'), RLT_SYNC_SUCC = CHAR2CONST('S', 'c', 'c', 's'), RLT_SYNC_FAIL = CHAR2CONST('F', 'a', 'i', 'l'), RLT_INVALID_ARG = CHAR2CONST('I', 'A', 'r', 'g'), RLT_OVERGRANT = CHAR2CONST('O', 'v', 'e', 'r'), RLT_PROBABLY_SUCCESS = CHAR2CONST('S', 'u', 'c', '?'), RLT_BUSY = CHAR2CONST('B', 'u', 's', 'y'), } cmd_result_t; /** @} */ /** @{ */ struct booth_site { /** Calculated ID. See add_site(). */ int site_id; int type; int local; /** Roles, like ACCEPTOR, PROPOSER, or LEARNER. Not really used ATM. */ int role; char addr_string[BOOTH_NAME_LEN]; int tcp_fd; int udp_fd; /* 0-based, used for indexing into per-ticket weights */ int index; uint64_t bitmask; unsigned short family; union { struct sockaddr_in sa4; struct sockaddr_in6 sa6; }; int saddrlen; int addrlen; } __attribute__((packed)); extern struct booth_site *local; /** @} */ struct booth_transport; struct client { int fd; const struct booth_transport *transport; void (*workfn)(int); void (*deadfn)(int); }; extern struct client *clients; extern struct pollfd *pollfds; int client_add(int fd, const struct booth_transport *tpt, void (*workfn)(int ci), void (*deadfn)(int ci)); int do_read(int fd, void *buf, size_t count); int do_write(int fd, void *buf, size_t count); void process_connection(int ci); void safe_copy(char *dest, char *value, size_t buflen, const char *description); #endif /* _BOOTH_H */ diff --git a/src/config.c b/src/config.c index 6412018..5a38532 100644 --- a/src/config.c +++ b/src/config.c @@ -1,589 +1,589 @@ /* * Copyright (C) 2011 Jiaju Zhang - * Copyright (C) 2013 Philipp Marek + * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include "booth.h" #include "config.h" #include "paxos.h" #include "ticket.h" #include "log.h" static int ticket_size = 0; static int ticket_realloc(void) { const int added = 5; int had, want; void *p; had = booth_conf->ticket_allocated; want = had + added; p = realloc(booth_conf->ticket, sizeof(struct ticket_config) * want); if (!booth_conf) { log_error("can't alloc more tickets"); return -ENOMEM; } booth_conf->ticket = p; memset(booth_conf->ticket + had, 0, sizeof(struct ticket_config) * added); booth_conf->ticket_allocated = want; return 0; } int add_site(char *address, int type); int add_site(char *addr_string, int type) { int rv; struct booth_site *site; uLong nid; uint32_t mask; rv = 1; if (booth_conf->site_count == MAX_NODES) { log_error("too many nodes"); goto out; } if (strlen(addr_string)+1 >= sizeof(booth_conf->site[0].addr_string)) { log_error("site address \"%s\" too long", addr_string); goto out; } site = booth_conf->site + booth_conf->site_count; site->family = BOOTH_PROTO_FAMILY; site->type = type; /* Make site_id start at a non-zero point. * Perhaps use hash over string or address? */ strcpy(site->addr_string, addr_string); nid = crc32(0L, NULL, 0); /* booth_config() uses memset(), so sizeof() is guaranteed to give * the same result everywhere - no uninitialized bytes. */ site->site_id = crc32(nid, site->addr_string, sizeof(site->addr_string)); /* Make sure we will never collide with NO_OWNER, * or be negative (to get "get_local_id() < 0" working). */ mask = 1 << (sizeof(site->site_id)*8 -1); assert(NO_OWNER & mask); site->site_id &= ~mask; site->index = booth_conf->site_count; site->bitmask = 1 << booth_conf->site_count; /* Catch site overflow */ assert(site->bitmask); booth_conf->site_bits |= site->bitmask; site->tcp_fd = -1; if (site->type == SITE) site->role = PROPOSER | ACCEPTOR | LEARNER; else if (site->type == ARBITRATOR) site->role = ACCEPTOR | LEARNER; booth_conf->site_count++; rv = 0; memset(&site->sa6, 0, sizeof(site->sa6)); if (inet_pton(AF_INET, site->addr_string, &site->sa4.sin_addr) > 0) { site->family = AF_INET; site->sa4.sin_family = site->family; site->sa4.sin_port = htons(booth_conf->port); site->saddrlen = sizeof(site->sa4); site->addrlen = sizeof(site->sa4.sin_addr); } else if (inet_pton(AF_INET6, site->addr_string, &site->sa6.sin6_addr) > 0) { site->family = AF_INET6; site->sa6.sin6_family = site->family; site->sa6.sin6_flowinfo = 0; site->sa6.sin6_port = htons(booth_conf->port); site->saddrlen = sizeof(site->sa6); site->addrlen = sizeof(site->sa6.sin6_addr); } else { log_error("Address string \"%s\" is bad", site->addr_string); rv = EINVAL; } out: return rv; } inline static char *skip_while_in(const char *cp, int (*fn)(int), const char *allowed) { /* strchr() returns a pointer to the terminator if *cp == 0. */ while (*cp && (fn(*cp) || strchr(allowed, *cp))) cp++; /* discard "const" qualifier */ return (char*)cp; } inline static char *skip_while(char *cp, int (*fn)(int)) { while (fn(*cp)) cp++; return cp; } inline static char *skip_until(char *cp, char expected) { while (*cp && *cp != expected) cp++; return cp; } static inline int is_end_of_line(char *cp) { char c = *cp; return c == '\n' || c == 0 || c == '#'; } static int add_ticket(const char *name, struct ticket_config **tkp, const struct ticket_config *def) { int rv; struct ticket_config *tk; if (booth_conf->ticket_count == booth_conf->ticket_allocated) { rv = ticket_realloc(); if (rv < 0) return rv; } tk = booth_conf->ticket + booth_conf->ticket_count; booth_conf->ticket_count++; if (!check_max_len_valid(name, sizeof(tk->name))) { log_error("ticket name \"%s\" too long.", name); return -EINVAL; } if (find_ticket_by_name(name, NULL)) { log_error("ticket name \"%s\" used again.", name); return -EINVAL; } if (* skip_while_in(name, isalnum, "-/")) { log_error("ticket name \"%s\" invalid; only alphanumeric names.", name); return -EINVAL; } strcpy(tk->name, name); tk->timeout = def->timeout; tk->expiry = def->expiry; memcpy(tk->weight, def->weight, sizeof(tk->weight)); tk->state = ST_INIT; if (tkp) *tkp = tk; return 0; } /* returns number of weights, or -1 on bad input. */ static int parse_weights(const char *input, int weights[MAX_NODES]) { int i, v; char *cp; for(i=0; iproto = UDP; booth_conf->port = BOOTH_DEFAULT_PORT; parse_weights("", defaults.weight); defaults.expiry = DEFAULT_TICKET_EXPIRY; defaults.timeout = DEFAULT_TICKET_TIMEOUT; error = ""; log_debug("reading config file %s", path); while (fgets(line, sizeof(line), fp)) { lineno++; s = skip_while(line, isspace); if (is_end_of_line(s)) continue; key = s; /* Key */ end_of_key = skip_while_in(key, isalnum, "-_"); if (end_of_key == key) { error = "No key"; goto err; } if (!*end_of_key) goto exp_equal; /* whitespace, and something else but nothing more? */ s = skip_while(end_of_key, isspace); if (*s != '=') { exp_equal: error = "Expected '=' after key"; goto err; } s++; /* It's my buffer, and I terminate if I want to. */ /* But not earlier than that, because we had to check for = */ *end_of_key = 0; /* Value tokenizing */ s = skip_while(s, isspace); switch (*s) { case '"': case '\'': val = s+1; s = skip_until(val, *s); /* Terminate value */ if (!*s) { error = "Unterminated quoted string"; goto err; } /* Remove and skip quote */ *s = 0; s++; if (* skip_while(s, isspace)) { error = "Surplus data after value"; goto err; } *s = 0; break; case 0: no_value: error = "No value"; goto err; break; default: val = s; /* Rest of line. */ i = strlen(s); /* i > 0 because of "case 0" above. */ while (i > 0 && isspace(s[i-1])) i--; s += i; *s = 0; } if (val == s) goto no_value; if (strlen(key) > BOOTH_NAME_LEN || strlen(val) > BOOTH_NAME_LEN) { error = "key/value too long"; goto err; } if (strcmp(key, "transport") == 0) { if (got_transport) { error = "config file has multiple transport lines"; goto err; } if (strcasecmp(val, "UDP") == 0) booth_conf->proto = UDP; else if (strcasecmp(val, "SCTP") == 0) booth_conf->proto = SCTP; else { error = "invalid transport protocol"; goto err; } got_transport = 1; } if (strcmp(key, "port") == 0) booth_conf->port = atoi(val); if (strcmp(key, "name") == 0) { if(strlen(val)+1 >= BOOTH_NAME_LEN) { error = "Config name too long."; goto err; } } if (strcmp(key, "site") == 0) { if (add_site(val, SITE)) goto out; } if (strcmp(key, "arbitrator") == 0) { if (add_site(val, ARBITRATOR)) goto out; } if (strcmp(key, "ticket") == 0) { if (add_ticket(val, &last_ticket, &defaults)) goto out; /* last_ticket is valid until another one is needed - * and then it already has the new address and * is valid again. */ } if (strcmp(key, "expire") == 0) { defaults.expiry = strtol(val, &s, 0); if (*s || s == val || defaults.expiry<10) { error = "Expected plain integer value >=10 for expire"; goto err; } if (last_ticket) last_ticket->expiry = defaults.expiry; } if (strcmp(key, "timeout") == 0) { defaults.timeout = strtol(val, &s, 0); if (*s || s == val || defaults.timeout<1) { error = "Expected plain integer value >=1 for timeout"; goto err; } if (last_ticket) last_ticket->timeout = defaults.timeout; } if (strcmp(key, "acquire-after") == 0) { defaults.acquire_after = strtol(val, &s, 0); if (*s || s == val || defaults.acquire_after<1) { error = "Expected plain integer value >=1 for acquire-after"; goto err; } if (last_ticket) last_ticket->acquire_after = defaults.acquire_after; } if (strcmp(key, "weights") == 0) { if (parse_weights(val, defaults.weight) < 0) goto out; if (last_ticket) memcpy(last_ticket->weight, defaults.weight, sizeof(last_ticket->weight)); } } if ((booth_conf->site_count % 2) == 0) { log_warn("An odd number of nodes is strongly recommended!"); } /* Default: make config name match config filename. */ if (!booth_conf->name[0]) { cp = strrchr(path, '/'); if (!cp) cp = path; /* TODO: locale? */ /* NUL-termination by memset. */ for(i=0; iname[i] = *(cp++); /* Last resort. */ if (!booth_conf->name[0]) strcpy(booth_conf->name, "booth"); } return 0; err: out: log_error("%s in config file line %d", error, lineno); free(booth_conf); booth_conf = NULL; return -1; } int check_config(int type) { if (!booth_conf) return -1; return 0; } int find_site_by_name(unsigned char *site, struct booth_site **node, int any_type) { struct booth_site *n; int i; if (!booth_conf) return 0; for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if ((n->type == SITE || any_type) && strcmp(n->addr_string, site) == 0) { *node = n; return 1; } } return 0; } int find_site_by_id(uint32_t site_id, struct booth_site **node) { struct booth_site *n; int i; if (site_id == NO_OWNER) { *node = NULL; return 1; } if (!booth_conf) return 0; for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if (n->site_id == site_id) { *node = n; return 1; } } return 0; } const char *type_to_string(int type) { switch (type) { case ARBITRATOR: return "arbitrator"; case SITE: return "site"; case CLIENT: return "client"; } return "??invalid-type??"; } diff --git a/src/config.h b/src/config.h index 5db437c..3fa4b3c 100644 --- a/src/config.h +++ b/src/config.h @@ -1,141 +1,141 @@ /* * Copyright (C) 2011 Jiaju Zhang - * Copyright (C) 2013 Philipp Marek + * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _CONFIG_H #define _CONFIG_H #include #include "booth.h" #include "transport.h" /** @{ */ /** Definitions for in-RAM data. */ #define MAX_NODES 16 #define TICKET_ALLOC 16 #define RETRIES 10 struct ticket_config { /** \name Configuration items. * @{ */ /** Name of ticket. */ boothc_ticket name; /** How many seconds until expiration. */ int expiry; /** Network related timeouts. */ int timeout; /** If >0, time to wait for a site to get fenced. * The ticket may be acquired after that timespan by * another site. */ int acquire_after; /** Node weights. */ int weight[MAX_NODES]; /** @} */ /** \name Runtime values. * @{ */ /** Current state. */ cmd_request_t state; /** When something has to be done */ time_t next_cron; /** Current owner of ticket. */ struct booth_site *owner; /** Timestamp of expiration. */ time_t expires; /** Last ballot number that was agreed on. */ uint32_t last_ack_ballot; /** @} */ /** \name Needed while proposals are being done. * @{ */ /** Who tries to change the current status. */ struct booth_site *proposer; /** Current owner of ticket. */ struct booth_site *proposed_owner; /** New/current ballot number. * Might be < prev_ballot if overflown. * This only every goes "up" (logically). */ uint32_t new_ballot; /** Bitmap of sites that acknowledge that state. */ uint64_t proposal_acknowledges; /** Timestamp of proposal expiration. */ time_t proposal_expires; /** Number of send retries left. * Used on the new owner. * Starts at 0, counts up. */ int retry_number; /** @} */ }; struct booth_config { char name[BOOTH_NAME_LEN]; transport_layer_t proto; uint16_t port; /** Stores the OR of the individual host bitmasks. */ uint64_t site_bits; int site_count; struct booth_site site[MAX_NODES]; int ticket_count; int ticket_allocated; struct ticket_config *ticket; }; extern struct booth_config *booth_conf; int read_config(const char *path); int check_config(int type); int find_site_by_name(unsigned char *site, struct booth_site **node, int any_type); int find_site_by_id(uint32_t site_id, struct booth_site **node); const char *type_to_string(int type); #define STATE_STRING(s_) ({ union { cmd_request_t s; char c[5]; } d; d.s = htonl(s_); d.c[4] = 0; d.c; }) #endif /* _CONFIG_H */ diff --git a/src/inline-fn.h b/src/inline-fn.h index 7770f1b..93db877 100644 --- a/src/inline-fn.h +++ b/src/inline-fn.h @@ -1,187 +1,187 @@ /* - * Copyright (C) 2013 Philipp Marek + * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _INLINE_FN_H #define _INLINE_FN_H #include #include #include #include "config.h" #include "transport.h" inline static uint32_t get_local_id(void) { return local ? local->site_id : -1; } inline static uint32_t get_node_id(struct booth_site *node) { return node ? node->site_id : NO_OWNER; } inline static int ticket_valid_for(const struct ticket_config *tk) { int left; left = tk->expires - time(NULL); return (left < 0) ? 0 : left; } /** Returns number of seconds left, if any. */ inline static int owner_and_valid(const struct ticket_config *tk) { if (tk->owner != local) return 0; return ticket_valid_for(tk); } static inline void init_header_bare(struct boothc_header *h) { h->magic = htonl(BOOTHC_MAGIC); h->version = htonl(BOOTHC_VERSION); h->from = htonl(local->site_id); } static inline void init_header(struct boothc_header *h, int cmd, int result, int data_len) { init_header_bare(h); h->length = htonl(data_len); h->cmd = htonl(cmd); h->result = htonl(result); } static inline void init_ticket_site_header(struct boothc_ticket_msg *msg, int cmd) { init_header(&msg->header, cmd, 0, sizeof(*msg)); } static inline void init_ticket_msg(struct boothc_ticket_msg *msg, int cmd, int rv, struct ticket_config *tk) { assert(sizeof(msg->ticket.id) == sizeof(tk->name)); init_header(&msg->header, cmd, rv, sizeof(*msg)); if (!tk) { memset(&msg->ticket, 0, sizeof(msg->ticket)); } else { memcpy(msg->ticket.id, tk->name, sizeof(msg->ticket.id)); msg->ticket.expiry = htonl(ticket_valid_for(tk)); msg->ticket.owner = htonl(get_node_id(tk->owner)); msg->ticket.ballot = htonl(tk->new_ballot); msg->ticket.prev_ballot = htonl(tk->last_ack_ballot); } } static inline struct booth_transport const *transport(void) { return booth_transport + booth_conf->proto; } static inline const char *ticket_owner_string(struct booth_site *site) { return site ? site->addr_string : "NONE"; } static inline void disown_ticket(struct ticket_config *tk) { tk->owner = NULL; tk->proposed_owner = NULL; tk->expires = 0; } static inline void disown_if_expired(struct ticket_config *tk) { if (time(NULL) >= tk->expires || !tk->proposed_owner) disown_ticket(tk); } static inline int promote_ticket_state(struct ticket_config *tk) { /* Use ">" to get majority decision, even for an even number * of participants. */ return __builtin_popcount(tk->proposal_acknowledges) * 2 > booth_conf->site_count; } /* We allow half of the uint32_t to be used; * half of that below, half of that above the current known "good" value. * 0 UINT32_MAX * |--------------------------+----------------+------------| * | | | * |--------+-------| allowed range * | * current ballot * * So, on overflow it looks like that: * UINT32_MAX 0 * |--------------------------+-----------||---+------------| * | | | * |--------+-------| allowed range * | * current ballot * * This should be possible by using the same datatype and relying * on the under/overflow semantics. */ static inline int ballot_is_higher_than(uint32_t b_high, uint32_t b_low) { uint32_t diff; if (b_high == b_low) return 0; diff = b_high - b_low; if (diff < UINT32_MAX/4) return 1; diff = b_low - b_high; if (diff < UINT32_MAX/4) return 0; assert(!"ballot out of range - invalid"); } static inline uint32_t ballot_max2(uint32_t a, uint32_t b) { return ballot_is_higher_than(a, b) ? a : b; } static inline uint32_t ballot_max3(uint32_t a, uint32_t b, uint32_t c) { return ballot_max2( ballot_max2(a, b), c); } #endif diff --git a/src/main.c b/src/main.c index cc8ba2d..ec700ff 100644 --- a/src/main.c +++ b/src/main.c @@ -1,1268 +1,1268 @@ /* * Copyright (C) 2011 Jiaju Zhang - * Copyright (C) 2013 Philipp Marek + * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "log.h" #include "booth.h" #include "config.h" #include "transport.h" #include "inline-fn.h" #include "pacemaker.h" #include "ticket.h" #define RELEASE_VERSION "1.0" #define CLIENT_NALLOC 32 int daemonize = 0; /** Structure for "clients". * Filehandles with incoming data get registered here (and in pollfds), * along with their callbacks. * Because these can be reallocated with every new fd, addressing * happens _only_ by their numeric index. */ struct client *clients = NULL; struct pollfd *pollfds = NULL; static int client_maxi; static int client_size = 0; typedef enum { BOOTHD_STARTED=0, BOOTHD_STARTING } BOOTH_DAEMON_STATE; int poll_timeout = POLL_TIMEOUT; typedef enum { OP_LIST = 1, OP_GRANT, OP_REVOKE, } operation_t; struct command_line { int type; /* ACT_ */ int op; /* OP_ */ char configfile[BOOTH_PATH_LEN]; char lockfile[BOOTH_PATH_LEN]; char site[BOOTH_NAME_LEN]; struct boothc_ticket_msg msg; }; struct booth_config *booth_conf; static struct command_line cl; int do_read(int fd, void *buf, size_t count) { int rv, off = 0; while (off < count) { rv = read(fd, (char *)buf + off, count - off); if (rv == 0) return -1; if (rv == -1 && errno == EINTR) continue; if (rv == -1) return -1; off += rv; } return 0; } int do_write(int fd, void *buf, size_t count) { int rv, off = 0; retry: rv = write(fd, (char *)buf + off, count); if (rv == -1 && errno == EINTR) goto retry; /* If we cannot write _any_ data, we'd be in an (potential) loop. */ if (rv <= 0) { log_error("write failed: %s (%d)", strerror(errno), errno); return rv; } if (rv != count) { count -= rv; off += rv; goto retry; } return 0; } static void client_alloc(void) { int i; if (!clients) { clients = malloc(CLIENT_NALLOC * sizeof(struct client)); pollfds = malloc(CLIENT_NALLOC * sizeof(struct pollfd)); } else { clients = realloc(clients, (client_size + CLIENT_NALLOC) * sizeof(struct client)); pollfds = realloc(pollfds, (client_size + CLIENT_NALLOC) * sizeof(struct pollfd)); } if (!clients || !pollfds) { log_error("can't alloc for client array"); exit(1); } for (i = client_size; i < client_size + CLIENT_NALLOC; i++) { clients[i].workfn = NULL; clients[i].deadfn = NULL; clients[i].fd = -1; pollfds[i].fd = -1; pollfds[i].revents = 0; } client_size += CLIENT_NALLOC; } static void client_dead(int ci) { if (clients[ci].fd != -1) close(clients[ci].fd); clients[ci].fd = -1; clients[ci].workfn = NULL; pollfds[ci].fd = -1; } int client_add(int fd, const struct booth_transport *tpt, void (*workfn)(int ci), void (*deadfn)(int ci)) { int i; struct client *c; if (client_size + 2 >= client_maxi ) { client_alloc(); } for (i = 0; i < client_size; i++) { c = clients + i; if (c->fd != -1) continue; c->workfn = workfn; if (deadfn) c->deadfn = deadfn; else c->deadfn = client_dead; c->transport = tpt; c->fd = fd; pollfds[i].fd = fd; pollfds[i].events = POLLIN; if (i > client_maxi) client_maxi = i; return i; } assert(!"no client"); } /* Only used for client requests, TCP ???*/ void process_connection(int ci) { struct boothc_ticket_msg msg; int rv, len, exp, fd; void (*deadfn) (int ci); fd = clients[ci].fd; rv = do_read(fd, &msg.header, sizeof(msg.header)); if (rv < 0) { if (errno == ECONNRESET) log_debug("client %d connection reset for fd %d", ci, clients[ci].fd); goto kill; } if (check_boothc_header(&msg.header, -1) < 0) goto kill; /* Basic sanity checks already done. */ len = ntohl(msg.header.length); if (len) { if (len != sizeof(msg)) { bad_len: log_error("got wrong length %u", len); return; } exp = len - sizeof(msg.header); rv = do_read(clients[ci].fd, msg.header.data, exp); if (rv < 0) { log_error("connection %d read data error %d, wanted %d", ci, rv, exp); goto kill; } } switch (ntohl(msg.header.cmd)) { case CMD_LIST: ticket_answer_list(fd, &msg); goto kill; case CMD_GRANT: /* Expect boothc_ticket_site_msg. */ if (len != sizeof(msg)) goto bad_len; ticket_answer_grant(fd, &msg); goto kill; case CMD_REVOKE: /* Expect boothc_ticket_site_msg. */ if (len != sizeof(msg)) goto bad_len; ticket_answer_revoke(fd, &msg); goto kill; default: log_error("connection %d cmd %x unknown", ci, ntohl(msg.header.cmd)); init_header(&msg.header,CMR_GENERAL, RLT_INVALID_ARG, sizeof(msg.header)); send_header_only(fd, &msg.header); goto kill; } assert(0); return; kill: deadfn = clients[ci].deadfn; if(deadfn) { deadfn(ci); } return; } /** Callback function for the listening TCP socket. */ static void process_listener(int ci) { int fd, i; fd = accept(clients[ci].fd, NULL, NULL); if (fd < 0) { log_error("process_listener: accept error for fd %d: %s (%d)", clients[ci].fd, strerror(errno), errno); if (clients[ci].deadfn) clients[ci].deadfn(ci); return; } i = client_add(fd, clients[ci].transport, process_connection, NULL); log_debug("add client connection %d fd %d", i, fd); } static int setup_config(int type) { int rv; rv = read_config(cl.configfile); if (rv < 0) goto out; /* Set "local" pointer, ignoring errors. */ if (cl.type == DAEMON && cl.site[0]) { if (!find_site_by_name(cl.site, &local, 1)) { log_error("Cannot find \"%s\" in the configuration.", cl.site); return -EINVAL; } local->local = 1; } else find_myself(NULL, type == CLIENT); rv = check_config(type); if (rv < 0) goto out; /* Per default the PID file name is derived from the * configuration name. */ if (!cl.lockfile[0]) { snprintf(cl.lockfile, sizeof(cl.lockfile)-1, "%s/%s.pid", BOOTH_RUN_DIR, booth_conf->name); } out: return rv; } static int setup_transport(void) { int rv; rv = transport()->init(message_recv); if (rv < 0) { log_error("failed to init booth_transport %s", transport()->name); goto out; } rv = booth_transport[TCP].init(NULL); if (rv < 0) { log_error("failed to init booth_transport[TCP]"); goto out; } out: return rv; } static int write_daemon_state(int fd, int state) { char buffer[1024]; int rv, size; size = sizeof(buffer) - 1; rv = snprintf(buffer, size, "booth_pid=%d " "booth_state=%s " "booth_type=%s " "booth_cfg_name='%s' " "booth_addr_string='%s' " "booth_port=%d\n", getpid(), ( state == BOOTHD_STARTED ? "started" : state == BOOTHD_STARTING ? "starting" : "invalid"), type_to_string(local->type), booth_conf->name, local->addr_string, booth_conf->port); if (rv < 0 || rv == size) { log_error("Buffer filled up in write_daemon_state()."); return -1; } size = rv; rv = ftruncate(fd, 0); if (rv < 0) { log_error("lockfile %s truncate error %d: %s", cl.lockfile, errno, strerror(errno)); return rv; } rv = lseek(fd, 0, SEEK_SET); if (rv < 0) { log_error("lseek set fd(%d) offset to 0 error, return(%d), message(%s)", fd, rv, strerror(errno)); rv = -1; return rv; } rv = write(fd, buffer, size); if (rv != size) { log_error("write to fd(%d, %d) returned %d, errno %d, message(%s)", fd, size, rv, errno, strerror(errno)); return -1; } return 0; } static int loop(int fd) { void (*workfn) (int ci); void (*deadfn) (int ci); int rv, i; rv = setup_transport(); if (rv < 0) goto fail; rv = setup_ticket(); if (rv < 0) goto fail; client_add(local->tcp_fd, booth_transport + TCP, process_listener, NULL); rv = write_daemon_state(fd, BOOTHD_STARTED); if (rv != 0) { log_error("write daemon state %d to lockfile error %s: %s", BOOTHD_STARTED, cl.lockfile, strerror(errno)); goto fail; } if (cl.type == ARBITRATOR) log_info("BOOTH arbitrator daemon started"); else if (cl.type == SITE) log_info("BOOTH cluster site daemon started"); while (1) { rv = poll(pollfds, client_maxi + 1, poll_timeout); if (rv == -1 && errno == EINTR) continue; if (rv < 0) { log_error("poll failed: %s (%d)", strerror(errno), errno); goto fail; } for (i = 0; i <= client_maxi; i++) { if (clients[i].fd < 0) continue; if (pollfds[i].revents & POLLIN) { workfn = clients[i].workfn; if (workfn) workfn(i); } if (pollfds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) { deadfn = clients[i].deadfn; if (deadfn) deadfn(i); } } process_tickets(); } return 0; fail: return -1; } static int query_get_string_answer(cmd_request_t cmd) { struct booth_site *site; struct boothc_header reply; char *data; int data_len; int rv; struct booth_transport const *tpt; data = NULL; init_header(&cl.msg.header, cmd, 0, sizeof(cl.msg)); if (!*cl.site) site = local; else if (!find_site_by_name(cl.site, &site, 1)) { log_error("cannot find site \"%s\"", cl.site); rv = ENOENT; goto out; } tpt = booth_transport + TCP; rv = tpt->open(site); if (rv < 0) goto out_free; rv = tpt->send(site, &cl.msg, sizeof(cl.msg)); if (rv < 0) goto out_free; rv = tpt->recv(site, &reply, sizeof(reply)); if (rv < 0) goto out_free; data_len = ntohl(reply.length) - sizeof(reply); data = malloc(data_len); if (!data) { rv = -ENOMEM; goto out_free; } rv = tpt->recv(site, data, data_len); if (rv < 0) goto out_free; do_write(STDOUT_FILENO, data, data_len); rv = 0; out_free: free(data); tpt->close(site); out: return rv; } static int do_command(cmd_request_t cmd) { struct booth_site *site; struct boothc_header reply; struct booth_transport const *tpt; int rv; rv = 0; site = NULL; if (!*cl.site) site = local; else { if (!find_site_by_name(cl.site, &site, 1)) { log_error("Site \"%s\" not configured.", cl.site); goto out_close; } if (site->type == ARBITRATOR) { log_error("Site \"%s\" is an arbitrator, cannot grant ticket there.", cl.site); goto out_close; } assert(site->type == SITE); } /* We don't check for existence of ticket, so that asking can be * done without local configuration, too. * Although, that means that the UDP port has to be specified, too. */ if (!cl.msg.ticket.id[0]) { /* If the loaded configuration has only a single ticket defined, use that. */ if (booth_conf->ticket_count == 1) { strcpy(cl.msg.ticket.id, booth_conf->ticket[0].name); } else { log_error("No ticket given."); goto out_close; } } init_header(&cl.msg.header, cmd, 0, sizeof(cl.msg)); /* Always use TCP for client - at least for now. */ tpt = booth_transport + TCP; rv = tpt->open(site); if (rv < 0) goto out_close; rv = tpt->send(site, &cl.msg, sizeof(cl.msg)); if (rv < 0) goto out_close; rv = tpt->recv(site, &reply, sizeof(reply)); if (rv < 0) goto out_close; if (reply.result == RLT_INVALID_ARG) { log_info("invalid argument!"); rv = -1; goto out_close; } if (reply.result == RLT_OVERGRANT) { log_info("You're granting a granted ticket " "If you wanted to migrate a ticket," "use revoke first, then use grant"); rv = -1; goto out_close; } rv = ntohl(reply.result); switch (rv) { case RLT_ASYNC: if (cmd == CMD_GRANT) log_info("grant command sent, result will be returned " "asynchronously, you can get the result from " "the log files"); else if (cmd == CMD_REVOKE) log_info("revoke command sent, result will be returned " "asynchronously, you can get the result from " "the log files."); else log_error("internal error reading reply result!"); rv = 0; break; case RLT_SYNC_SUCC: case RLT_SUCCESS: if (cmd == CMD_GRANT) log_info("grant succeeded!"); else if (cmd == CMD_REVOKE) log_info("revoke succeeded!"); rv = 0; break; case RLT_SYNC_FAIL: if (cmd == CMD_GRANT) log_info("grant failed!"); else if (cmd == CMD_REVOKE) log_info("revoke failed!"); rv = -1; break; case RLT_INVALID_ARG: log_error("\"Invalid argument\", most probably ticket name \"%s\" wrong.", cl.msg.ticket.id); break; default: log_error("got an error code: %x", rv); rv = -1; } out_close: if (site) local_transport->close(site); return rv; } static int do_grant(void) { return do_command(CMD_GRANT); } static int do_revoke(void) { return do_command(CMD_REVOKE); } static int _lockfile(int mode, int *fdp, pid_t *locked_by) { struct flock lock; int fd, rv; /* After reboot the directory may not yet exist. * Try to create it, but ignore errors. */ if (strncmp(cl.lockfile, BOOTH_RUN_DIR, strlen(BOOTH_RUN_DIR)) == 0) mkdir(BOOTH_RUN_DIR, 0775); if (locked_by) *locked_by = 0; *fdp = -1; fd = open(cl.lockfile, mode, 0664); if (fd < 0) return errno; *fdp = fd; lock.l_type = F_WRLCK; lock.l_start = 0; lock.l_whence = SEEK_SET; lock.l_len = 0; lock.l_pid = 0; if (fcntl(fd, F_SETLK, &lock) == 0) return 0; rv = errno; if (locked_by) if (fcntl(fd, F_GETLK, &lock) == 0) *locked_by = lock.l_pid; return rv; } static int lockfile(void) { int rv, fd; fd = -1; rv = _lockfile(O_CREAT | O_WRONLY, &fd, NULL); if (fd == -1) { log_error("lockfile %s open error %d: %s", cl.lockfile, rv, strerror(rv)); return -1; } if (rv < 0) { log_error("lockfile %s setlk error %d: %s", cl.lockfile, rv, strerror(rv)); goto fail; } rv = write_daemon_state(fd, BOOTHD_STARTING); if (rv != 0) { log_error("write daemon state %d to lockfile error %s: %s", BOOTHD_STARTING, cl.lockfile, strerror(errno)); goto fail; } return fd; fail: close(fd); return -1; } static void unlink_lockfile(int fd) { unlink(cl.lockfile); close(fd); } static void print_usage(void) { printf("Usage:\n"); printf("booth [options]\n"); printf("\n"); printf("Types:\n"); printf(" arbitrator: daemon running on arbitrator\n"); printf(" site: daemon running on cluster site\n"); printf(" client: command running from client\n"); printf("\n"); printf("Operations:\n"); printf("Please note that operations are valid iff type is client!\n"); printf(" list: List all the tickets\n"); printf(" grant: Grant ticket T(-t T) to site S(-s S)\n"); printf(" revoke: Revoke ticket T(-t T) from site S(-s S)\n"); printf("\n"); printf("Options:\n"); printf(" -c FILE Specify config file [default " BOOTH_DEFAULT_CONF "]\n"); printf(" -l LOCKFILE Specify lock file path\n"); printf(" -D Enable debugging to stderr and don't fork\n"); printf(" -t ticket name\n"); printf(" -S report local daemon status (for site and arbitrator)\n"); printf(" RA script compliant return codes.\n"); printf(" -s site name\n"); printf(" -h Print this help, then exit\n"); } #define OPTION_STRING "c:Dl:t:s:hS" void safe_copy(char *dest, char *value, size_t buflen, const char *description) { int content_len = buflen - 1; if (strlen(value) >= content_len) { fprintf(stderr, "'%s' exceeds maximum %s length of %d\n", value, description, content_len); exit(EXIT_FAILURE); } strncpy(dest, value, content_len); dest[content_len] = 0; } static int host_convert(char *hostname, char *ip_str, size_t ip_size) { struct addrinfo *result = NULL, hints = {0}; int re = -1; memset(&hints, 0, sizeof(hints)); hints.ai_family = BOOTH_PROTO_FAMILY; hints.ai_socktype = SOCK_DGRAM; re = getaddrinfo(hostname, NULL, &hints, &result); if (re == 0) { struct in_addr addr = ((struct sockaddr_in *)result->ai_addr)->sin_addr; const char *re_ntop = inet_ntop(BOOTH_PROTO_FAMILY, &addr, ip_str, ip_size); if (re_ntop == NULL) { re = -1; } } freeaddrinfo(result); return re; } static int read_arguments(int argc, char **argv) { int optchar; char *arg1 = argv[1]; char *op = NULL; char *cp; char site_arg[INET_ADDRSTRLEN] = {0}; int left; if (argc < 2 || !strcmp(arg1, "help") || !strcmp(arg1, "--help") || !strcmp(arg1, "-h")) { print_usage(); exit(EXIT_SUCCESS); } if (!strcmp(arg1, "version") || !strcmp(arg1, "--version") || !strcmp(arg1, "-V")) { printf("%s %s (built %s %s)\n", argv[0], RELEASE_VERSION, __DATE__, __TIME__); exit(EXIT_SUCCESS); } if (strcmp(arg1, "arbitrator") == 0 || strcmp(arg1, "site") == 0 || strcmp(arg1, "start") == 0 || strcmp(arg1, "daemon") == 0) { cl.type = DAEMON; optind = 2; } else if (strcmp(arg1, "status") == 0) { cl.type = STATUS; optind = 2; } else if (strcmp(arg1, "client") == 0) { cl.type = CLIENT; if (argc < 3) { print_usage(); exit(EXIT_FAILURE); } op = argv[2]; optind = 3; } else { cl.type = CLIENT; op = argv[1]; optind = 2; } if (cl.type == CLIENT) { if (!strcmp(op, "list")) cl.op = OP_LIST; else if (!strcmp(op, "grant")) cl.op = OP_GRANT; else if (!strcmp(op, "revoke")) cl.op = OP_REVOKE; else { fprintf(stderr, "client operation \"%s\" is unknown\n", op); exit(EXIT_FAILURE); } } while (optind < argc) { optchar = getopt(argc, argv, OPTION_STRING); switch (optchar) { case 'c': if (strchr(optarg, '/')) { safe_copy(cl.configfile, optarg, sizeof(cl.configfile), "config file"); } else { /* If no "/" in there, use with default directory. */ strcpy(cl.configfile, BOOTH_DEFAULT_CONF_DIR); cp = cl.configfile + strlen(BOOTH_DEFAULT_CONF_DIR); assert(cp > cl.configfile); assert(*(cp-1) == '/'); /* Write after the "/" */ safe_copy(cp + 1, optarg, (sizeof(cl.configfile) - (cp - cl.configfile) - strlen(BOOTH_DEFAULT_CONF_EXT)), "config name"); /* If no extension, append ".conf". * Space is available, see -strlen() above. */ if (!strchr(cp, '.')) strcat(cp, BOOTH_DEFAULT_CONF_EXT); } break; case 'D': daemonize = 1; debug_level++; break; case 'l': safe_copy(cl.lockfile, optarg, sizeof(cl.lockfile), "lock file"); break; case 't': if (cl.op == OP_GRANT || cl.op == OP_REVOKE) { safe_copy(cl.msg.ticket.id, optarg, sizeof(cl.msg.ticket.id), "ticket name"); } else { print_usage(); exit(EXIT_FAILURE); } break; case 's': /* For testing and debugging: allow "-s site" also for * daemon start, so that the address that should be used * can be set manually. * This makes it easier to start multiple processes * on one machine. */ if (cl.type == CLIENT || (cl.type == DAEMON && debug_level)) { int re = host_convert(optarg, site_arg, INET_ADDRSTRLEN); if (re == 0) { safe_copy(cl.site, site_arg, sizeof(cl.site), "site name"); } else { safe_copy(cl.site, optarg, sizeof(cl.site), "site name"); } } else { log_error("\"-s\" not allowed in daemon mode."); exit(EXIT_FAILURE); } break; case 'h': print_usage(); exit(EXIT_SUCCESS); break; case ':': case '?': fprintf(stderr, "Please use '-h' for usage.\n"); exit(EXIT_FAILURE); break; case -1: /* No more parameters on cmdline, only arguments. */ goto extra_args; default: goto unknown; }; } return 0; extra_args: if (cl.type == CLIENT && !cl.msg.ticket.id[0]) { /* Use additional argument as ticket name. */ safe_copy(cl.msg.ticket.id, argv[optind], sizeof(cl.msg.ticket.id), "ticket name"); optind++; } if (optind == argc) return 0; left = argc - optind; fprintf(stderr, "Superfluous argument%s: %s%s\n", left == 1 ? "" : "s", argv[optind], left == 1 ? "" : "..."); exit(EXIT_FAILURE); unknown: fprintf(stderr, "unknown option: %s\n", argv[optind]); exit(EXIT_FAILURE); } static void set_scheduler(void) { struct sched_param sched_param; struct rlimit rlimit; int rv; rlimit.rlim_cur = RLIM_INFINITY; rlimit.rlim_max = RLIM_INFINITY; setrlimit(RLIMIT_MEMLOCK, &rlimit); rv = mlockall(MCL_CURRENT | MCL_FUTURE); if (rv < 0) { log_error("mlockall failed"); } rv = sched_get_priority_max(SCHED_RR); if (rv != -1) { sched_param.sched_priority = rv; rv = sched_setscheduler(0, SCHED_RR, &sched_param); if (rv == -1) log_error("could not set SCHED_RR priority %d: %s (%d)", sched_param.sched_priority, strerror(errno), errno); } else { log_error("could not get maximum scheduler priority err %d", errno); } } static void set_oom_adj(int val) { FILE *fp; fp = fopen("/proc/self/oom_adj", "w"); if (!fp) return; fprintf(fp, "%i", val); fclose(fp); } static int do_status(int type) { pid_t pid; int rv, lock_fd, ret; const char *reason = NULL; char lockfile_data[1024], *cp; ret = PCMK_OCF_NOT_RUNNING; /* TODO: query all, and return quit only if it's _cleanly_ not * running, ie. _neither_ of port/lockfile/process is available? * * Currently a single failure says "not running", even if "only" the * lockfile has been removed. */ rv = setup_config(type); if (rv) { reason = "Error reading configuration."; ret = PCMK_OCF_UNKNOWN_ERROR; goto quit; } if (!local) { reason = "No Service IP active here."; goto quit; } rv = _lockfile(O_RDWR, &lock_fd, &pid); if (rv == 0) { reason = "PID file not locked."; goto quit; } if (lock_fd == -1) { reason = "No PID file."; goto quit; } if (pid) { fprintf(stdout, "booth_lockpid=%d ", pid); fflush(stdout); } rv = read(lock_fd, lockfile_data, sizeof(lockfile_data) - 1); if (rv < 4) { reason = "Cannot read lockfile data."; ret = PCMK_LSB_UNKNOWN_ERROR; goto quit; } lockfile_data[rv] = 0; if (lock_fd != -1) close(lock_fd); /* Make sure it's only a single line */ cp = strchr(lockfile_data, '\r'); if (cp) *cp = 0; cp = strchr(lockfile_data, '\n'); if (cp) *cp = 0; rv = setup_udp_server(1); if (rv == 0) { reason = "UDP port not in use."; goto quit; } fprintf(stdout, "booth_lockfile='%s' %s\n", cl.lockfile, lockfile_data); if (daemonize) fprintf(stderr, "Booth at %s port %d seems to be running.\n", local->addr_string, booth_conf->port); return 0; quit: log_debug("not running: %s", reason); /* Ie. "DEBUG" */ if (daemonize) fprintf(stderr, "not running: %s\n", reason); return ret; } static int do_server(int type) { int lock_fd = -1; int rv = -1; static char log_ent[128] = DAEMON_NAME "-"; rv = setup_config(type); if (rv < 0) goto out; if (!local) { log_error("Cannot find myself in the configuration."); exit(EXIT_FAILURE); } if (!daemonize) { if (daemon(0, 0) < 0) { perror("daemon error"); exit(EXIT_FAILURE); } } /* The lockfile must be written to _after_ the call to daemon(), so * that the lockfile contains the pid of the daemon, not the parent. */ lock_fd = lockfile(); if (lock_fd < 0) return lock_fd; strcat(log_ent, type_to_string(local->type)); cl_log_set_entity(log_ent); cl_log_enable_stderr(debug_level ? TRUE : FALSE); cl_log_set_facility(HA_LOG_FACILITY); cl_inherit_logging_environment(0); log_info("BOOTH %s daemon is starting, node id is %08X.", type_to_string(local->type), local->site_id); signal(SIGUSR1, (__sighandler_t)tickets_log_info); set_scheduler(); set_oom_adj(-16); set_proc_title("%s %s for [%s]:%d", DAEMON_NAME, type_to_string(local->type), local->addr_string, booth_conf->port); rv = loop(lock_fd); out: if (lock_fd >= 0) unlink_lockfile(lock_fd); return rv; } static int do_client(void) { int rv = -1; rv = setup_config(CLIENT); if (rv < 0) { log_error("cannot read config"); goto out; } switch (cl.op) { case OP_LIST: rv = query_get_string_answer(CMD_LIST); break; case OP_GRANT: rv = do_grant(); break; case OP_REVOKE: rv = do_revoke(); break; } out: return rv; } int main(int argc, char *argv[], char *envp[]) { int rv; init_set_proc_title(argc, argv, envp); memset(&cl, 0, sizeof(cl)); strncpy(cl.configfile, BOOTH_DEFAULT_CONF, BOOTH_PATH_LEN - 1); cl.lockfile[0] = 0; debug_level = 0; cl_log_set_entity("booth"); cl_log_enable_stderr(TRUE); cl_log_set_facility(0); rv = read_arguments(argc, argv); if (rv < 0) goto out; switch (cl.type) { case STATUS: rv = do_status(cl.type); break; case ARBITRATOR: case DAEMON: case SITE: rv = do_server(cl.type); break; case CLIENT: rv = do_client(); break; } out: /* Normalize values. 0x100 would be seen as "OK" by waitpid(). */ return (rv >= 0 && rv < 0x70) ? rv : 1; } diff --git a/src/pacemaker.c b/src/pacemaker.c index 78d450c..0558223 100644 --- a/src/pacemaker.c +++ b/src/pacemaker.c @@ -1,187 +1,187 @@ /* * Copyright (C) 2011 Jiaju Zhang - * Copyright (C) 2013 Philipp Marek + * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include "log.h" #include "pacemaker.h" #include "inline-fn.h" #define COMMAND_MAX 256 static const char * interpret_rv(int rv) { static char text[64]; int p; if (rv == 0) return "0"; p = sprintf(text, "rv %d", WEXITSTATUS(rv)); if (WIFSIGNALED(rv)) sprintf(text + p, " signal %d", WTERMSIG(rv)); return text; } static void pcmk_grant_ticket(struct ticket_config *tk) { char cmd[COMMAND_MAX]; int rv; snprintf(cmd, COMMAND_MAX, "crm_ticket -t %s -g --force", tk->name); log_info("command: '%s' was executed", cmd); rv = system(cmd); if (rv != 0) log_error("error: \"%s\" failed, rv %s", cmd, interpret_rv(rv)); } static void pcmk_revoke_ticket(struct ticket_config *tk) { char cmd[COMMAND_MAX]; int rv; snprintf(cmd, COMMAND_MAX, "crm_ticket -t %s -r --force", tk->name); log_info("command: '%s' was executed", cmd); rv = system(cmd); if (rv != 0) log_error("error: \"%s\" failed, rv %s", cmd, interpret_rv(rv)); } static int crm_ticket_set(const struct ticket_config *tk, const char *attr, int64_t val) { char cmd[COMMAND_MAX]; int i, rv; snprintf(cmd, COMMAND_MAX, "crm_ticket -t '%s' -S '%s' -v %" PRIi64, tk->name, attr, val); /* If there are errors, there's not much we can do but retry ... */ for (i=0; i<3 && (rv = system(cmd)); i++) ; log_info("'%s' gave result %s", cmd, interpret_rv(rv)); return rv; } static void pcmk_store_ticket(struct ticket_config *tk) { crm_ticket_set(tk, "owner", (int32_t)get_node_id(tk->owner)); crm_ticket_set(tk, "expires", tk->expires); crm_ticket_set(tk, "ballot", tk->last_ack_ballot); } static int crm_ticket_get(struct ticket_config *tk, const char *attr, int64_t *data) { char cmd[COMMAND_MAX]; char line[256]; int rv; int64_t v; FILE *p; *data = -1; v = 0; snprintf(cmd, COMMAND_MAX, "crm_ticket -t '%s' -G '%s' --quiet", tk->name, attr); p = popen(cmd, "r"); if (p == NULL) { rv = errno; log_error("popen error %d (%s) for \"%s\"", rv, strerror(rv), cmd); return rv || -EINVAL; } if (fgets(line, sizeof(line) - 1, p) == NULL) { rv = ENODATA; goto out; } rv = EINVAL; if (sscanf(line, "%" PRIi64, &v) == 1) rv = 0; *data = v; out: rv = pclose(p); log_info("command \"%s\" returned rv %s, value %" PRIi64, cmd, interpret_rv(rv), v); return rv; } static void pcmk_load_ticket(struct ticket_config *tk) { int rv; int64_t v; rv = crm_ticket_get(tk, "expires", &v); if (!rv) { tk->expires = v; } rv = crm_ticket_get(tk, "ballot", &v); if (!rv) { tk->new_ballot = tk->last_ack_ballot = v; } rv = crm_ticket_get(tk, "owner", &v); if (!rv) { /* No check, node could have been deconfigured. */ find_site_by_id(v, &tk->proposed_owner); } disown_if_expired(tk); tk->proposal_acknowledges = local->bitmask; /* We load only when the state is completely unknown. */ tk->state = ST_INIT; return; } struct ticket_handler pcmk_handler = { .grant_ticket = pcmk_grant_ticket, .revoke_ticket = pcmk_revoke_ticket, .store_ticket = pcmk_store_ticket, .load_ticket = pcmk_load_ticket, }; diff --git a/src/paxos.c b/src/paxos.c index 9cd1199..334ad8a 100644 --- a/src/paxos.c +++ b/src/paxos.c @@ -1,397 +1,397 @@ /* * Copyright (C) 2011 Jiaju Zhang - * Copyright (C) 2013 Philipp Marek + * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include "booth.h" #include "transport.h" #include "inline-fn.h" #include "config.h" #include "paxos.h" #include "log.h" static uint32_t next_ballot_number(struct ticket_config *tk) { uint32_t b; /* TODO: getenv() for debugging */ b = tk->new_ballot; /* + unique number */ b += local->bitmask; /* + weight */ b += booth_conf->site_bits * tk->weight[ local->index ]; return b; } static inline void set_proposal_in_ticket(struct ticket_config *tk, struct booth_site *from, uint32_t ballot, struct booth_site *new_owner) { tk->proposer = from; tk->new_ballot = ballot; tk->proposed_owner = new_owner; tk->proposal_expires = 0; // TODO - needed? tk->proposal_acknowledges = from->bitmask | local->bitmask; /* We lose (?) */ tk->state = ST_STABLE; } static inline void change_ticket_owner(struct ticket_config *tk, uint32_t ballot, struct booth_site *new_owner) { int next; /* set "previous" value for next round */ tk->last_ack_ballot = tk->new_ballot = ballot; tk->owner = new_owner; tk->expires = time(NULL) + tk->expiry; tk->proposer = NULL; tk->state = ST_STABLE; if (new_owner == local) { next = tk->expiry / 2; if (tk->timeout * RETRIES/2 < next) next = tk->timeout; ticket_next_cron_in(tk, next); } else ticket_next_cron_in(tk, tk->expiry + tk->acquire_after); log_info("Now actively COMMITTED for \"%s\": new owner %s, ballot %d", tk->name, ticket_owner_string(tk->owner), ballot); ticket_write(tk); } void abort_proposal(struct ticket_config *tk) { tk->proposer = NULL; tk->proposed_owner = tk->owner; tk->retry_number = 0; /* Ask others (repeatedly) until we know the new owner. */ tk->state = ST_INIT; } /** \defgroup msghdl Message handling functions. * * Not all use all arguments; but to keep the interface the same, * they're all just passed everything we have. * * See also enum \ref cmd_request_t. * @{ */ /** Start a PAXOS round, by sending out an OP_PREPARING. */ int paxos_start_round(struct ticket_config *tk, struct booth_site *new_owner) { if (tk->state != ST_STABLE) return RLT_BUSY; /* This may not be done from cron, because the ballot number would simply * get counted up without any benefit. * The message may get retransmitted, though. */ tk->proposer = local; tk->new_ballot = next_ballot_number(tk); tk->proposed_owner = new_owner; tk->retry_number = 0; ticket_activate_timeout(tk); return ticket_broadcast_proposed_state(tk, OP_PREPARING); } /** Answering OP_PREPARING means sending out OP_PROMISING. */ inline static int answer_PREP( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { if (!(local->role & ACCEPTOR)) return 0; /* We have to be careful here. * Getting multiple copies of the same message must not trigger * rejections, but only repeated promises. */ if (from == tk->proposer && ballot == tk->new_ballot) goto promise; /* It doesn't matter whether it's the same or another host; * the only distinction is the ballot number. */ if (ballot > tk->new_ballot) { promise: msg->header.cmd = htonl(OP_PROMISING); msg->ticket.prev_ballot = htonl(tk->last_ack_ballot); set_proposal_in_ticket(tk, from, ballot, new_owner); log_info("PROMISING for ticket \"%s\" (by %s) for %d", tk->name, from->addr_string, ballot); } else { msg->header.cmd = htonl(OP_REJECTED); msg->ticket.ballot = htonl(tk->new_ballot); msg->ticket.prev_ballot = htonl(tk->last_ack_ballot); log_info("REJECTING (prep) for ticket \"%s\" from %s - have %d, wanted %d", tk->name, from->addr_string, tk->new_ballot, ballot); } init_header_bare(&msg->header); return booth_udp_send(from, msg, sizeof(*msg)); } /** Getting OP_REJECTED means abandoning the current operation. */ inline static int answer_REJ( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { log_info("got REJECTED for ticket \"%s\", ballot %d (has %d), from %s", tk->name, tk->new_ballot, ballot, from->addr_string); abort_proposal(tk); /* TODO: should we check whether that sequence is increasing? */ tk->new_ballot = ballot; tk->last_ack_ballot = ntohl(msg->ticket.prev_ballot); /* No need to ask the others. */ tk->state = ST_STABLE; return 0; } /** After a few OP_PROMISING replies we can send out OP_PROPOSING. */ inline static int got_a_PROM( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { if (tk->proposer == local || tk->state == OP_PREPARING) { tk->proposal_acknowledges |= from->bitmask; log_info("Got PROMISE from %s for \"%s\", now %" PRIx64, from->addr_string, tk->name, tk->proposal_acknowledges); /* TODO: only check for count? */ if (promote_ticket_state(tk)) { ticket_activate_timeout(tk); return ticket_broadcast_proposed_state(tk, OP_PROPOSING); } /* Wait for further data */ return 0; } /* Packet just delayed? Silently ignore. */ if (ballot == tk->last_ack_ballot && (new_owner == tk->owner || new_owner == tk->proposed_owner)) return 0; /* Message sent to wrong host? */ log_debug("got unexpected PROMISE from %s for \"%s\"", from->addr_string, tk->name); return 0; } /** Answering OP_PROPOSING means sending out OP_ACCEPTING. */ inline static int answer_PROP( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { if (!(local->role & ACCEPTOR)) return 0; if (from == tk->proposer && ballot == tk->new_ballot) goto accepting; /* We have to be careful here. * Getting multiple copies of the same message must not trigger * rejections, but only repeated OP_ACCEPTING messages. */ if (ballot > tk->last_ack_ballot && ballot == tk->new_ballot && ntohl(msg->ticket.prev_ballot) == tk->last_ack_ballot) { if (tk->proposer) { /* Send OP_REJECTED to previous proposer? */ log_info("new PROPOSAL for ticket \"%s\" overriding older one from %s", tk->name, from->addr_string); } tk->proposer = from; accepting: init_ticket_msg(msg, OP_ACCEPTING, RLT_SUCCESS, tk); log_info("sending ACCEPT for ticket \"%s\" (by %s) for %d - new owner %s", tk->name, from->addr_string, ballot, ticket_owner_string(new_owner)); change_ticket_owner(tk, ballot, new_owner); } if (ballot == tk->last_ack_ballot && ballot == tk->new_ballot && ntohl(msg->ticket.prev_ballot) == tk->last_ack_ballot) { /* Silently ignore delayed messages. */ } else { msg->header.cmd = htonl(OP_REJECTED); msg->ticket.ballot = htonl(tk->new_ballot); msg->ticket.prev_ballot = htonl(tk->last_ack_ballot); log_info("REJECTING (prop) for ticket \"%s\" from %s - have %d, wanted %d", tk->name, from->addr_string, tk->new_ballot, ballot); } init_header_bare(&msg->header); return booth_udp_send(from, msg, sizeof(*msg)); } /** After enough OP_ACCEPTING we can do the change, and send an OP_COMMITTED. */ inline static int got_an_ACC( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { int rv; if (tk->proposer == local && tk->state == OP_PROPOSING) { tk->proposal_acknowledges |= from->bitmask; log_info("Got ACCEPTING from %s for \"%s\", now %" PRIx64, from->addr_string, tk->name, tk->proposal_acknowledges); /* TODO: only check for count? */ if (promote_ticket_state(tk)) { change_ticket_owner(tk, tk->new_ballot, tk->proposed_owner); rv = ticket_broadcast_proposed_state(tk, OP_COMMITTED); return rv; } } return 0; } /** An OP_COMMITTED gets no answer; just record the new state. */ inline static int answer_COMM( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { /* We cannot check whether the packet is from an expected proposer - * perhaps this is the _only_ message of the whole handshake? */ if (ballot > tk->new_ballot && ntohl(msg->ticket.prev_ballot) == tk->last_ack_ballot) { change_ticket_owner(tk, ballot, new_owner); } /* Send ack? */ return 0; } /** @} */ int paxos_answer( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner_p) { int cmd; cmd = ntohl(msg->header.cmd); /* These are in roughly chronological order. * What the first machine sends is an OP_PREPARING * (see paxos_start_round()), which gets received * (below) from the others ... */ switch (cmd) { case OP_PREPARING: return answer_PREP(tk, from, msg, ballot, new_owner_p); case OP_REJECTED: return answer_REJ(tk, from, msg, ballot, new_owner_p); case OP_PROMISING: return got_a_PROM(tk, from, msg, ballot, new_owner_p); case OP_PROPOSING: return answer_PROP(tk, from, msg, ballot, new_owner_p); case OP_ACCEPTING: return got_an_ACC(tk, from, msg, ballot, new_owner_p); case OP_COMMITTED: return answer_COMM(tk, from, msg, ballot, new_owner_p); default: log_error("unprocessed message, cmd %x", cmd); return -EINVAL; } } diff --git a/src/paxos.h b/src/paxos.h index 2bf29bb..a819cb5 100644 --- a/src/paxos.h +++ b/src/paxos.h @@ -1,42 +1,42 @@ /* * Copyright (C) 2011 Jiaju Zhang - * Copyright (C) 2013 Philipp Marek + * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _PAXOS_H #define _PAXOS_H #include "config.h" #include "ticket.h" #define PROPOSER 0x4 #define ACCEPTOR 0x2 #define LEARNER 0x1 int paxos_answer( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner_p); int paxos_start_round(struct ticket_config *tk, struct booth_site *new_owner); void abort_proposal(struct ticket_config *tk); #endif /* _PAXOS_H */ diff --git a/src/ticket.c b/src/ticket.c index a16ed8c..afadaf4 100644 --- a/src/ticket.c +++ b/src/ticket.c @@ -1,631 +1,631 @@ /* * Copyright (C) 2011 Jiaju Zhang - * Copyright (C) 2013 Philipp Marek + * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include "ticket.h" #include "config.h" #include "pacemaker.h" #include "inline-fn.h" #include "log.h" #include "booth.h" #include "paxos.h" #define TK_LINE 256 /* Untrusted input, must fit (incl. \0) in a buffer of max chars. */ int check_max_len_valid(const char *s, int max) { int i; for(i=0; iticket_count; i++) { if (!strcmp(booth_conf->ticket[i].name, ticket)) { if (found) *found = booth_conf->ticket + i; return 1; } } return 0; } int check_ticket(char *ticket, struct ticket_config **found) { if (found) *found = NULL; if (!booth_conf) return 0; if (!check_max_len_valid(ticket, sizeof(booth_conf->ticket[0].name))) return 0; return find_ticket_by_name(ticket, found); } int check_site(char *site, int *is_local) { struct booth_site *node; if (!check_max_len_valid(site, sizeof(node->addr_string))) return 0; if (find_site_by_name(site, &node, 0)) { *is_local = node->local; return 1; } return 0; } /** Find out what others think about this ticket. * * If we're a SITE, we can ask (and have to tell) Pacemaker. * An ARBITRATOR can only ask others. */ static int ticket_send_catchup(struct ticket_config *tk) { int i, rv = 0; struct booth_site *site; struct boothc_ticket_msg msg; foreach_node(i, site) { if (!site->local) { init_ticket_msg(&msg, CMD_CATCHUP, RLT_SUCCESS, tk); log_debug("attempting catchup from %s", site->addr_string); rv = booth_udp_send(site, &msg, sizeof(msg)); } } ticket_activate_timeout(tk); return rv; } int ticket_write(struct ticket_config *tk) { if (local->type != SITE) return -EINVAL; disown_if_expired(tk); pcmk_handler.store_ticket(tk); if (tk->owner == local) { pcmk_handler.grant_ticket(tk); } else { pcmk_handler.revoke_ticket(tk); } return 0; } /** Try to get the ticket for the local site. * */ int do_grant_ticket(struct ticket_config *tk) { int rv; if (tk->owner == local) return RLT_SUCCESS; if (tk->owner) return RLT_OVERGRANT; rv = paxos_start_round(tk, local); return rv; } /** Start a PAXOS round for revoking. * That can be started from any site. */ int do_revoke_ticket(struct ticket_config *tk) { int rv; if (!tk->owner) return RLT_SUCCESS; rv = paxos_start_round(tk, NULL); return rv; } int list_ticket(char **pdata, unsigned int *len) { struct ticket_config *tk; char timeout_str[64]; char *data, *cp; int i, alloc; *pdata = NULL; *len = 0; alloc = 256 + booth_conf->ticket_count * (BOOTH_NAME_LEN * 2 + 128); data = malloc(alloc); if (!data) return -ENOMEM; cp = data; foreach_ticket(i, tk) { if (tk->expires != 0) strftime(timeout_str, sizeof(timeout_str), "%F %T", localtime(&tk->expires)); else strcpy(timeout_str, "INF"); cp += sprintf(cp, "ticket: %s, owner: %s, expires: %s\n", tk->name, tk->owner ? tk->owner->addr_string : "None", timeout_str); *len = cp - data; assert(*len < alloc); } *pdata = data; return 0; } int setup_ticket(void) { struct ticket_config *tk; int i; /* TODO */ foreach_ticket(i, tk) { tk->owner = NULL; tk->expires = 0; abort_proposal(tk); if (local->role & PROPOSER) { pcmk_handler.load_ticket(tk); } } return 0; } int ticket_answer_list(int fd, struct boothc_ticket_msg *msg) { char *data; int olen, rv; struct boothc_header hdr; rv = list_ticket(&data, &olen); if (rv < 0) return rv; init_header(&hdr, CMR_LIST, RLT_SUCCESS, sizeof(hdr) + olen); return send_header_plus(fd, &hdr, data, olen); } int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg) { int rv; struct ticket_config *tk; if (!check_ticket(msg->ticket.id, &tk)) { log_error("Client asked to grant unknown ticket"); rv = RLT_INVALID_ARG; goto reply; } if (tk->owner) { log_error("client wants to get an granted ticket %s", msg->ticket.id); rv = RLT_OVERGRANT; goto reply; } rv = do_grant_ticket(tk); reply: init_header(&msg->header, CMR_GRANT, rv ?: RLT_ASYNC, sizeof(*msg)); return send_ticket_msg(fd, msg); } int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg) { int rv; struct ticket_config *tk; if (!check_ticket(msg->ticket.id, &tk)) { log_error("Client asked to grant unknown ticket"); rv = RLT_INVALID_ARG; goto reply; } if (!tk->owner) { log_info("client wants to revoke a free ticket \"%s\"", msg->ticket.id); rv = RLT_SUCCESS; goto reply; } rv = do_revoke_ticket(tk); reply: init_ticket_msg(msg, CMR_REVOKE, rv ?: RLT_ASYNC, tk); return send_ticket_msg(fd, msg); } /** Got a CMD_CATCHUP query. * In this file because it's mostly used during startup. */ static int ticket_answer_catchup( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { int rv; log_debug("got CATCHUP query for \"%s\" from %s", msg->ticket.id, from->addr_string); /* We do _always_ answer. * In case all booth daemons are restarted at the same time, nobody * would answer any questions, leading to timeouts and delays. * Just admit we don't know. */ rv = (tk->state == ST_INIT) ? RLT_PROBABLY_SUCCESS : RLT_SUCCESS; init_ticket_msg(msg, CMR_CATCHUP, rv, tk); return booth_udp_send(from, msg, sizeof(*msg)); } /** Got a CMR_CATCHUP message. * Gets handled here because it's not PAXOS per se, * but only needed during startup. */ static int ticket_process_catchup( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { int rv; log_info("got CATCHUP answer for \"%s\" from %s; says owner %s with ballot %d", tk->name, from->addr_string, ticket_owner_string(new_owner), ballot); rv = ntohl(msg->header.result); if (rv != RLT_SUCCESS && rv != RLT_PROBABLY_SUCCESS) { log_error("dropped because of wrong rv: 0x%x", rv); return -EINVAL; } if (ballot == tk->new_ballot && ballot == tk->last_ack_ballot && new_owner == tk->owner) { /* Peer says the same thing we're believing. */ tk->proposal_acknowledges |= from->bitmask; tk->expires = ntohl(msg->ticket.expiry) + time(NULL); if (promote_ticket_state(tk)) { if (tk->state == ST_INIT) tk->state = ST_STABLE; } disown_if_expired(tk); log_debug("catchup: peer ack 0x%" PRIx64 ", now state '%s'", tk->proposal_acknowledges, STATE_STRING(tk->state)); goto ex; } if (ticket_valid_for(tk) == 0 && !tk->owner) { /* We see the ticket as expired, and therefore don't know an owner. * So believe some other host. */ tk->state = ST_STABLE; log_debug("catchup: no owner locally, believe peer."); goto accept; } if (ballot >= tk->new_ballot && ballot >= tk->last_ack_ballot && rv == RLT_SUCCESS) { /* Peers seems to know better, but as yet we only have _her_ * word for that. */ log_debug("catchup: peer has higher ballot: %d >= %d/%d", ballot, tk->new_ballot, tk->last_ack_ballot); accept: tk->expires = ntohl(msg->ticket.expiry) + time(NULL); tk->new_ballot = ballot_max2(ballot, tk->new_ballot); tk->last_ack_ballot = ballot_max2(ballot, tk->last_ack_ballot); tk->owner = new_owner; tk->proposal_acknowledges = from->bitmask; /* We stay in ST_INIT and wait for confirmation. */ goto ex; } if (ballot >= tk->last_ack_ballot && rv == RLT_PROBABLY_SUCCESS && tk->state == ST_INIT && tk->retry_number > 3) { /* Peer seems to know better than us, and there's no * convincing other report. Just take it. */ tk->state = ST_STABLE; log_debug("catchup: exceeded retries, peer has higher ballot."); goto accept; } if (ballot < tk->new_ballot || ballot < tk->last_ack_ballot) { /* Peer seems outdated ... tell it to reload? */ log_debug("catchup: peer outdated?"); #if 0 init_ticket_msg(&msg, CMD_DO_CATCHUP, RLT_SUCCESS, tk, &tk->current_state); #endif goto ex; } log_debug("catchup: unhandled situation!"); ex: ticket_write(tk); return 0; } /** Send new state request to all sites. * Perhaps this should take a flag for ACCEPTOR etc.? * No need currently, as all nodes are more or less identical. */ int ticket_broadcast_proposed_state(struct ticket_config *tk, cmd_request_t state) { struct boothc_ticket_msg msg; tk->proposal_acknowledges = local->bitmask; tk->state = state; init_ticket_msg(&msg, state, RLT_SUCCESS, tk); msg.ticket.owner = htonl(get_node_id(tk->proposed_owner)); log_debug("broadcasting '%s' for ticket \"%s\"", STATE_STRING(state), tk->name); return transport()->broadcast(&msg, sizeof(msg)); } static void ticket_cron(struct ticket_config *tk) { time_t now; now = time(NULL); switch(tk->state) { case ST_INIT: /* Unknown state, ask others. */ ticket_send_catchup(tk); return; case OP_COMMITTED: case ST_STABLE: /* Has an owner, has an expiry date, and expiry date in the past? */ if (tk->expires && tk->owner && now > tk->expires) { log_info("LOST ticket: \"%s\" no longer at %s", tk->name, ticket_owner_string(tk->owner)); /* Couldn't renew in time - ticket lost. */ tk->owner = NULL; abort_proposal(tk); ticket_write(tk); if (tk->acquire_after) ticket_next_cron_in(tk, tk->acquire_after); } /* Do we need to refresh? */ if (tk->owner == local && ticket_valid_for(tk) < tk->expiry/2) { log_info("RENEW ticket \"%s\"", tk->name); paxos_start_round(tk, local); /* TODO: remember when we started, and restart afresh after some retries */ } break; case OP_PREPARING: case OP_PROPOSING: tk->retry_number ++; if (tk->retry_number > RETRIES) { log_info("ABORT %s for ticket \"%s\" - " "not enough answers after %d retries", tk->state == OP_PREPARING ? "prepare" : "propose", tk->name, tk->retry_number); abort_proposal(tk); } else { /* We ask others for a change; retry to get consensus. */ ticket_broadcast_proposed_state(tk, tk->state); ticket_activate_timeout(tk); } break; case OP_PROMISING: case OP_ACCEPTING: case OP_RECOVERY: case OP_REJECTED: break; default: break; } } void process_tickets(void) { struct ticket_config *tk; int i; time_t now; time(&now); foreach_ticket(i, tk) { if (0) log_debug("ticket %s next cron %" PRIx64 ", now %" PRIx64 ", in %" PRIi64, tk->name, (uint64_t)tk->next_cron, (uint64_t)now, (int64_t)tk->next_cron - now); if (tk->next_cron > now) continue; log_debug("ticket cron: doing %s", tk->name); /* Set next value, handler may override. */ tk->next_cron = INT_MAX; ticket_cron(tk); } } void tickets_log_info(void) { struct ticket_config *tk; int i; foreach_ticket(i, tk) { log_info("Ticket %s: state '%s' " "mask %" PRIx64 "/%" PRIx64 " " "ballot %d (current %d) " "expires %-24.24s", tk->name, STATE_STRING(tk->state), tk->proposal_acknowledges, booth_conf->site_bits, tk->last_ack_ballot, tk->new_ballot, ctime(&tk->expires)); } } /* UDP message receiver. */ int message_recv(struct boothc_ticket_msg *msg, int msglen) { int cmd, rv; uint32_t from; struct booth_site *dest; struct ticket_config *tk; struct booth_site *new_owner_p; uint32_t ballot, new_owner; if (check_boothc_header(&msg->header, sizeof(*msg)) < 0 || msglen != sizeof(*msg)) { log_error("message receive error"); return -1; } from = ntohl(msg->header.from); if (!find_site_by_id(from, &dest) || !dest) { log_error("unknown sender: %08x", from); return -1; } if (!check_ticket(msg->ticket.id, &tk)) { log_error("got invalid ticket name \"%s\" from %s", msg->ticket.id, dest->addr_string); return -EINVAL; } cmd = ntohl(msg->header.cmd); ballot = ntohl(msg->ticket.ballot); new_owner = ntohl(msg->ticket.owner); if (!find_site_by_id(new_owner, &new_owner_p)) { log_error("Message with unknown owner %x received", new_owner); return -EINVAL; } switch (cmd) { case CMD_CATCHUP: return ticket_answer_catchup(tk, dest, msg, ballot, new_owner_p); case CMR_CATCHUP: return ticket_process_catchup(tk, dest, msg, ballot, new_owner_p); default: /* only used in catchup, and not even really there ?? */ assert(ntohl(msg->header.result) == 0); rv = paxos_answer(tk, dest, msg, ballot, new_owner_p); assert((tk->proposal_acknowledges & ~booth_conf->site_bits) == 0); return rv; } return 0; } diff --git a/src/ticket.h b/src/ticket.h index 637f18f..0480b97 100644 --- a/src/ticket.h +++ b/src/ticket.h @@ -1,83 +1,83 @@ /* * Copyright (C) 2011 Jiaju Zhang - * Copyright (C) 2013 Philipp Marek + * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _TICKET_H #define _TICKET_H #include #include "config.h" #define DEFAULT_TICKET_EXPIRY 600 #define DEFAULT_TICKET_TIMEOUT 10 #define foreach_ticket(i_,t_) for(i=0; (t_=booth_conf->ticket+i, iticket_count); i++) #define foreach_node(i_,n_) for(i=0; (n_=booth_conf->site+i, isite_count); i++) int check_ticket(char *ticket, struct ticket_config **tc); int check_site(char *site, int *local); int do_grant_ticket(struct ticket_config *ticket); int revoke_ticket(struct ticket_config *ticket); int list_ticket(char **pdata, unsigned int *len); int message_recv(struct boothc_ticket_msg *msg, int msglen); int setup_ticket(void); int check_max_len_valid(const char *s, int max); int do_grant_ticket(struct ticket_config *tk); int do_revoke_ticket(struct ticket_config *tk); int find_ticket_by_name(const char *ticket, struct ticket_config **found); int ticket_answer_list(int fd, struct boothc_ticket_msg *msg); int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg); int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg); int ticket_broadcast_proposed_state(struct ticket_config *tk, cmd_request_t state); int ticket_write(struct ticket_config *tk); void process_tickets(void); void tickets_log_info(void); static inline void ticket_next_cron_at(struct ticket_config *tk, time_t when) { tk->next_cron = when; } static inline void ticket_next_cron_in(struct ticket_config *tk, int seconds) { ticket_next_cron_at(tk, time(NULL) + seconds); } static inline void ticket_activate_timeout(struct ticket_config *tk) { /* TODO: increase timeout when no answers */ ticket_next_cron_in(tk, tk->timeout); tk->retry_number ++; } #endif /* _TICKET_H */ diff --git a/src/transport.c b/src/transport.c index fbc9fad..ffa6f88 100644 --- a/src/transport.c +++ b/src/transport.c @@ -1,673 +1,673 @@ /* * Copyright (C) 2011 Jiaju Zhang - * Copyright (C) 2013 Philipp Marek + * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include "booth.h" #include "inline-fn.h" #include "log.h" #include "config.h" #include "ticket.h" #include "transport.h" #define BOOTH_IPADDR_LEN (sizeof(struct in6_addr)) #define NETLINK_BUFSIZE 16384 #define SOCKET_BUFFER_SIZE 160000 #define FRAME_SIZE_MAX 10000 struct booth_site *local = NULL; static int (*deliver_fn) (void *msg, int msglen); static void parse_rtattr(struct rtattr *tb[], int max, struct rtattr *rta, int len) { while (RTA_OK(rta, len)) { if (rta->rta_type <= max) tb[rta->rta_type] = rta; rta = RTA_NEXT(rta,len); } } static int find_address(unsigned char ipaddr[BOOTH_IPADDR_LEN], int family, int prefixlen, int fuzzy_allowed, struct booth_site **me) { int i; struct booth_site *node; int bytes, bits_left, mask; unsigned char node_bits, ip_bits; uint8_t *n_a; bytes = prefixlen / 8; bits_left = prefixlen % 8; /* One bit left to check means ignore 7 lowest bits. */ mask = ~( (1 << (8 - bits_left)) -1); for (i = 0; i < booth_conf->site_count; i++) { node = booth_conf->site + i; if (family != node->family) continue; n_a = node_to_addr_pointer(node); if (memcmp(ipaddr, n_a, node->addrlen) == 0) { found: *me = node; return 1; } if (!fuzzy_allowed) continue; /* Check prefix, whole bytes */ if (memcmp(ipaddr, n_a, bytes) != 0) continue; if (!bits_left) goto found; node_bits = n_a[bytes]; ip_bits = ipaddr[bytes]; if (((node_bits ^ ip_bits) & mask) == 0) goto found; } return 0; } int _find_myself(int family, struct booth_site **mep, int fuzzy_allowed); int _find_myself(int family, struct booth_site **mep, int fuzzy_allowed) { int fd; struct sockaddr_nl nladdr; struct booth_site *me; unsigned char ipaddr[BOOTH_IPADDR_LEN]; static char rcvbuf[NETLINK_BUFSIZE]; struct { struct nlmsghdr nlh; struct rtgenmsg g; } req; if (local) goto found; me = NULL; if (mep) *mep = NULL; fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE); if (fd < 0) { log_error("failed to create netlink socket"); return 0; } setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); memset(&nladdr, 0, sizeof(nladdr)); nladdr.nl_family = AF_NETLINK; memset(&req, 0, sizeof(req)); req.nlh.nlmsg_len = sizeof(req); req.nlh.nlmsg_type = RTM_GETADDR; req.nlh.nlmsg_flags = NLM_F_ROOT|NLM_F_MATCH|NLM_F_REQUEST; req.nlh.nlmsg_pid = 0; req.nlh.nlmsg_seq = 1; req.g.rtgen_family = family; if (sendto(fd, (void *)&req, sizeof(req), 0, (struct sockaddr*)&nladdr, sizeof(nladdr)) < 0) { close(fd); log_error("failed to send data to netlink socket"); return 0; } while (1) { int status; struct nlmsghdr *h; struct iovec iov = { rcvbuf, sizeof(rcvbuf) }; struct msghdr msg = { (void *)&nladdr, sizeof(nladdr), &iov, 1, NULL, 0, 0 }; status = recvmsg(fd, &msg, 0); if (!status) { close(fd); log_error("failed to recvmsg from netlink socket"); return 0; } h = (struct nlmsghdr *)rcvbuf; if (h->nlmsg_type == NLMSG_DONE) break; if (h->nlmsg_type == NLMSG_ERROR) { close(fd); log_error("netlink socket recvmsg error"); return 0; } while (NLMSG_OK(h, status)) { if (h->nlmsg_type == RTM_NEWADDR) { struct ifaddrmsg *ifa = NLMSG_DATA(h); struct rtattr *tb[IFA_MAX+1]; int len = h->nlmsg_len - NLMSG_LENGTH(sizeof(*ifa)); memset(tb, 0, sizeof(tb)); parse_rtattr(tb, IFA_MAX, IFA_RTA(ifa), len); memset(ipaddr, 0, BOOTH_IPADDR_LEN); memcpy(ipaddr, RTA_DATA(tb[IFA_ADDRESS]), BOOTH_IPADDR_LEN); if (find_address(ipaddr, ifa->ifa_family, ifa->ifa_prefixlen, fuzzy_allowed, &me)) goto out; } h = NLMSG_NEXT(h, status); } } out: close(fd); if (!me) return 0; me->local = 1; local = me; found: if (mep) *mep = local; return 1; } int find_myself(struct booth_site **mep, int fuzzy_allowed) { return _find_myself(AF_INET6, mep, fuzzy_allowed) || _find_myself(AF_INET, mep, fuzzy_allowed); } /** Checks the header fields for validity. * cf. init_header(). * For @len_incl_data < 0 the length is not checked. * Return <0 if error, else bytes read. */ int check_boothc_header(struct boothc_header *h, int len_incl_data) { int l; if (h->magic != htonl(BOOTHC_MAGIC)) { log_error("magic error %x", ntohl(h->magic)); return -EINVAL; } if (h->version != htonl(BOOTHC_VERSION)) { log_error("version error %x", ntohl(h->version)); return -EINVAL; } l = ntohl(h->length); if (l < sizeof(*h)) { log_error("length %d out of range", l); return -EINVAL; } if (len_incl_data < 0) return 0; if (l != len_incl_data) { log_error("length error - got %d, wanted %d", l, len_incl_data); return -EINVAL; } return len_incl_data; } static void process_tcp_listener(int ci) { int fd, i, one = 1; socklen_t addrlen = sizeof(struct sockaddr); struct sockaddr addr; fd = accept(clients[ci].fd, &addr, &addrlen); if (fd < 0) { log_error("process_tcp_listener: accept error %d %d", fd, errno); return; } setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof(one)); i = client_add(fd, clients[ci].transport, process_connection, NULL); log_debug("client connection %d fd %d", i, fd); } static int setup_tcp_listener(void) { int s, rv; s = socket(local->family, SOCK_STREAM, 0); if (s == -1) { log_error("failed to create tcp socket %s", strerror(errno)); return s; } rv = bind(s, &local->sa6, local->saddrlen); if (rv == -1) { log_error("failed to bind socket %s", strerror(errno)); return rv; } rv = listen(s, 5); if (rv == -1) { log_error("failed to listen on socket %s", strerror(errno)); return rv; } return s; } static int booth_tcp_init(void * unused __attribute__((unused))) { int rv; if (get_local_id() < 0) return -1; rv = setup_tcp_listener(); if (rv < 0) return rv; client_add(rv, booth_transport + TCP, process_tcp_listener, NULL); return 0; } static int connect_nonb(int sockfd, const struct sockaddr *saptr, socklen_t salen, int sec) { int flags, n, error; socklen_t len; fd_set rset, wset; struct timeval tval; flags = fcntl(sockfd, F_GETFL, 0); fcntl(sockfd, F_SETFL, flags | O_NONBLOCK); error = 0; if ( (n = connect(sockfd, saptr, salen)) < 0) if (errno != EINPROGRESS) return -1; if (n == 0) goto done; /* connect completed immediately */ FD_ZERO(&rset); FD_SET(sockfd, &rset); wset = rset; tval.tv_sec = sec; tval.tv_usec = 0; if ((n = select(sockfd + 1, &rset, &wset, NULL, sec ? &tval : NULL)) == 0) { /* leave outside function to close */ /* timeout */ /* close(sockfd); */ errno = ETIMEDOUT; return -1; } if (FD_ISSET(sockfd, &rset) || FD_ISSET(sockfd, &wset)) { len = sizeof(error); if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) return -1; /* Solaris pending error */ } else { log_error("select error: sockfd not set"); return -1; } done: fcntl(sockfd, F_SETFL, flags); /* restore file status flags */ if (error) { /* leave outside function to close */ /* close(sockfd); */ errno = error; return -1; } return 0; } int booth_tcp_open(struct booth_site *to) { int s, rv; if (to->tcp_fd >= STDERR_FILENO) goto found; s = socket(to->family, SOCK_STREAM, 0); if (s == -1) { log_error("cannot create socket of family %d", to->family); return -1; } rv = connect_nonb(s, (struct sockaddr *)&to->sa6, to->saddrlen, 10); if (rv == -1) { if( errno == ETIMEDOUT) log_error("connection to %s timeout", to->addr_string); else log_error("connection to %s error %s", to->addr_string, strerror(errno)); goto error; } to->tcp_fd = s; found: return 1; error: if (s >= 0) close(s); return -1; } int booth_tcp_send(struct booth_site *to, void *buf, int len) { return do_write(to->tcp_fd, buf, len); } static int booth_tcp_recv(struct booth_site *from, void *buf, int len) { int got; /* Needs timeouts! */ got = do_read(from->tcp_fd, buf, len); if (got < 0) return got; return len; } static int booth_tcp_close(struct booth_site *to) { if (to) { if (to->tcp_fd > STDERR_FILENO) close(to->tcp_fd); to->tcp_fd = -1; } return 0; } static int booth_tcp_exit(void) { return 0; } int setup_udp_server(int try_only) { int rv, fd; unsigned int recvbuf_size; fd = socket(local->family, SOCK_DGRAM, 0); if (fd == -1) { log_error("failed to create UDP socket %s", strerror(errno)); goto ex; } rv = fcntl(fd, F_SETFL, O_NONBLOCK); if (rv == -1) { log_error("failed to set non-blocking operation " "on UDP socket: %s", strerror(errno)); goto ex; } rv = bind(fd, (struct sockaddr *)&local->sa6, local->saddrlen); if (try_only) { rv = (rv == -1) ? errno : 0; close(fd); return rv; } if (rv == -1) { log_error("failed to bind UDP socket to [%s]:%d: %s", local->addr_string, booth_conf->port, strerror(errno)); goto ex; } recvbuf_size = SOCKET_BUFFER_SIZE; rv = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, sizeof(recvbuf_size)); if (rv == -1) { log_error("failed to set recvbuf size"); goto ex; } local->udp_fd = fd; return 0; ex: if (fd >= 0) close(fd); return -1; } /* Receive/process callback for UDP */ static void process_recv(int ci) { struct sockaddr_storage sa; int rv; socklen_t sa_len; char buffer[256]; /* Used for unit tests */ struct boothc_ticket_msg *msg; sa_len = sizeof(sa); msg = (void*)buffer; rv = recvfrom(clients[ci].fd, buffer, sizeof(buffer), MSG_NOSIGNAL | MSG_DONTWAIT, (struct sockaddr *)&sa, &sa_len); if (rv == -1) return; deliver_fn(msg, rv); } static int booth_udp_init(void *f) { int rv; rv = setup_udp_server(0); if (rv < 0) return rv; deliver_fn = f; client_add(local->udp_fd, booth_transport + UDP, process_recv, NULL); return 0; } int booth_udp_send(struct booth_site *to, void *buf, int len) { int rv; rv = sendto(local->udp_fd, buf, len, MSG_NOSIGNAL, (struct sockaddr *)&to->sa6, to->saddrlen); return rv; } static int booth_udp_broadcast(void *buf, int len) { int i; struct booth_site *site; if (!booth_conf || !booth_conf->site_count) return -1; foreach_node(i, site) { if (site != local) booth_udp_send(site, buf, len); } return 0; } static int booth_udp_exit(void) { return 0; } /* SCTP transport layer has not been developed yet */ static int booth_sctp_init(void *f __attribute__((unused))) { return 0; } static int booth_sctp_send(struct booth_site * to __attribute__((unused)), void *buf __attribute__((unused)), int len __attribute__((unused))) { return 0; } static int booth_sctp_broadcast(void *buf __attribute__((unused)), int len __attribute__((unused))) { return 0; } static int return_0_booth_site(struct booth_site *v __attribute((unused))) { return 0; } static int return_0(void) { return 0; } const struct booth_transport booth_transport[TRANSPORT_ENTRIES] = { [TCP] = { .name = "TCP", .init = booth_tcp_init, .open = booth_tcp_open, .send = booth_tcp_send, .recv = booth_tcp_recv, .close = booth_tcp_close, .exit = booth_tcp_exit }, [UDP] = { .name = "UDP", .init = booth_udp_init, .open = return_0_booth_site, .send = booth_udp_send, .close = return_0_booth_site, .broadcast = booth_udp_broadcast, .exit = booth_udp_exit }, [SCTP] = { .name = "SCTP", .init = booth_sctp_init, .open = return_0_booth_site, .send = booth_sctp_send, .broadcast = booth_sctp_broadcast, .exit = return_0, } }; const struct booth_transport *local_transport = booth_transport+TCP; int send_header_only(int fd, struct boothc_header *hdr) { int rv; rv = do_write(fd, hdr, sizeof(*hdr)); return rv; } int send_ticket_msg(int fd, struct boothc_ticket_msg *msg) { int rv; rv = do_write(fd, msg, sizeof(*msg)); return rv; } int send_header_plus(int fd, struct boothc_header *hdr, void *data, int len) { int rv; int l; if (data == hdr->data) { l = sizeof(*hdr) + len; assert(l == ntohl(hdr->length)); /* One struct */ rv = do_write(fd, hdr, l); } else { /* Header and data in two locations */ rv = send_header_only(fd, hdr); if (rv >= 0 && len) rv = do_write(fd, data, len); } return rv; }