diff --git a/script/ocf/booth-site b/script/ocf/booth-site index a34afc0..d4c4fc6 100755 --- a/script/ocf/booth-site +++ b/script/ocf/booth-site @@ -1,245 +1,245 @@ #!/bin/bash # vim: set sw=4 et : # # Resource Agent for BOOTH site daemon. # # This program is free software; you can redistribute it and/or modify # it under the terms of version 2 of the GNU General Public License as # published by the Free Software Foundation. # # This program is distributed in the hope that it would be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. # # Further, this software is distributed without any warranty that it is # free of the rightful claim of any third person regarding infringement # or the like. Any license provided herein, whether implied or # otherwise, applies only to this software file. Patent licenses, if # any, provided herein do not apply to combinations of this program with # other software, or any other product whatsoever. # # You should have received a copy of the GNU General Public License # along with this program; if not, write the Free Software Foundation, # Inc., 59 Temple Place - Suite 330, Boston MA 02111-1307, USA. # ####################################################################### # Initialization: BOOTH_DAEMON_STARTED=0 BOOTH_DAEMON_STARTING=1 BOOTH_DAEMON_EXIST=2 BOOTH_DAEMON_NOT_RUNNING=3 BOOTH_ERROR_GENERIC=4 . ${OCF_ROOT}/resource.d/heartbeat/.ocf-shellfuncs ####################################################################### meta_data() { cat < 1.0 This Resource Agent can control the BOOTH site daemon. It assumes that the binary boothd is in your default PATH. In most cases, it should be run as a primitive resource. BOOTH site daemon The configuration name (or configuration filename) to use. BOOTH Options Any additional options to start the BOOTH daemon with BOOTH Options The daemon to start The daemon to start END } ####################################################################### booth_usage() { cat < * Copyright (C) 2013 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _BOOTH_H #define _BOOTH_H #include #include #include #include #include #define BOOTH_LOG_DUMP_SIZE (1024*1024) #define BOOTH_RUN_DIR "/var/run/booth/" #define BOOTH_LOG_DIR "/var/log" #define BOOTH_LOGFILE_NAME "booth.log" #define BOOTH_DEFAULT_CONF "/etc/booth/booth.conf" #define DAEMON_NAME "booth" #define BOOTH_NAME_LEN 63 #define BOOTH_PATH_LEN 127 #define BOOTH_PROTO_FAMILY AF_INET #define BOOTHC_MAGIC 0x5F1BA08C -#define BOOTHC_VERSION 0x00010000 +#define BOOTHC_VERSION 0x00010002 struct boothc_header { + /** BOOTHC_MAGIC */ uint32_t magic; + /** BOOTHC_VERSION */ uint32_t version; + + /** Packet source; nodeid. See add_node(). */ + uint32_t from; + + /** Length including header */ + uint32_t length; + + /** The command, see cmd_request_t. cmp paxos_state_t ?? */ uint32_t cmd; - uint32_t expiry; - uint32_t len; + /** Result of operation. 0 == OK */ uint32_t result; + char data[0]; } __attribute__((packed)); typedef unsigned char boothc_site [BOOTH_NAME_LEN]; typedef unsigned char boothc_ticket[BOOTH_NAME_LEN]; -struct boothc_site_ticket_msg { - struct boothc_header header; +struct booth_node { + int nodeid; + int type; + int local; + + int role; + + char addr_string[BOOTH_NAME_LEN]; + + int tcp_fd; + + unsigned short family; + union { + struct sockaddr_in sa4; + struct sockaddr_in6 sa6; + }; + int saddrlen; + int addrlen; +} __attribute__((packed)); + + +extern struct booth_node *local; + +inline static int booth_get_myid(void) +{ + return local ? local->nodeid : -1; +} + + +struct ticket_data { + /** Ticket name. */ + boothc_ticket id; + + /** Owner. May be NO_OWNER. See add_node(). */ + uint32_t owner; + /* Better use that? but from is an int currently, too */ + boothc_site owner; + + /** POSIX timestamp? Or time until expiration? */ + uint32_t expiry; + + /* needed?? */ + /* From lease */ + uint32_t op; /* OP_START_LEASE, OP_STOP_LEASE? ?*/ +} __attribute__((packed)); + + +struct paxos_control_data { + /** Current protocol state. See paxos_state_t. */ + uint32_t state; + + /** Current ballot number. Might be < prev_ballot if overflown. */ + uint32_t ballot; + /** Previous ballot. */ + uint32_t prev_ballot; + + + /* From lease - needed? */ + uint32_t clear; /* NOT_CLEAR_RELEASE ? */ + uint32_t leased; /* has_been_leased by another node? */ +}; + +struct paxos_control_data { + +struct site_msg { boothc_site site; - boothc_ticket ticket; +}; + +struct boothc_ticket_site_msg { + struct boothc_header header; + struct ticket_msg ticket; + struct site_msg site; +} __attribute__((packed)); + +struct boothc_ticket_msg { + struct boothc_header header; + struct ticket_msg ticket; } __attribute__((packed)); +struct ticket_data { +}; + + typedef enum { - BOOTHC_CMD_LIST = 1, + BOOTHC_CMD_LIST = 0x30, BOOTHC_CMD_GRANT, BOOTHC_CMD_REVOKE, BOOTHC_CMD_CATCHUP, } cmd_request_t; typedef enum { - BOOTHC_RLT_ASYNC = 1, + BOOTHC_RLT_ASYNC = 0x40, BOOTHC_RLT_SYNC_SUCC, BOOTHC_RLT_SYNC_FAIL, BOOTHC_RLT_INVALID_ARG, BOOTHC_RLT_REMOTE_OP, BOOTHC_RLT_OVERGRANT, } cmd_result_t; struct client { int fd; void (*workfn)(int); void (*deadfn)(int); }; int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci)); int do_read(int fd, void *buf, size_t count); int do_write(int fd, void *buf, size_t count); void process_connection(int ci); void safe_copy(char *dest, char *value, size_t buflen, const char *description); + + +static inline void init_header(struct boothc_header *h, int cmd, + int result, int data_len) +{ + h->magic = htonl(BOOTHC_MAGIC); + h->version = htonl(BOOTHC_VERSION); + h->length = htonl(data_len); + h->cmd = htonl(cmd); + h->from = htonl(local->nodeid); + h->expiry = htonl(0); + h->result = htonl(result); +} + +static inline void init_ticket_site_header(struct boothc_ticket_site_msg *msg, int cmd) +{ + init_header(&msg->header, cmd, 0, sizeof(*msg)); +} + +static inline void init_ticket_msg(struct boothc_ticket_msg *msg, int cmd) +{ + init_header(&msg->header, cmd, 0, sizeof(*msg)); + memset(&msg->ticket, 0, sizeof(msg->ticket)); +} + + +static inline void init_ticket_site_msg(struct boothc_ticket_site_msg *msg, int cmd) +{ + init_ticket_site_header(msg, cmd); + memset(&msg->site, 0, sizeof(msg->site)); + memset(&msg->ticket, 0, sizeof(msg->ticket)); +} + + #endif /* _BOOTH_H */ diff --git a/src/config.c b/src/config.c index da9c5c3..b118516 100644 --- a/src/config.c +++ b/src/config.c @@ -1,407 +1,409 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include "booth.h" #include "config.h" #include "ticket.h" #include "log.h" static int ticket_size = 0; static int ticket_realloc(void) { void *p; booth_conf = realloc(booth_conf, sizeof(struct booth_config) + (ticket_size + TICKET_ALLOC) * sizeof(struct ticket_config)); if (!booth_conf) { log_error("can't alloc more booth config"); return -ENOMEM; } p = (char *) booth_conf + sizeof(struct booth_config) + ticket_size * sizeof(struct ticket_config); memset(p, 0, TICKET_ALLOC * sizeof(struct ticket_config)); ticket_size += TICKET_ALLOC; return 0; } int add_node(char *address, int type); int add_node(char *addr_string, int type) { int rv; struct booth_node *node; rv = 1; if (booth_conf->node_count == MAX_NODES) { log_error("too many nodes"); goto out; } if (strlen(addr_string)+1 >= sizeof(booth_conf->node[0].addr_string)) { log_error("node address \"%s\" too long", addr_string); goto out; } - node = booth_conf->node+booth_conf->node_count; + node = booth_conf->node + booth_conf->node_count; node->family = BOOTH_PROTO_FAMILY; node->type = type; - node->nodeid = booth_conf->node_count; + /* Make nodeid start at a non-zero point. + * Perhaps use hash over string or address? */ + node->nodeid = booth_conf->node_count * 0x11 + 0x98989011; strcpy(node->addr_string, addr_string); node->tcp_fd = -1; booth_conf->node_count++; rv = 0; memset(&node->sa6, 0, sizeof(node->sa6)); if (inet_pton(AF_INET, node->addr_string, &node->sa4.sin_addr) > 0) { node->family = AF_INET; node->sa4.sin_family = node->family; node->sa4.sin_port = htons(booth_conf->port); node->saddrlen = sizeof(node->sa4); node->addrlen = sizeof(node->sa4.sin_addr); } else if (inet_pton(AF_INET6, node->addr_string, &node->sa6.sin6_addr) > 0) { node->family = AF_INET6; node->sa6.sin6_family = node->family; node->sa6.sin6_flowinfo = 0; node->sa6.sin6_port = htons(booth_conf->port); node->saddrlen = sizeof(node->sa6); node->addrlen = sizeof(node->sa6.sin6_addr); } else { log_error("Address string \"%s\" is bad", node->addr_string); rv = EINVAL; } out: return rv; } inline static char *skip_while(char *cp, int (*fn)(int)) { while (fn(*cp)) cp++; return cp; } inline static char *skip_until(char *cp, char expected) { while (*cp && *cp != expected) cp++; return cp; } static inline int is_end_of_line(char *cp) { char c = *cp; return c == '\n' || c == 0 || c == '#'; } int read_config(const char *path) { char line[1024]; FILE *fp; char *s, *key, *val, *expiry, *weight, *c, *end_of_key; const char *cp, *error; int i; int lineno = 0; int got_transport = 0; fp = fopen(path, "r"); if (!fp) { log_error("failed to open %s: %s", path, strerror(errno)); return -1; } booth_conf = malloc(sizeof(struct booth_config) + TICKET_ALLOC * sizeof(struct ticket_config)); if (!booth_conf) { log_error("failed to alloc memory for booth config"); return -ENOMEM; } memset(booth_conf, 0, sizeof(struct booth_config) + TICKET_ALLOC * sizeof(struct ticket_config)); ticket_size = TICKET_ALLOC; booth_conf->proto = UDP; log_debug("reading config file %s", path); while (fgets(line, sizeof(line), fp)) { lineno++; s = skip_while(line, isspace); if (is_end_of_line(s)) continue; key = s; /* Key */ end_of_key = skip_while(key, isalnum); if (end_of_key == key) { error = "No key"; goto err; } if (!*end_of_key) goto exp_equal; /* whitespace, and something else but nothing more? */ s = skip_while(end_of_key, isspace); if (*s != '=') { exp_equal: error = "Expected '=' after key"; goto err; } s++; /* It's my buffer, and I terminate if I want to. */ /* But not earlier than that, because we had to check for = */ *end_of_key = 0; /* Value tokenizing */ s = skip_while(s, isspace); switch (*s) { case '"': case '\'': val = s+1; s = skip_until(val, *s); /* Terminate value */ if (!*s) { error = "Unterminated quoted string"; goto err; } /* Remove and skip quote */ *s = 0; s++; if (* skip_while(s, isspace)) { error = "Surplus data after value"; goto err; } *s = 0; break; case 0: no_value: error = "No value"; goto err; break; default: val = s; /* Rest of line. */ i = strlen(s); /* i > 0 because of "case 0" above. */ while (i > 0 && isspace(s[i-1])) i--; s += i; *s = 0; } if (val == s) goto no_value; if (strlen(key) > BOOTH_NAME_LEN || strlen(val) > BOOTH_NAME_LEN) { error = "key/value too long"; goto err; } if (strcmp(key, "transport") == 0) { if (got_transport) { error = "config file has multiple transport lines"; goto err; } if (strcasecmp(val, "UDP") == 0) booth_conf->proto = UDP; else if (strcasecmp(val, "SCTP") == 0) booth_conf->proto = SCTP; else { error = "invalid transport protocol"; goto err; } got_transport = 1; } if (strcmp(key, "port") == 0) booth_conf->port = atoi(val); if (strcmp(key, "name") == 0) { if(strlen(val)+1 >= BOOTH_NAME_LEN) { error = "Config name too long."; goto err; } } if (strcmp(key, "site") == 0) { if (add_node(val, SITE)) goto out; } if (strcmp(key, "arbitrator") == 0) { if (add_node(val, ARBITRATOR)) goto out; } if (strcmp(key, "ticket") == 0) { int count = booth_conf->ticket_count; if (booth_conf->ticket_count == ticket_size) { if (ticket_realloc() < 0) goto out; } expiry = index(val, ';'); weight = rindex(val, ';'); if (!expiry) { strcpy(booth_conf->ticket[count].name, val); booth_conf->ticket[count].expiry = DEFAULT_TICKET_EXPIRY; log_info("expire is not set in %s." " Set the default value %ds.", booth_conf->ticket[count].name, DEFAULT_TICKET_EXPIRY); } else if (expiry && expiry == weight) { *expiry++ = '\0'; while (*expiry == ' ') expiry++; strcpy(booth_conf->ticket[count].name, val); booth_conf->ticket[count].expiry = atoi(expiry); } else { *expiry++ = '\0'; *weight++ = '\0'; while (*expiry == ' ') expiry++; while (*weight == ' ') weight++; strcpy(booth_conf->ticket[count].name, val); booth_conf->ticket[count].expiry = atoi(expiry); i = 0; while ((c = index(weight, ','))) { *c++ = '\0'; booth_conf->ticket[count].weight[i++] = atoi(weight); while (*c == ' ') c++; weight = c; if (i == MAX_NODES) { error = "too many weights"; goto err; } } } booth_conf->ticket_count++; } } /* Default: make config name match config filename. */ if (!booth_conf->name[0]) { cp = strrchr(path, '/'); if (!cp) cp = path; /* TODO: locale? */ /* NUL-termination by memset. */ for(i=0; iname[i] = *(cp++); /* Last resort. */ if (!booth_conf->name[0]) strcpy(booth_conf->name, "booth"); } return 0; err: log_error("%s in config file line %d", error, lineno); out: free(booth_conf); booth_conf = NULL; return -1; } int check_config(int type) { if (!booth_conf) return -1; return 0; } int find_site_in_config(unsigned char *site, struct booth_node **node) { struct booth_node *n; int i; if (!booth_conf) return 0; for (i = 0; i < booth_conf->node_count; i++) { n = booth_conf->node + i; if (n->type == SITE && strcmp(n->addr_string, site) == 0) { *node = n; return 1; } } return 0; } const char *type_to_string(int type) { switch (type) { case ARBITRATOR: return "arbitrator"; case SITE: return "site"; case CLIENT: return "client"; } return "??invalid-type??"; } diff --git a/src/config.h b/src/config.h index 6670191..e43ffbd 100644 --- a/src/config.h +++ b/src/config.h @@ -1,61 +1,75 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _CONFIG_H #define _CONFIG_H #include #include "booth.h" +#include "config.h" +#include "paxos_lease.h" #include "transport.h" #define MAX_NODES 16 #define TICKET_ALLOC 16 +#define NO_OWNER (-1) + struct ticket_config { - int weight[MAX_NODES]; - int expiry; boothc_ticket name; + + /* How many seconds to hold it */ + int expiry; + /* Who has it. */ + int owner; struct booth_node *owner; ?? + + /** Timestamp of expiration. */ + time_t expires; + +// pl_handle_t handle; not needed? + + int weight[MAX_NODES]; }; struct booth_config { char name[BOOTH_NAME_LEN]; int node_count; int ticket_count; transport_layer_t proto; uint16_t port; struct booth_node node[MAX_NODES]; struct ticket_config ticket[0]; }; struct booth_config *booth_conf; int read_config(const char *path); int check_config(int type); int find_site_in_config(unsigned char *site, struct booth_node **node); const char *type_to_string(int type); static inline struct booth_transport const *transport(void) { return booth_transport + booth_conf->proto; } #endif /* _CONFIG_H */ diff --git a/src/main.c b/src/main.c index 908d188..f59438f 100644 --- a/src/main.c +++ b/src/main.c @@ -1,1236 +1,1249 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "log.h" #include "booth.h" #include "config.h" #include "transport.h" #include "timer.h" #include "pacemaker.h" #include "ticket.h" #define RELEASE_VERSION "1.0" #define CLIENT_NALLOC 32 int daemonize = 0; static int client_maxi; static int client_size = 0; struct client *client = NULL; struct pollfd *pollfd = NULL; typedef enum { BOOTHD_STARTED=0, BOOTHD_STARTING } BOOTH_DAEMON_STATE; int poll_timeout = -1; typedef enum { OP_LIST = 1, OP_GRANT, OP_REVOKE, } operation_t; struct command_line { int type; /* ACT_ */ int op; /* OP_ */ char configfile[BOOTH_PATH_LEN]; char lockfile[BOOTH_PATH_LEN]; - struct boothc_site_ticket_msg msg; + struct boothc_ticket_site_msg msg; }; static struct command_line cl; int do_read(int fd, void *buf, size_t count) { int rv, off = 0; while (off < count) { rv = read(fd, (char *)buf + off, count - off); if (rv == 0) return -1; if (rv == -1 && errno == EINTR) continue; if (rv == -1) return -1; off += rv; } return 0; } int do_write(int fd, void *buf, size_t count) { int rv, off = 0; retry: rv = write(fd, (char *)buf + off, count); if (rv == -1 && errno == EINTR) goto retry; /* If we cannot write _any_ data, we'd be in an (potential) loop. */ if (rv <= 0) { log_error("write failed: %s (%d)", strerror(errno), errno); return rv; } if (rv != count) { count -= rv; off += rv; goto retry; } return 0; } static int do_local_connect_and_write(void *data, int len, struct booth_node **ret) { struct booth_node *node; int rv; if (ret) *ret = NULL; /* Use locally reachable address, ie. in same cluster. */ if (!find_myself(&node, 1)) { log_error("Cannot find local cluster."); return ENOENT; } if (ret) *ret = node; /* Always use TCP within cluster. */ rv = booth_tcp_open(node); if (rv < 0) goto out; rv = booth_tcp_send(node, data, len); out: return rv; } -static void init_header(struct boothc_header *h, int cmd, - int result, int data_len) -{ - memset(h, 0, sizeof(struct boothc_header)); - - h->magic = BOOTHC_MAGIC; - h->version = BOOTHC_VERSION; - h->len = data_len; - h->cmd = cmd; - h->result = result; -} - static void client_alloc(void) { int i; if (!client) { client = malloc(CLIENT_NALLOC * sizeof(struct client)); pollfd = malloc(CLIENT_NALLOC * sizeof(struct pollfd)); } else { client = realloc(client, (client_size + CLIENT_NALLOC) * sizeof(struct client)); pollfd = realloc(pollfd, (client_size + CLIENT_NALLOC) * sizeof(struct pollfd)); if (!pollfd) log_error("can't alloc for pollfd"); } if (!client || !pollfd) log_error("can't alloc for client array"); for (i = client_size; i < client_size + CLIENT_NALLOC; i++) { client[i].workfn = NULL; client[i].deadfn = NULL; client[i].fd = -1; pollfd[i].fd = -1; pollfd[i].revents = 0; } client_size += CLIENT_NALLOC; } static void client_dead(int ci) { close(client[ci].fd); client[ci].workfn = NULL; client[ci].fd = -1; pollfd[ci].fd = -1; } int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci)) { int i; if (!client) client_alloc(); again: for (i = 0; i < client_size; i++) { if (client[i].fd == -1) { client[i].workfn = workfn; if (deadfn) client[i].deadfn = deadfn; else client[i].deadfn = client_dead; client[i].fd = fd; pollfd[i].fd = fd; pollfd[i].events = POLLIN; if (i > client_maxi) client_maxi = i; return i; } } client_alloc(); goto again; } void process_connection(int ci) { - struct boothc_site_ticket_msg msg; - char *data = NULL; - int ticket_owner; - int is_local, rv; + struct boothc_ticket_site_msg msg; + struct ticket_config *tc; + int is_local, rv, len, exp, olen; void (*deadfn) (int ci); + char *data; + rv = do_read(client[ci].fd, &msg.header, sizeof(msg.header)); if (rv < 0) { if (errno == ECONNRESET) log_debug("client %d connection reset for fd %d", - ci, client[ci].fd); + ci, client[ci].fd); - deadfn = client[ci].deadfn; - if(deadfn) { - deadfn(ci); - } - return; - } - if (msg.header.magic != BOOTHC_MAGIC) { - log_error("connection %d magic error %x", ci, msg.header.magic); - return; - } - if (msg.header.version != BOOTHC_VERSION) { - log_error("connection %d version error %x", ci, msg.header.version); - return; + goto kill; } - if (msg.header.len) { - if (msg.header.len != sizeof(msg) - sizeof(msg.header)) { - log_error("got wrong length %u", msg.header.len); + if (check_boothc_header(&msg.header, -1) < 0) + goto kill; + + /* Basic sanity checks already done. */ + len = ntohl(msg.header.length); + if (len) { + if (len != sizeof(msg)) { +bad_len: + log_error("got wrong length %u", len); return; } - rv = do_read(client[ci].fd, msg.header.data, msg.header.len); - if (rv < 0) { - log_error("connection %d read data error %d", ci, rv); - goto out; + exp = len - sizeof(msg.header); + rv = do_read(client[ci].fd, msg.header.data, exp); + if (rv != exp) { + log_error("connection %d read data error %d, wanted %d", + ci, rv, exp); + goto kill; } } - switch (msg.header.cmd) { - case BOOTHC_CMD_LIST: - assert(!data); - msg.header.result = list_ticket(&data, &msg.header.len); - break; - - case BOOTHC_CMD_GRANT: - msg.header.len = 0; - if (!check_ticket(msg.ticket)) { - msg.header.result = BOOTHC_RLT_INVALID_ARG; + olen = 0; + /* Commands have input msg; + * and output rv, data and olen (excluding header). */ + switch (ntohl(msg.header.cmd)) { + case BOOTHC_CMD_LIST: + assert(!data); + rv = list_ticket(&data, &olen); goto reply; - } - if (get_ticket_info(msg.ticket, &ticket_owner, NULL) == 0) { - if (ticket_owner > -1) { + case BOOTHC_CMD_GRANT: + /* Expect boothc_ticket_site_msg. */ + if (len != sizeof(msg)) + goto bad_len; + + /* Need to return ticket name etc. */ + olen = len; + data = msg.header.data; + if (!check_ticket(msg.ticket.id, &tc)) { + rv = BOOTHC_RLT_INVALID_ARG; + goto reply; + } + + if (tc->owner != NO_OWNER) { log_error("client want to get an granted " - "ticket %s", msg.ticket); - msg.header.result = BOOTHC_RLT_OVERGRANT; + "ticket %s", msg.ticket.id); + rv = BOOTHC_RLT_OVERGRANT; goto reply; } - } else { - log_error("can not get ticket %s's info", msg.ticket); - msg.header.result = BOOTHC_RLT_INVALID_ARG; - goto reply; - } - if (!check_site(msg.site, &is_local)) { - msg.header.result = BOOTHC_RLT_INVALID_ARG; - goto reply; - } - if (is_local) - msg.header.result = grant_ticket(msg.ticket); - else - msg.header.result = BOOTHC_RLT_REMOTE_OP; - break; + if (!check_site(msg.site.site, &is_local)) { + rv = BOOTHC_RLT_INVALID_ARG; + goto reply; + } + if (is_local) + rv = grant_ticket(msg.ticket.id); + else + rv = BOOTHC_RLT_REMOTE_OP; + break; - case BOOTHC_CMD_REVOKE: - msg.header.len = 0; - if (!check_ticket(msg.ticket)) { - msg.header.result = BOOTHC_RLT_INVALID_ARG; - goto reply; - } - if (!check_site(msg.site, &is_local)) { - msg.header.result = BOOTHC_RLT_INVALID_ARG; - goto reply; - } - if (is_local) - msg.header.result = revoke_ticket(msg.ticket); - else - msg.header.result = BOOTHC_RLT_REMOTE_OP; - break; + case BOOTHC_CMD_REVOKE: + /* Expect boothc_ticket_site_msg. */ + if (len != sizeof(msg)) + goto bad_len; - case BOOTHC_CMD_CATCHUP: - msg.header.result = catchup_ticket(&data, msg.header.len); - break; + olen = len; + data = msg.header.data; + if (!check_ticket(msg.ticket.id, &tc)) { + msg.header.result = BOOTHC_RLT_INVALID_ARG; + goto reply; + } + if (!check_site(msg.site.site, &is_local)) { + msg.header.result = BOOTHC_RLT_INVALID_ARG; + goto reply; + } + if (is_local) + msg.header.result = revoke_ticket(msg.ticket.id); + else + msg.header.result = BOOTHC_RLT_REMOTE_OP; + break; - default: - log_error("connection %d cmd %x unknown", ci, msg.header.cmd); - break; + case BOOTHC_CMD_CATCHUP: + /* Expect boothc_ticket_site_msg. */ + if (len != sizeof(msg)) + goto bad_len; + + /* Need to return ticket name etc. */ + olen = len; + data = msg.header.data; + + if (!check_ticket(msg.ticket.id, &tc)) { + rv = BOOTHC_RLT_INVALID_ARG; + goto reply; + } + + rv = catchup_ticket(&msg.ticket, tc); + /* Only answer if we're the owner. */ + if (rv == -1) + goto kill; + break; + + default: + log_error("connection %d cmd %x unknown", + ci, ntohl(msg.header.cmd)); + goto kill; } + reply: + msg.header.result = htonl(rv); + msg.header.length = htonl(olen + sizeof(msg.header)); + rv = do_write(client[ci].fd, &msg.header, sizeof(msg.header)); if (rv < 0) log_error("connection %d write error %d", ci, rv); - if (msg.header.len) { - rv = do_write(client[ci].fd, data, msg.header.len); + if (len) { + rv = do_write(client[ci].fd, data, olen); if (rv < 0) log_error("connection %d write error %d", ci, rv); } -out: - free(data); + + return; + +kill: + deadfn = client[ci].deadfn; + if(deadfn) { + deadfn(ci); + } + return; } static void process_listener(int ci) { int fd, i; fd = accept(client[ci].fd, NULL, NULL); if (fd < 0) { log_error("process_listener: accept error for fd %d: %s (%d)", client[ci].fd, strerror(errno), errno); if (client[ci].deadfn) client[ci].deadfn(ci); return; } i = client_add(fd, process_connection, NULL); log_debug("add client connection %d fd %d", i, fd); } static int setup_config(int type) { int rv; rv = read_config(cl.configfile); if (rv < 0) goto out; /* Set "local" pointer, ignoring errors. */ find_myself(NULL, 0); rv = check_config(type); if (rv < 0) goto out; /* Per default the PID file name is derived from the * configuration name. */ if (!cl.lockfile[0]) { snprintf(cl.lockfile, sizeof(cl.lockfile)-1, "%s/%s.pid", BOOTH_RUN_DIR, booth_conf->name); } out: return rv; } static int setup_transport(void) { int rv; rv = transport()->init(ticket_recv); if (rv < 0) { log_error("failed to init booth_transport %s", transport()->name); goto out; } rv = booth_transport[TCP].init(NULL); if (rv < 0) { log_error("failed to init booth_transport[TCP]"); goto out; } out: return rv; } static int setup_timer(void) { return timerlist_init(); } static int write_daemon_state(int fd, int state) { char buffer[1024]; int rv, size; size = sizeof(buffer) - 1; rv = snprintf(buffer, size, "booth_pid=%d " "booth_state=%s " "booth_type=%s " "booth_cfg_name='%s' " "booth_addr_string='%s' " "booth_port=%d\n", getpid(), ( state == BOOTHD_STARTED ? "started" : state == BOOTHD_STARTING ? "starting" : "invalid"), type_to_string(local->type), booth_conf->name, local->addr_string, booth_conf->port); if (rv < 0 || rv == size) { log_error("Buffer filled up in write_daemon_state()."); return -1; } size = rv; + rv = ftruncate(fd, 0); + if (rv < 0) { + log_error("lockfile %s truncate error %d: %s", + cl.lockfile, errno, strerror(errno)); + return rv; + } + + rv = lseek(fd, 0, SEEK_SET); if (rv < 0) { log_error("lseek set fd(%d) offset to 0 error, return(%d), message(%s)", fd, rv, strerror(errno)); rv = -1; return rv; } rv = write(fd, buffer, size); if (rv != size) { log_error("write to fd(%d, %d) returned %d, errno %d, message(%s)", fd, size, rv, errno, strerror(errno)); return -1; } return 0; } static int loop(int fd) { void (*workfn) (int ci); void (*deadfn) (int ci); int rv, i; rv = setup_timer(); if (rv < 0) goto fail; rv = setup_transport(); if (rv < 0) goto fail; rv = setup_ticket(); if (rv < 0) goto fail; client_add(rv, process_listener, NULL); rv = write_daemon_state(fd, BOOTHD_STARTED); if (rv != 0) { log_error("write daemon state %d to lockfile error %s: %s", BOOTHD_STARTED, cl.lockfile, strerror(errno)); goto fail; } if (cl.type == ARBITRATOR) log_info("BOOTH arbitrator daemon started"); else if (cl.type == SITE) log_info("BOOTH cluster site daemon started"); - while (1) { - rv = poll(pollfd, client_maxi + 1, poll_timeout); - if (rv == -1 && errno == EINTR) - continue; - if (rv < 0) { - log_error("poll failed: %s (%d)", strerror(errno), errno); - goto fail; - } - - for (i = 0; i <= client_maxi; i++) { - if (client[i].fd < 0) - continue; - if (pollfd[i].revents & POLLIN) { - workfn = client[i].workfn; - if (workfn) - workfn(i); - } - if (pollfd[i].revents & - (POLLERR | POLLHUP | POLLNVAL)) { - deadfn = client[i].deadfn; - if (deadfn) - deadfn(i); - } - } + while (1) { + rv = poll(pollfd, client_maxi + 1, poll_timeout); + if (rv == -1 && errno == EINTR) + continue; + if (rv < 0) { + log_error("poll failed: %s (%d)", strerror(errno), errno); + goto fail; + } + + for (i = 0; i <= client_maxi; i++) { + if (client[i].fd < 0) + continue; + if (pollfd[i].revents & POLLIN) { + workfn = client[i].workfn; + if (workfn) + workfn(i); + } + if (pollfd[i].revents & + (POLLERR | POLLHUP | POLLNVAL)) { + deadfn = client[i].deadfn; + if (deadfn) + deadfn(i); + } + } process_timerlist(); } return 0; fail: return -1; } static int query_get_string_answer(cmd_request_t cmd) { struct booth_node *node; - struct boothc_header h, *rh; - char *reply = NULL, *data; + struct boothc_header h, reply; + char *data; int data_len; int rv; + data = NULL; init_header(&h, cmd, 0, 0); rv = do_local_connect_and_write(&h, sizeof(h), &node); if (rv < 0) goto out; - reply = malloc(sizeof(struct boothc_header)); - if (!reply) { - rv = -ENOMEM; - goto out_close; - } - - rv = local_transport->recv(node, reply, sizeof(struct boothc_header)); + rv = local_transport->recv(node, &reply, sizeof(reply)); if (rv < 0) goto out_free; - rh = (struct boothc_header *)reply; - data_len = rh->len; + data_len = ntohl(reply.length) - sizeof(reply); - reply = realloc(reply, sizeof(struct boothc_header) + data_len); - if (!reply) { + data = malloc(data_len); + if (!data) { rv = -ENOMEM; goto out_free; } - data = reply + sizeof(struct boothc_header); rv = local_transport->recv(node, data, data_len); if (rv < 0) goto out_free; do_write(STDOUT_FILENO, data, data_len); rv = 0; out_free: - free(reply); -out_close: + free(data); local_transport->close(node); out: return rv; } static int do_command(cmd_request_t cmd) { struct booth_node *node, *to; struct boothc_header reply; int rv; node = NULL; to = NULL; - init_header(&cl.msg.header, cmd, 0, - sizeof(cl.msg) - sizeof(cl.msg.header)); + init_ticket_site_header(&cl.msg, cmd); rv = do_local_connect_and_write(&cl.msg, sizeof(cl.msg), &node); if (rv < 0) goto out_close; rv = local_transport->recv(node, &reply, sizeof(reply)); if (rv < 0) goto out_close; if (reply.result == BOOTHC_RLT_INVALID_ARG) { log_info("invalid argument!"); rv = -1; goto out_close; } - + if (reply.result == BOOTHC_RLT_OVERGRANT) { log_info("You're granting a granted ticket " "If you wanted to migrate a ticket," "use revoke first, then use grant"); rv = -1; goto out_close; } - + if (reply.result == BOOTHC_RLT_REMOTE_OP) { - if (!find_site_in_config(cl.msg.site, &to)) { - log_error("Redirected to unknown site %s.", cl.msg.site); + if (!find_site_in_config(cl.msg.site.site, &to)) { + log_error("Redirected to unknown site %s.", cl.msg.site.site); rv = -1; goto out_close; } rv = booth_transport[TCP].open(to); if (rv < 0) { goto out_close; } rv = booth_transport[TCP].send(to, &cl.msg, sizeof(cl.msg)); if (rv < 0) { booth_transport[TCP].close(to); goto out_close; } rv = booth_transport[TCP].recv(to, &reply, sizeof(struct boothc_header)); - if (rv < 0) { + if (rv < 0) { booth_transport[TCP].close(to); goto out_close; } booth_transport[TCP].close(to); } if (reply.result == BOOTHC_RLT_ASYNC) { if (cmd == BOOTHC_CMD_GRANT) log_info("grant command sent, result will be returned " "asynchronously, you can get the result from " "the log files"); else if (cmd == BOOTHC_CMD_REVOKE) log_info("revoke command sent, result will be returned " "asynchronously, you can get the result from " "the log files."); else log_error("internal error reading reply result!"); rv = 0; } else if (reply.result == BOOTHC_RLT_SYNC_SUCC) { if (cmd == BOOTHC_CMD_GRANT) log_info("grant succeeded!"); else if (cmd == BOOTHC_CMD_REVOKE) log_info("revoke succeeded!"); rv = 0; } else if (reply.result == BOOTHC_RLT_SYNC_FAIL) { if (cmd == BOOTHC_CMD_GRANT) log_info("grant failed!"); else if (cmd == BOOTHC_CMD_REVOKE) log_info("revoke failed!"); rv = -1; } else { log_error("internal error!"); rv = -1; } out_close: if (node) local_transport->close(node); if (to) booth_transport[TCP].close(to); return rv; } static int do_grant(void) { return do_command(BOOTHC_CMD_GRANT); } static int do_revoke(void) { return do_command(BOOTHC_CMD_REVOKE); } static int _lockfile(int mode, int *fdp, pid_t *locked_by) { struct flock lock; int fd, rv; /* After reboot the directory may not yet exist. * Try to create it, but ignore errors. */ if (strncmp(cl.lockfile, BOOTH_RUN_DIR, strlen(BOOTH_RUN_DIR)) == 0) mkdir(BOOTH_RUN_DIR, 0775); if (locked_by) *locked_by = 0; *fdp = -1; fd = open(cl.lockfile, mode, 0664); if (fd < 0) return errno; *fdp = fd; lock.l_type = F_WRLCK; lock.l_start = 0; lock.l_whence = SEEK_SET; lock.l_len = 0; lock.l_pid = 0; if (fcntl(fd, F_SETLK, &lock) == 0) return 0; rv = errno; if (locked_by) if (fcntl(fd, F_GETLK, &lock) == 0) *locked_by = lock.l_pid; return rv; } static int lockfile(void) { int rv, fd; fd = -1; rv = _lockfile(O_CREAT | O_WRONLY, &fd, NULL); if (fd == -1) { log_error("lockfile %s open error %d: %s", cl.lockfile, rv, strerror(rv)); return -1; } if (rv < 0) { log_error("lockfile %s setlk error %d: %s", cl.lockfile, rv, strerror(rv)); goto fail; } - rv = ftruncate(fd, 0); - if (rv < 0) { - log_error("lockfile %s truncate error %d: %s", - cl.lockfile, errno, strerror(errno)); - goto fail; - } - rv = write_daemon_state(fd, BOOTHD_STARTING); if (rv != 0) { log_error("write daemon state %d to lockfile error %s: %s", BOOTHD_STARTING, cl.lockfile, strerror(errno)); goto fail; } return fd; fail: close(fd); return -1; } static void unlink_lockfile(int fd) { unlink(cl.lockfile); close(fd); } static void print_usage(void) { printf("Usage:\n"); printf("booth [options]\n"); printf("\n"); printf("Types:\n"); printf(" arbitrator: daemon running on arbitrator\n"); printf(" site: daemon running on cluster site\n"); printf(" client: command running from client\n"); printf("\n"); printf("Operations:\n"); printf("Please note that operations are valid iff type is client!\n"); printf(" list: List all the tickets\n"); printf(" grant: Grant ticket T(-t T) to site S(-s S)\n"); printf(" revoke: Revoke ticket T(-t T) from site S(-s S)\n"); printf("\n"); printf("Options:\n"); printf(" -c FILE Specify config file [default " BOOTH_DEFAULT_CONF "]\n"); printf(" -l LOCKFILE Specify lock file path\n"); printf(" -D Enable debugging to stderr and don't fork\n"); printf(" -t ticket name\n"); printf(" -S report local daemon status (for site and arbitrator)\n"); printf(" RA script compliant return codes.\n"); printf(" -s site name\n"); printf(" -h Print this help, then exit\n"); } #define OPTION_STRING "c:Dl:t:s:hS" void safe_copy(char *dest, char *value, size_t buflen, const char *description) { int content_len = buflen - 1; if (strlen(value) >= content_len) { fprintf(stderr, "'%s' exceeds maximum %s length of %d\n", value, description, content_len); exit(EXIT_FAILURE); } strncpy(dest, value, content_len); dest[content_len] = 0; } static int host_convert(char *hostname, char *ip_str, size_t ip_size) { struct addrinfo *result = NULL, hints = {0}; int re = -1; memset(&hints, 0, sizeof(hints)); hints.ai_family = BOOTH_PROTO_FAMILY; hints.ai_socktype = SOCK_DGRAM; re = getaddrinfo(hostname, NULL, &hints, &result); if (re == 0) { struct in_addr addr = ((struct sockaddr_in *)result->ai_addr)->sin_addr; const char *re_ntop = inet_ntop(BOOTH_PROTO_FAMILY, &addr, ip_str, ip_size); if (re_ntop == NULL) { re = -1; } } freeaddrinfo(result); return re; } static int read_arguments(int argc, char **argv) { int optchar; char *arg1 = argv[1]; char *op = NULL; char site_arg[INET_ADDRSTRLEN] = {0}; if (argc < 2 || !strcmp(arg1, "help") || !strcmp(arg1, "--help") || !strcmp(arg1, "-h")) { print_usage(); exit(EXIT_SUCCESS); } if (!strcmp(arg1, "version") || !strcmp(arg1, "--version") || !strcmp(arg1, "-V")) { printf("%s %s (built %s %s)\n", argv[0], RELEASE_VERSION, __DATE__, __TIME__); exit(EXIT_SUCCESS); } if (strcmp(arg1, "arbitrator") == 0 || strcmp(arg1, "site") == 0 || strcmp(arg1, "start") == 0 || strcmp(arg1, "daemon") == 0) { cl.type = DAEMON; optind = 2; } else if (strcmp(arg1, "status") == 0) { cl.type = STATUS; optind = 2; } else if (strcmp(arg1, "client") == 0) { cl.type = CLIENT; if (argc < 3) { print_usage(); exit(EXIT_FAILURE); } op = argv[2]; optind = 3; } else { cl.type = CLIENT; op = argv[1]; optind = 2; } switch (cl.type) { case ARBITRATOR: break; case SITE: break; case CLIENT: if (!strcmp(op, "list")) cl.op = OP_LIST; else if (!strcmp(op, "grant")) cl.op = OP_GRANT; else if (!strcmp(op, "revoke")) cl.op = OP_REVOKE; else { fprintf(stderr, "client operation \"%s\" is unknown\n", op); exit(EXIT_FAILURE); } break; } while (optind < argc) { optchar = getopt(argc, argv, OPTION_STRING); switch (optchar) { case 'c': safe_copy(cl.configfile, optarg, sizeof(cl.configfile), "config file"); break; case 'D': daemonize = 1; debug_level++; break; case 'l': safe_copy(cl.lockfile, optarg, sizeof(cl.lockfile), "lock file"); break; case 't': if (cl.op == OP_GRANT || cl.op == OP_REVOKE) { - safe_copy(cl.msg.ticket, optarg, sizeof(cl.msg.ticket), "ticket name"); + safe_copy(cl.msg.ticket.id, optarg, sizeof(cl.msg.ticket.id), "ticket name"); } else { print_usage(); exit(EXIT_FAILURE); } break; case 's': if (cl.op == OP_GRANT || cl.op == OP_REVOKE) { int re = host_convert(optarg, site_arg, INET_ADDRSTRLEN); if (re == 0) { - safe_copy(cl.msg.site, site_arg, sizeof(cl.msg.ticket), "site name"); + safe_copy(cl.msg.site.site, site_arg, sizeof(cl.msg.ticket), "site name"); } else { - safe_copy(cl.msg.site, optarg, sizeof(cl.msg.ticket), "site name"); + safe_copy(cl.msg.site.site, optarg, sizeof(cl.msg.ticket), "site name"); } } else { print_usage(); exit(EXIT_FAILURE); } break; case 'h': print_usage(); exit(EXIT_SUCCESS); break; case ':': case '?': fprintf(stderr, "Please use '-h' for usage.\n"); exit(EXIT_FAILURE); break; default: fprintf(stderr, "unknown option: %s\n", argv[optind]); exit(EXIT_FAILURE); break; }; } return 0; } static void set_scheduler(void) { struct sched_param sched_param; struct rlimit rlimit; int rv; rlimit.rlim_cur = RLIM_INFINITY; rlimit.rlim_max = RLIM_INFINITY; setrlimit(RLIMIT_MEMLOCK, &rlimit); rv = mlockall(MCL_CURRENT | MCL_FUTURE); if (rv < 0) { log_error("mlockall failed"); } rv = sched_get_priority_max(SCHED_RR); if (rv != -1) { sched_param.sched_priority = rv; rv = sched_setscheduler(0, SCHED_RR, &sched_param); if (rv == -1) log_error("could not set SCHED_RR priority %d: %s (%d)", sched_param.sched_priority, strerror(errno), errno); } else { log_error("could not get maximum scheduler priority err %d", errno); } } static void set_oom_adj(int val) { FILE *fp; fp = fopen("/proc/self/oom_adj", "w"); if (!fp) return; fprintf(fp, "%i", val); fclose(fp); } static int do_status(int type) { pid_t pid; int rv, lock_fd, ret; const char *reason = NULL; char lockfile_data[1024], *cp; ret = PCMK_OCF_NOT_RUNNING; /* TODO: query all, and return quit only if it's _cleanly_ not * running, ie. _neither_ of port/lockfile/process is available? * * Currently a single failure says "not running", even if "only" the * lockfile has been removed. */ rv = setup_config(type); if (rv) { reason = "Error reading configuration."; ret = PCMK_LSB_UNKNOWN_ERROR; goto quit; } if (!local) { reason = "No Service IP active here."; goto quit; } rv = _lockfile(O_RDWR, &lock_fd, &pid); if (rv == 0) { reason = "PID file not locked."; goto quit; } if (lock_fd == -1) { reason = "No PID file."; goto quit; } if (pid) { fprintf(stdout, "booth_lockpid=%d ", pid); fflush(stdout); } rv = read(lock_fd, lockfile_data, sizeof(lockfile_data) - 1); if (rv < 4) { reason = "Cannot read lockfile data."; ret = PCMK_LSB_UNKNOWN_ERROR; goto quit; } lockfile_data[rv] = 0; if (lock_fd != -1) close(lock_fd); /* Make sure it's only a single line */ cp = strchr(lockfile_data, '\r'); if (cp) *cp = 0; cp = strchr(lockfile_data, '\n'); if (cp) *cp = 0; rv = setup_udp_server(1); if (rv == 0) { reason = "UDP port not in use."; goto quit; } fprintf(stdout, "booth_lockfile='%s' %s\n", cl.lockfile, lockfile_data); if (daemonize) fprintf(stderr, "Booth at %s port %d seems to be running.\n", local->addr_string, booth_conf->port); return 0; quit: log_debug("not running: %s", reason); /* Ie. "DEBUG" */ if (daemonize) fprintf(stderr, "not running: %s\n", reason); return ret; } static int do_server(int type) { int lock_fd = -1; int rv = -1; static char log_ent[128] = DAEMON_NAME "-"; rv = setup_config(type); if (rv < 0) goto out; if (!local) { log_error("Cannot find myself in the configuration."); exit(EXIT_FAILURE); } if (!daemonize) { if (daemon(0, 0) < 0) { perror("daemon error"); exit(EXIT_FAILURE); } } /* The lockfile must be written to _after_ the call to daemon(), so * that the lockfile contains the pid of the daemon, not the parent. */ lock_fd = lockfile(); if (lock_fd < 0) return lock_fd; strcat(log_ent, type_to_string(local->type)); cl_log_set_entity(log_ent); cl_log_enable_stderr(debug_level ? TRUE : FALSE); cl_log_set_facility(HA_LOG_FACILITY); cl_inherit_logging_environment(0); if (local->type == ARBITRATOR) log_info("BOOTH arbitrator daemon is starting."); else if (local->type == SITE) log_info("BOOTH cluster site daemon is starting."); set_scheduler(); set_oom_adj(-16); set_proc_title("%s %s for [%s]:%d", DAEMON_NAME, type_to_string(local->type), local->addr_string, booth_conf->port); rv = loop(lock_fd); out: if (lock_fd >= 0) unlink_lockfile(lock_fd); return rv; } static int do_client(void) { int rv = -1; rv = setup_config(CLIENT); if (rv < 0) { log_error("cannot read config"); goto out; } switch (cl.op) { case OP_LIST: rv = query_get_string_answer(BOOTHC_CMD_LIST); break; case OP_GRANT: rv = do_grant(); break; case OP_REVOKE: rv = do_revoke(); break; } out: return rv; } int main(int argc, char *argv[], char *envp[]) { int rv; init_set_proc_title(argc, argv, envp); memset(&cl, 0, sizeof(cl)); strncpy(cl.configfile, BOOTH_DEFAULT_CONF, BOOTH_PATH_LEN - 1); cl.lockfile[0] = 0; debug_level = 0; cl_log_set_entity("booth"); cl_log_enable_stderr(TRUE); cl_log_set_facility(0); rv = read_arguments(argc, argv); if (rv < 0) goto out; switch (cl.type) { case STATUS: rv = do_status(cl.type); break; case ARBITRATOR: case DAEMON: case SITE: rv = do_server(cl.type); break; case CLIENT: rv = do_client(); break; } out: /* Normalize values. 0x100 would be seen as "OK" by waitpid(). */ return rv >= 0 && rv < 0x70 ? rv : 1; } diff --git a/src/paxos.c b/src/paxos.c index 1b1b541..3c86052 100644 --- a/src/paxos.c +++ b/src/paxos.c @@ -1,859 +1,816 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include "list.h" +#include "booth.h" +#include "config.h" #include "paxos.h" #include "log.h" +/* Use numbers that are unlikely to conflict with other enums. */ typedef enum { - INIT = 1, + INIT = 0x5104, PREPARING, PROMISING, PROPOSING, ACCEPTING, RECOVERY, COMMITTED, + REJECTED, } paxos_state_t; struct proposal { int ballot_number; char value[0]; }; struct learned { int ballot; int number; }; struct paxos_msghdr { paxos_state_t state; int from; char psname[PAXOS_NAME_LEN+1]; char piname[PAXOS_NAME_LEN+1]; int ballot_number; int proposer_id; unsigned int extralen; unsigned int valuelen; }; struct proposer { int state; int ballot; int open_number; int accepted_number; int proposed; struct proposal *proposal; }; struct acceptor { int state; int highest_promised; struct proposal *accepted_proposal; }; struct learner { int state; int learned_max; int learned_ballot; struct learned learned[0]; }; struct paxos_space; struct paxos_instance; struct proposer_operations { void (*prepare) (struct paxos_instance *, int *); void (*propose) (struct paxos_space *, struct paxos_instance *, void *, int); void (*commit) (struct paxos_space *, struct paxos_instance *, void *, int); }; struct acceptor_operations { void (*promise) (struct paxos_space *, struct paxos_instance *, void *, int); void (*accepted) (struct paxos_space *, struct paxos_instance *, void *, int); }; struct learner_operations { void (*response) (struct paxos_space *, struct paxos_instance *, void *, int); }; struct paxos_space { char name[PAXOS_NAME_LEN+1]; unsigned int number; unsigned int extralen; unsigned int valuelen; const unsigned char *role; const struct paxos_operations *p_op; const struct proposer_operations *r_op; const struct acceptor_operations *a_op; const struct learner_operations *l_op; struct list_head list; struct list_head pi_head; }; struct paxos_instance { char name[PAXOS_NAME_LEN+1]; int round; int *prio; struct proposer *proposer; struct acceptor *acceptor; struct learner *learner; void (*end) (pi_handle_t pih, int round, int result); struct list_head list; struct paxos_space *ps; }; static LIST_HEAD(ps_head); static int have_quorum(struct paxos_space *ps, int member) { int i, sum = 0; for (i = 0; i < ps->number; i++) { if (ps->role[i] & ACCEPTOR) sum++; } if (member * 2 > sum) return 1; else return 0; } static int next_ballot_number(struct paxos_instance *pi) { int ballot; int myid = pi->ps->p_op->get_myid(); if (pi->prio) ballot = pi->prio[myid]; else ballot = myid; while (ballot <= pi->round) ballot += pi->ps->number; return ballot; } + +static void prepare_a_message(struct boothc_ticket_msg *msg, int state, struct paxos_instance *pax_inst) +{ + msg->ticket.state = htonl(state); + msg->ticket.proposer_id = + msg->header.from = + htonl(booth_get_myid()); +// strcpy(hdr->psname, pax_inst->ps->name); +// strcpy(hdr->piname, pax_inst->name); + msg->ticket.ballot = htonl(pax_inst->round); +// hdr->extralen = htonl(pax_inst->ps->extralen); +// extra = (char *)msg + sizeof(struct paxos_msghdr); +// memcpy((char *)msg + sizeof(struct paxos_msghdr) + pi->ps->extralen, +// value, pax_inst->ps->valuelen); + +} + static void proposer_prepare(struct paxos_instance *pi, int *round) { - struct paxos_msghdr *hdr; - void *msg, *extra; - int msglen = sizeof(struct paxos_msghdr) + pi->ps->extralen; + struct boothc_ticket_msg msg; int ballot; log_debug("preposer prepare ..."); - msg = malloc(msglen); - if (!msg) { - log_error("no mem for msg"); - *round = -ENOMEM; - return; - } - memset(msg, 0, msglen); - hdr = msg; - extra = (char *)msg + sizeof(struct paxos_msghdr); if (*round > pi->round) pi->round = *round; ballot = next_ballot_number(pi); pi->proposer->ballot = ballot; - hdr->state = htonl(PREPARING); - hdr->from = htonl(pi->ps->p_op->get_myid()); - hdr->proposer_id = hdr->from; - strcpy(hdr->psname, pi->ps->name); - strcpy(hdr->piname, pi->name); - hdr->ballot_number = htonl(ballot); - hdr->extralen = htonl(pi->ps->extralen); + prepare_a_message(&msg, PREPARING, pi); - if (pi->ps->p_op->prepare && - pi->ps->p_op->prepare((pi_handle_t)pi, extra) < 0) + if (lease_prepare(pi, &msg) < 0) return; - if (pi->ps->p_op->broadcast) - pi->ps->p_op->broadcast(msg, msglen); - else { - int i; - for (i = 0; i < pi->ps->number; i++) { - if (pi->ps->role[i] & ACCEPTOR) - pi->ps->p_op->send(i, msg, msglen); - } - } + transport()->broadcast(&msg, sizeof(msg)); - free(msg); *round = ballot; } static void proposer_propose(struct paxos_space *ps, struct paxos_instance *pi, - void *msg, int msglen) + struct boothc_ticket_msg *msg, int msglen) { struct paxos_msghdr *hdr; - pi_handle_t pih = (pi_handle_t)pi; - void *extra, *value, *message; int ballot; log_debug("proposer propose ..."); - if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) { - log_error("message length incorrect, " - "msglen: %d, msghdr len: %lu, extralen: %u", - msglen, (long)sizeof(struct paxos_msghdr), - ps->extralen); - return; - } - hdr = msg; ballot = ntohl(hdr->ballot_number); if (pi->proposer->ballot != ballot) { log_debug("not the same ballot, proposer ballot: %d, " "received ballot: %d", pi->proposer->ballot, ballot); return; } - extra = (char *)msg + sizeof(struct paxos_msghdr); - if (ps->p_op->is_prepared) { - if (ps->p_op->is_prepared(pih, extra)) - pi->proposer->open_number++; - } else + if (lease_is_prepared(pi, msg)) pi->proposer->open_number++; if (!have_quorum(ps, pi->proposer->open_number)) return; if (pi->proposer->proposed) return; pi->proposer->proposed = 1; value = pi->proposer->proposal->value; - if (ps->p_op->propose - && ps->p_op->propose(pih, extra, ballot, value) < 0) + if (lease_propose(pih, &msg, ballot, value) < 0) return; - hdr->valuelen = htonl(ps->valuelen); - message = malloc(msglen + ps->valuelen); - if (!message) { - log_error("no mem for value"); - return; - } - memset(message, 0, msglen + ps->valuelen); - memcpy(message, msg, msglen); - memcpy((char *)message + msglen, value, ps->valuelen); - pi->proposer->state = PROPOSING; - hdr = message; - hdr->from = htonl(ps->p_op->get_myid()); - hdr->state = htonl(PROPOSING); + prepare_a_message(&msg, PROPOSING, pi); - if (ps->p_op->broadcast) - ps->p_op->broadcast(message, msglen + ps->valuelen); - else { - int i; - for (i = 0; i < ps->number; i++) { - if (ps->role[i] & ACCEPTOR) - ps->p_op->send(i, message, - msglen + ps->valuelen); - } - } - free(message); + transport()->broadcast(&msg, sizeof(msg)) } static void proposer_commit(struct paxos_space *ps, struct paxos_instance *pi, void *msg, int msglen) { struct paxos_msghdr *hdr; pi_handle_t pih = (pi_handle_t)pi; void *extra; int ballot; log_debug("proposer commit ..."); if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) { log_error("message length incorrect, " "msglen: %d, msghdr len: %lu, extralen: %u", msglen, (long)sizeof(struct paxos_msghdr), ps->extralen); return; } extra = (char *)msg + sizeof(struct paxos_msghdr); hdr = msg; ballot = ntohl(hdr->ballot_number); if (pi->proposer->ballot != ballot) { log_debug("not the same ballot, proposer ballot: %d, " "received ballot: %d", pi->proposer->ballot, ballot); return; } pi->proposer->accepted_number++; if (!have_quorum(ps, pi->proposer->accepted_number)) return; if (pi->proposer->state == COMMITTED) return; pi->round = ballot; if (ps->p_op->commit && ps->p_op->commit(pih, extra, pi->round) < 0) return; pi->proposer->state = COMMITTED; if (pi->end) pi->end(pih, pi->round, 0); } static void acceptor_promise(struct paxos_space *ps, struct paxos_instance *pi, void *msg, int msglen) { struct paxos_msghdr *hdr; unsigned long to; pi_handle_t pih = (pi_handle_t)pi; void *extra; log_debug("acceptor promise ..."); if (pi->acceptor->state == RECOVERY) { log_debug("still in recovery"); return; } if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) { log_error("message length incorrect, " "msglen: %d, msghdr len: %lu, extralen: %u", msglen, (long)sizeof(struct paxos_msghdr), ps->extralen); return; } hdr = msg; extra = (char *)msg + sizeof(struct paxos_msghdr); if (ntohl(hdr->ballot_number) < pi->acceptor->highest_promised) { log_debug("ballot number: %d, highest promised: %d", ntohl(hdr->ballot_number), pi->acceptor->highest_promised); return; } if (ps->p_op->promise && ps->p_op->promise(pih, extra) < 0) return; pi->acceptor->highest_promised = ntohl(hdr->ballot_number); pi->acceptor->state = PROMISING; to = ntohl(hdr->from); hdr->from = htonl(ps->p_op->get_myid()); hdr->state = htonl(PROMISING); ps->p_op->send(to, msg, msglen); } static void acceptor_accepted(struct paxos_space *ps, struct paxos_instance *pi, void *msg, int msglen) { struct paxos_msghdr *hdr; unsigned long to; pi_handle_t pih = (pi_handle_t)pi; void *extra, *value; int myid = ps->p_op->get_myid(); int ballot; log_debug("acceptor accepted ..."); if (pi->acceptor->state == RECOVERY) { log_debug("still in recovery"); return; } if (msglen != sizeof(struct paxos_msghdr) + ps->extralen + ps->valuelen) { log_error("message length incorrect, msglen: " "%d, msghdr len: %lu, extralen: %u, valuelen: %u", msglen, (long)sizeof(struct paxos_msghdr), ps->extralen, ps->valuelen); return; } hdr = msg; extra = (char *)msg + sizeof(struct paxos_msghdr); ballot = ntohl(hdr->ballot_number); if (ballot < pi->acceptor->highest_promised) { log_debug("ballot: %d, highest promised: %d", ballot, pi->acceptor->highest_promised); return; } value = pi->acceptor->accepted_proposal->value; memcpy(value, (char *)msg + sizeof(struct paxos_msghdr) + ps->extralen, ps->valuelen); if (ps->p_op->accepted && ps->p_op->accepted(pih, extra, ballot, value) < 0) return; pi->acceptor->state = ACCEPTING; to = ntohl(hdr->from); hdr->from = htonl(myid); hdr->state = htonl(ACCEPTING); if (ps->p_op->broadcast) ps->p_op->broadcast(msg, sizeof(struct paxos_msghdr) + ps->extralen); else { int i; for (i = 0; i < ps->number; i++) { if (ps->role[i] & LEARNER) ps->p_op->send(i, msg, sizeof(struct paxos_msghdr) + ps->extralen); } if (!(ps->role[to] & LEARNER)) ps->p_op->send(to, msg, sizeof(struct paxos_msghdr) + ps->extralen); } } static void learner_response(struct paxos_space *ps, struct paxos_instance *pi, void *msg, int msglen) { struct paxos_msghdr *hdr; pi_handle_t pih = (pi_handle_t)pi; void *extra; int i, unused = 0, found = 0; int ballot; log_debug("learner response ..."); if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) { log_error("message length incorrect, " "msglen: %d, msghdr len: %lu, extralen: %u", msglen, (long)sizeof(struct paxos_msghdr), ps->extralen); return; } hdr = msg; extra = (char *)msg + sizeof(struct paxos_msghdr); ballot = ntohl(hdr->ballot_number); for (i = 0; i < ps->number; i++) { if (!pi->learner->learned[i].ballot) { unused = i; break; } if (pi->learner->learned[i].ballot == ballot) { pi->learner->learned[i].number++; if (pi->learner->learned[i].number > pi->learner->learned_max) pi->learner->learned_max = pi->learner->learned[i].number; found = 1; break; } } if (!found) { pi->learner->learned[unused].ballot = ntohl(hdr->ballot_number); pi->learner->learned[unused].number = 1; } if (!have_quorum(ps, pi->learner->learned_max)) return; if (ps->p_op->learned) ps->p_op->learned(pih, extra, ballot); } const struct proposer_operations generic_proposer_operations = { .prepare = proposer_prepare, .propose = proposer_propose, .commit = proposer_commit, }; const struct acceptor_operations generic_acceptor_operations = { .promise = acceptor_promise, .accepted = acceptor_accepted, }; const struct learner_operations generic_learner_operations = { .response = learner_response, }; ps_handle_t paxos_space_init(const void *name, unsigned int number, unsigned int extralen, unsigned int valuelen, const unsigned char *role, const struct paxos_operations *p_op) { struct paxos_space *ps; list_for_each_entry(ps, &ps_head, list) { if (!strcmp(ps->name, name)) { log_info("paxos space (%s) has already been " "initialized", (char *)name); return -EEXIST; } } if (!number || !valuelen || !p_op || !p_op->get_myid || !p_op->send) { log_error("invalid agruments"); return -EINVAL; } ps = malloc(sizeof(struct paxos_space)); if (!ps) { log_error("no mem for paxos space"); return -ENOMEM; } memset(ps, 0, sizeof(struct paxos_space)); strncpy(ps->name, name, PAXOS_NAME_LEN + 1); ps->number = number; ps->extralen = extralen; ps->valuelen = valuelen; ps->role = role; ps->p_op = p_op; ps->r_op = &generic_proposer_operations; ps->a_op = &generic_acceptor_operations; ps->l_op = &generic_learner_operations; list_add_tail(&ps->list, &ps_head); INIT_LIST_HEAD(&ps->pi_head); return (ps_handle_t)ps; } pi_handle_t paxos_instance_init(ps_handle_t handle, const void *name, int *prio) { struct paxos_space *ps = (struct paxos_space *)handle; struct paxos_instance *pi; struct proposer *proposer = NULL; struct acceptor *acceptor = NULL; struct learner *learner = NULL; int myid, valuelen, rv; list_for_each_entry(pi, &ps->pi_head, list) { if (!strcmp(pi->name, name)) return (pi_handle_t)pi; } if (handle <= 0 || !ps->p_op || !ps->p_op->get_myid) { log_error("invalid agruments"); rv = -EINVAL; goto out; } myid = ps->p_op->get_myid(); valuelen = ps->valuelen; pi = malloc(sizeof(struct paxos_instance)); if (!pi) { log_error("no mem for paxos instance"); rv = -ENOMEM; goto out; } memset(pi, 0, sizeof(struct paxos_instance)); strncpy(pi->name, name, PAXOS_NAME_LEN + 1); if (prio) { pi->prio = malloc(ps->number * sizeof(int)); if (!pi->prio) { log_error("no mem for prio"); rv = -ENOMEM; goto out_pi; } memcpy(pi->prio, prio, ps->number * sizeof(int)); } if (ps->role[myid] & PROPOSER) { proposer = malloc(sizeof(struct proposer)); if (!proposer) { log_error("no mem for proposer"); rv = -ENOMEM; goto out_prio; } memset(proposer, 0, sizeof(struct proposer)); proposer->state = INIT; proposer->proposal = malloc(sizeof(struct proposal) + valuelen); if (!proposer->proposal) { log_error("no mem for proposal"); rv = -ENOMEM; goto out_proposer; } memset(proposer->proposal, 0, sizeof(struct proposal) + valuelen); pi->proposer = proposer; } if (ps->role[myid] & ACCEPTOR) { acceptor = malloc(sizeof(struct acceptor)); if (!acceptor) { log_error("no mem for acceptor"); rv = -ENOMEM; goto out_proposal; } memset(acceptor, 0, sizeof(struct acceptor)); acceptor->state = INIT; acceptor->accepted_proposal = malloc(sizeof(struct proposal) + valuelen); if (!acceptor->accepted_proposal) { log_error("no mem for accepted proposal"); rv = -ENOMEM; goto out_acceptor; } memset(acceptor->accepted_proposal, 0, sizeof(struct proposal) + valuelen); pi->acceptor = acceptor; if (ps->p_op->catchup) pi->acceptor->state = RECOVERY; else pi->acceptor->state = INIT; } if (ps->role[myid] & LEARNER) { learner = malloc(sizeof(struct learner) + ps->number * sizeof(struct learned)); if (!learner) { log_error("no mem for learner"); rv = -ENOMEM; goto out_accepted_proposal; } memset(learner, 0, sizeof(struct learner) + ps->number * sizeof(struct learned)); learner->state = INIT; pi->learner = learner; } pi->ps = ps; list_add_tail(&pi->list, &ps->pi_head); return (pi_handle_t)pi; out_accepted_proposal: if (ps->role[myid] & ACCEPTOR) free(acceptor->accepted_proposal); out_acceptor: if (ps->role[myid] & ACCEPTOR) free(acceptor); out_proposal: if (ps->role[myid] & PROPOSER) free(proposer->proposal); out_proposer: if (ps->role[myid] & PROPOSER) free(proposer); out_prio: if (pi->prio) free(pi->prio); out_pi: free(pi); out: return rv; } int paxos_round_request(pi_handle_t handle, void *value, int *round, void (*end_request) (pi_handle_t handle, int round, int result)) { struct paxos_instance *pi = (struct paxos_instance *)handle; int myid = pi->ps->p_op->get_myid(); int rv = *round; if (!(pi->ps->role[myid] & PROPOSER)) { log_debug("only proposer can do this"); return -EOPNOTSUPP; } pi->proposer->state = PREPARING; pi->proposer->open_number = 0; pi->proposer->accepted_number = 0; pi->proposer->proposed = 0; memcpy(pi->proposer->proposal->value, value, pi->ps->valuelen); pi->end = end_request; pi->ps->r_op->prepare(pi, &rv); return rv; } int paxos_recovery_status_get(pi_handle_t handle) { struct paxos_instance *pi = (struct paxos_instance *)handle; int myid = pi->ps->p_op->get_myid(); if (!(pi->ps->role[myid] & ACCEPTOR)) return -EOPNOTSUPP; if (pi->acceptor->state == RECOVERY) return 1; else return 0; } int paxos_recovery_status_set(pi_handle_t handle, int recovery) { struct paxos_instance *pi = (struct paxos_instance *)handle; int myid = pi->ps->p_op->get_myid(); if (!(pi->ps->role[myid] & ACCEPTOR)) return -EOPNOTSUPP; if (recovery) pi->acceptor->state = RECOVERY; else pi->acceptor->state = INIT; return 0; } int paxos_propose(pi_handle_t handle, void *value, int round) { struct paxos_instance *pi = (struct paxos_instance *)handle; struct paxos_msghdr *hdr; void *extra, *msg; int len = sizeof(struct paxos_msghdr) + pi->ps->extralen + pi->ps->valuelen; if (!pi->proposer->ballot) pi->proposer->ballot = round; if (round != pi->proposer->ballot) { log_debug("round: %d, proposer ballot: %d", round, pi->proposer->ballot); return -EINVAL; } msg = malloc(len); if (!msg) { log_error("no mem for msg"); return -ENOMEM; } pi->proposer->state = PROPOSING; strcpy(pi->proposer->proposal->value, value); pi->proposer->accepted_number = 0; pi->round = round; memset(msg, 0, len); hdr = msg; hdr->state = htonl(PROPOSING); hdr->from = htonl(pi->ps->p_op->get_myid()); hdr->proposer_id = hdr->from; strcpy(hdr->psname, pi->ps->name); strcpy(hdr->piname, pi->name); hdr->ballot_number = htonl(pi->round); hdr->extralen = htonl(pi->ps->extralen); extra = (char *)msg + sizeof(struct paxos_msghdr); memcpy((char *)msg + sizeof(struct paxos_msghdr) + pi->ps->extralen, value, pi->ps->valuelen); if (pi->ps->p_op->propose) pi->ps->p_op->propose(handle, extra, round, value); if (pi->ps->p_op->broadcast) pi->ps->p_op->broadcast(msg, len); else { int i; for (i = 0; i < pi->ps->number; i++) { if (pi->ps->role[i] & ACCEPTOR) pi->ps->p_op->send(i, msg, len); } } free(msg); return 0; } int paxos_catchup(pi_handle_t handle) { struct paxos_instance *pi = (struct paxos_instance *)handle; return pi->ps->p_op->catchup(handle); } -int paxos_recvmsg(void *msg, int msglen) +int paxos_recvmsg(struct boothc_ticket_msg *msg) { struct paxos_msghdr *hdr = msg; struct paxos_space *ps; struct paxos_instance *pi; int found = 0; int myid; list_for_each_entry(ps, &ps_head, list) { if (!strcmp(ps->name, hdr->psname)) { found = 1; break; } } if (!found) { log_error("could not find the received ps name (%s) " "in registered list", hdr->psname); return -EINVAL; } myid = ps->p_op->get_myid(); found = 0; list_for_each_entry(pi, &ps->pi_head, list) { if (!strcmp(pi->name, hdr->piname)) { found = 1; break; } } if (!found) paxos_instance_init((ps_handle_t)ps, hdr->piname, NULL); switch (ntohl(hdr->state)) { case PREPARING: if (ps->role[myid] & ACCEPTOR) ps->a_op->promise(ps, pi, msg, msglen); break; case PROMISING: ps->r_op->propose(ps, pi, msg, msglen); break; case PROPOSING: if (ps->role[myid] & ACCEPTOR) ps->a_op->accepted(ps, pi, msg, msglen); break; case ACCEPTING: if (ntohl(hdr->proposer_id) == myid) ps->r_op->commit(ps, pi, msg, msglen); else if (ps->role[myid] & LEARNER) ps->l_op->response(ps, pi, msg, msglen); break; default: log_debug("invalid message type: %d", ntohl(hdr->state)); break; }; return 0; } diff --git a/src/paxos.h b/src/paxos.h index cff8c66..5ba81e9 100644 --- a/src/paxos.h +++ b/src/paxos.h @@ -1,83 +1,84 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _PAXOS_H #define _PAXOS_H #define PAXOS_NAME_LEN 63 #define PROPOSER 0x4 #define ACCEPTOR 0x2 #define LEARNER 0x1 typedef long ps_handle_t; typedef long pi_handle_t; struct paxos_operations { int (*get_myid) (void); int (*send) (unsigned long id, void *value, int len); int (*broadcast) (void *value, int len); int (*catchup) (pi_handle_t handle); int (*prepare) (pi_handle_t handle, void *extra); int (*promise) (pi_handle_t handle, void *extra); int (*is_prepared) (pi_handle_t handle, void *extra); int (*propose) (pi_handle_t handle, void *extra, int round, void *value); int (*accepted) (pi_handle_t handle, void *extra, int round, void *value); int (*commit) (pi_handle_t handle, void *extra, int round); int (*learned) (pi_handle_t handle, void *extra, int round); }; int paxos_recvmsg(void *msg, int msglen); +struct paxos_instance; ps_handle_t paxos_space_init(const void *name, unsigned int number, unsigned int extralen, unsigned int valuelen, const unsigned char *role, const struct paxos_operations *p_op); pi_handle_t paxos_instance_init(ps_handle_t handle, const void *name, int *prio); int paxos_round_request(pi_handle_t handle, void *value, int *round, void (*end_request) (pi_handle_t handle, int round, int result)); int paxos_round_discard(pi_handle_t handle, int round); int paxos_leader_get(pi_handle_t handle, int *round); int paxos_recovery_status_get(pi_handle_t handle); int paxos_recovery_status_set(pi_handle_t handle, int recovery); int paxos_catchup(pi_handle_t handle); int paxos_propose(pi_handle_t handle, void *value, int round); int paxos_instance_exit(pi_handle_t handle); int paxos_space_exit(ps_handle_t handle); #endif /* _PAXOS_H */ diff --git a/src/paxos_lease.c b/src/paxos_lease.c index b4f9747..1f89286 100644 --- a/src/paxos_lease.c +++ b/src/paxos_lease.c @@ -1,936 +1,936 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include "paxos.h" #include "paxos_lease.h" #include "transport.h" #include "config.h" #include "timer.h" #include "list.h" #include "log.h" #define PAXOS_LEASE_SPACE "paxoslease" #define PLEASE_VALUE_LEN 1024 #define OP_START_LEASE 0 #define OP_STOP_LEASE 1 #define LEASE_STARTED 0 #define LEASE_STOPPED 1 struct paxos_lease_msghdr { int op; int clear; int leased; }; struct paxos_lease_value { char name[PAXOS_NAME_LEN+1]; int owner; int expiry; }; struct lease_action { int op; int clear; }; struct lease_state { int round; struct paxos_lease_value *plv; unsigned long long expires; struct timerlist *timer1; struct timerlist *timer2; }; struct paxos_lease { char name[PAXOS_NAME_LEN+1]; - pi_handle_t pih; + struct paxos_instance pip; struct lease_action action; struct lease_state proposer; struct lease_state acceptor; int owner; int expiry; int renew; int failover; - int release; + int release; /* LEASE_STARTED, LEASE_STOPPED */ unsigned long long expires; void (*end_lease) (pi_handle_t, int); struct timerlist *timer; struct list_head list; }; static LIST_HEAD(lease_head); static int myid = -1; static struct paxos_operations *px_op = NULL; const struct paxos_lease_operations *p_l_op; ps_handle_t ps_handle = 0; -static int find_paxos_lease(pi_handle_t handle, struct paxos_lease **pl) +static int find_paxos_lease(struct paxos_instance *pi, struct paxos_lease **pl) { struct paxos_lease *lpl; int found = 0; list_for_each_entry(lpl, &lease_head, list) { - if (lpl->pih == handle) { + if (lpl->pip == pi) { found = 1; break; } } if (!found) log_error("cound not found the handle for paxos lease: %ld", handle); else *pl = lpl; return found; } static void end_paxos_request(pi_handle_t handle, int round, int result) { struct paxos_lease *pl; if (!find_paxos_lease(handle, &pl)) return; if (round != pl->proposer.round) { log_error("current paxos round is not the proposer round, " "current round: %d, proposer round: %d", round, pl->proposer.round); return; } if (pl->end_lease) pl->end_lease((pl_handle_t)pl, result); - - return; + + return; } static void renew_expires(unsigned long data) { struct paxos_lease *pl = (struct paxos_lease *)data; struct paxos_lease_value value; log_debug("renew expires ..."); if (pl->owner != myid) { log_debug("can not renew because I'm not the lease owner"); return; } memset(&value, 0, sizeof(struct paxos_lease_value)); strncpy(value.name, pl->name, PAXOS_NAME_LEN + 1); value.owner = myid; value.expiry = pl->expiry; paxos_propose(pl->pih, &value, pl->proposer.round); } static void lease_expires(unsigned long data) { struct paxos_lease *pl = (struct paxos_lease *)data; pl_handle_t plh = (pl_handle_t)pl; struct paxos_lease_result plr; log_info("lease expires ... owner [%d] ticket [%s]", pl->owner, pl->name); - pl->owner = -1; + pl->owner = NO_OWNER; strcpy(plr.name, pl->name); - plr.owner = -1; + plr.owner = NO_OWNER; plr.expires = 0; plr.ballot = pl->acceptor.round; p_l_op->notify(plh, &plr); if (pl->proposer.timer1) del_timer(&pl->proposer.timer1); if (pl->proposer.timer2) del_timer(&pl->proposer.timer2); if (pl->acceptor.timer1) del_timer(&pl->acceptor.timer1); if (pl->acceptor.timer2) del_timer(&pl->acceptor.timer2); if (pl->failover) paxos_lease_acquire(plh, NOT_CLEAR_RELEASE, 1, NULL); } static void lease_retry(unsigned long data) { struct paxos_lease *pl = (struct paxos_lease *)data; struct paxos_lease_value value; int round; log_debug("lease_retry ..."); if (pl->proposer.timer2) del_timer(&pl->proposer.timer2); - if (pl->owner != -1) { + if (pl->owner != NO_OWNER) { log_debug("someone already got the lease, no need to retry"); return; } memset(&value, 0, sizeof(struct paxos_lease_value)); strncpy(value.name, pl->name, PAXOS_NAME_LEN + 1); value.owner = myid; value.expiry = pl->expiry; pl->action.op = OP_START_LEASE; /** * We don't know whether the lease_retry after ticket grant * is manual or not, so set clear as NOT_CLEAR_RELEASE is * the only safe choice. **/ pl->action.clear = NOT_CLEAR_RELEASE; round = paxos_round_request(pl->pih, &value, &pl->acceptor.round, end_paxos_request); if (round > 0) pl->proposer.round = round; } int paxos_lease_acquire(pl_handle_t handle, int clear, int renew, void (*end_acquire) (pl_handle_t handle, int result)) { struct paxos_lease *pl = (struct paxos_lease *)handle; struct paxos_lease_value value; int round; memset(&value, 0, sizeof(struct paxos_lease_value)); strncpy(value.name, pl->name, PAXOS_NAME_LEN + 1); value.owner = myid; value.expiry = pl->expiry; pl->renew = renew; pl->end_lease = end_acquire; pl->action.op = OP_START_LEASE; pl->action.clear = clear; round = paxos_round_request(pl->pih, &value, &pl->acceptor.round, end_paxos_request); pl->proposer.timer2 = add_timer(1 * pl->expiry / 10, (unsigned long)pl, lease_retry); if (round > 0) pl->proposer.round = round; return (round < 0)? -1: round; } int paxos_lease_release(pl_handle_t handle, void (*end_release) (pl_handle_t handle, int result)) { struct paxos_lease *pl = (struct paxos_lease *)handle; struct paxos_lease_value value; int round; log_debug("enter paxos_lease_release"); if (pl->owner != myid) { log_error("can not release the lease " "because I'm not the lease owner"); return -1; } memset(&value, 0, sizeof(struct paxos_lease_value)); strncpy(value.name, pl->name, PAXOS_NAME_LEN + 1); pl->end_lease = end_release; pl->action.op = OP_STOP_LEASE; round = paxos_round_request(pl->pih, &value, &pl->acceptor.round, end_paxos_request); if (round > 0) pl->proposer.round = round; log_debug("exit paxos_lease_release"); return (round < 0)? -1: round; } static int lease_catchup(pi_handle_t handle) { struct paxos_lease *pl; struct paxos_lease_result plr; if (!find_paxos_lease(handle, &pl)) return -1; p_l_op->catchup(pl->name, &pl->owner, &pl->acceptor.round, &pl->expires); log_debug("catchup result: name: %s, owner: %d, ballot: %d, expires: %llu", (char *)pl->name, pl->owner, pl->acceptor.round, pl->expires); /** * 1. If no site hold the ticket, the relet will be set LEASE_STOPPED. * Grant commond will set the relet to LEASE_STARTED first, so we don't * need worry about it. * 2. If someone hold the ticket, the relet will be set LEASE_STARTED. * Because we must make sure that the site can renew, and relet also * must be set to LEASE_STARTED. **/ if (-1 == pl->owner) { pl->release = LEASE_STOPPED; return 0; } else pl->release = LEASE_STARTED; if (current_time() > pl->expires) { - plr.owner = pl->owner = -1; + plr.owner = pl->owner = NO_OWNER; plr.expires = pl->expires = 0; strcpy(plr.name, pl->name); p_l_op->notify((pl_handle_t)pl, &plr); return 0; } if (pl->owner == myid) { pl->acceptor.timer2 = add_timer(pl->expires - current_time(), (unsigned long)pl, lease_expires); if (current_time() < pl->expires - 1 * pl->expiry / 5) pl->proposer.timer1 = add_timer(pl->expires - 1 * pl->expiry / 5 - current_time(), (unsigned long)pl, renew_expires); } else pl->acceptor.timer2 = add_timer(pl->expires - current_time(), (unsigned long)pl, lease_expires); pl->proposer.round = pl->acceptor.round; plr.owner = pl->owner; plr.expires = pl->expires; plr.ballot = pl->acceptor.round; strcpy(plr.name, pl->name); p_l_op->notify((pl_handle_t)pl, &plr); return 0; } -static int lease_prepare(pi_handle_t handle, void *header) +int lease_prepare(pi_handle_t handle, struct boothc_ticket_msg *msg) { struct paxos_lease_msghdr *msghdr = header; struct paxos_lease *pl; log_debug("enter lease_prepare"); if (!find_paxos_lease(handle, &pl)) return -1; msghdr->op = htonl(pl->action.op); msghdr->clear = htonl(pl->action.clear); /** * Action of paxos_lease is only used to pass args, * so clear it now **/ memset(&pl->action, 0, sizeof(struct lease_action)); log_debug("exit lease_prepare"); return 0; } static inline int start_lease_is_prepared(pi_handle_t handle __attribute__((unused)), void *header) { struct paxos_lease_msghdr *hdr = header; log_debug("enter start_lease_is_prepared"); if (hdr->leased) { log_debug("already leased"); return 0; } else { log_debug("not leased"); return 1; } } static inline int stop_lease_is_prepared(pi_handle_t handle __attribute__((unused)), void *header __attribute__((unused))) { log_debug("enter stop_lease_is_prepared"); return 1; } -static int lease_is_prepared(pi_handle_t handle, void *header) +int lease_is_prepared(struct paxos_instance *pi, struct boothc_ticket_msg *msg) { - struct paxos_lease_msghdr *hdr = header; int ret = 0; - int op = ntohl(hdr->op); + int op = ntohl(msg->ticket.op); log_debug("enter lease_is_prepared"); - assert(OP_START_LEASE == op || OP_STOP_LEASE == op); switch (op) { case OP_START_LEASE: - ret = start_lease_is_prepared(handle, header); + ret = start_lease_is_prepared(pi, msg); break; case OP_STOP_LEASE: - ret = stop_lease_is_prepared(handle, header); + ret = stop_lease_is_prepared(pi, msg); break; + default: + assert(!"wrong lease op"); } log_debug("exit lease_is_prepared"); return ret; } static int start_lease_promise(pi_handle_t handle, void *header) { struct paxos_lease_msghdr *hdr = header; struct paxos_lease *pl; int clear = ntohl(hdr->clear); log_debug("enter start_lease_promise"); if (!find_paxos_lease(handle, &pl)) return -1; if (NOT_CLEAR_RELEASE == clear && LEASE_STOPPED == pl->release) { log_debug("could not be leased"); hdr->leased = 1; } else if (-1 == pl->owner) { log_debug("has not been leased"); hdr->leased = 0; } else { log_debug("has been leased"); hdr->leased = 1; } if (hdr->leased == 1) { log_error("the proposal collided"); return -1; } log_debug("exit start_lease_promise"); return 0; } static int stop_lease_promise(pi_handle_t handle, void *header __attribute__((unused))) { struct paxos_lease *pl; log_debug("enter stop_lease_promise"); if (!find_paxos_lease(handle, &pl)) return -1; log_debug("exit stop_lease_promise"); return 0; } static int lease_promise(pi_handle_t handle, void *header) { struct paxos_lease_msghdr *hdr = header; int ret = 0; int op = ntohl(hdr->op); log_debug("enter lease_promise"); assert(OP_START_LEASE == op || OP_STOP_LEASE == op); switch (op) { case OP_START_LEASE: ret = start_lease_promise(handle, header); break; case OP_STOP_LEASE: ret = stop_lease_promise(handle, header); break; } log_debug("exit lease_promise"); return ret; } static int start_lease_propose(pi_handle_t handle, void *extra, int round, void *value) { struct paxos_lease *pl; log_debug("enter start_lease_propose"); if (!find_paxos_lease(handle, &pl)) return -1; if (round != pl->proposer.round) { log_error("current round is not the proposer round, " "current round: %d, proposer round: %d", round, pl->proposer.round); return -1; } if (!pl->proposer.plv) { pl->proposer.plv = malloc(sizeof(struct paxos_lease_value)); if (!pl->proposer.plv) { log_error("could not alloc mem for propsoer plv"); return -ENOMEM; } } memcpy(pl->proposer.plv, value, sizeof(struct paxos_lease_value)); if (pl->proposer.timer1) del_timer(&pl->proposer.timer1); if (pl->renew) { pl->proposer.timer1 = add_timer(4 * pl->expiry / 5, (unsigned long)pl, renew_expires); pl->proposer.expires = current_time() + 4 * pl->expiry / 5; } else { pl->proposer.timer1 = add_timer(pl->expiry, (unsigned long)pl, lease_expires); pl->proposer.expires = current_time() + pl->expiry; } log_debug("exit start_lease_propose"); return 0; } static int stop_lease_propose(pi_handle_t handle, void *extra __attribute__((unused)), int round, void *value) { struct paxos_lease *pl; log_debug("enter stop_lease_propose"); if (!find_paxos_lease(handle, &pl)) return -1; if (round != pl->proposer.round) { log_error("current round is not the proposer round, " "current round: %d, proposer round: %d", round, pl->proposer.round); return -1; } if (!pl->proposer.plv) { pl->proposer.plv = malloc(sizeof(struct paxos_lease_value)); if (!pl->proposer.plv) { log_error("could not alloc mem for propsoer plv"); return -ENOMEM; } } memcpy(pl->proposer.plv, value, sizeof(struct paxos_lease_value)); log_debug("exit stop_lease_propose"); return 0; } -static int lease_propose(pi_handle_t handle, void *extra, +static int lease_propose(pi_handle_t handle, struct boothc_ticket_msg *msg, int round, void *value) { struct paxos_lease_msghdr *hdr = extra; int ret = 0; int op = ntohl(hdr->op); log_debug("enter lease_propose"); assert(OP_START_LEASE == op || OP_STOP_LEASE == op); switch (op) { case OP_START_LEASE: ret = start_lease_propose(handle, extra, round, value); break; case OP_STOP_LEASE: ret = stop_lease_propose(handle, extra, round, value); break; } log_debug("exit lease_propose"); return ret; } static int start_lease_accepted(pi_handle_t handle, void *extra, int round, void *value) { struct paxos_lease_msghdr *hdr = extra; struct paxos_lease *pl; log_debug("enter start_lease_accepted"); if (!find_paxos_lease(handle, &pl)) return -1; pl->acceptor.round = round; if (NOT_CLEAR_RELEASE == hdr->clear && LEASE_STOPPED == pl->release) { log_debug("could not be leased"); return -1; } if (!pl->acceptor.plv) { pl->acceptor.plv = malloc(sizeof(struct paxos_lease_value)); if (!pl->acceptor.plv) { log_error("could not alloc mem for acceptor plv"); return -ENOMEM; } } memcpy(pl->acceptor.plv, value, sizeof(struct paxos_lease_value)); if (pl->acceptor.timer1 && pl->acceptor.timer2 != pl->acceptor.timer1) del_timer(&pl->acceptor.timer1); pl->acceptor.timer1 = add_timer(pl->expiry, (unsigned long)pl, lease_expires); pl->acceptor.expires = current_time() + pl->expiry; log_debug("exit start_lease_accepted"); return 0; } static int stop_lease_accepted(pi_handle_t handle, void *extra __attribute__((unused)), int round, void *value) { struct paxos_lease *pl; log_debug("enter stop_lease_accepted"); if (!find_paxos_lease(handle, &pl)) return -1; pl->acceptor.round = round; if (!pl->acceptor.plv) { pl->acceptor.plv = malloc(sizeof(struct paxos_lease_value)); if (!pl->acceptor.plv) { log_error("could not alloc mem for acceptor plv"); return -ENOMEM; } } memcpy(pl->acceptor.plv, value, sizeof(struct paxos_lease_value)); log_debug("exit stop_lease_accepted"); return 0; } static int lease_accepted(pi_handle_t handle, void *extra, int round, void *value) { struct paxos_lease_msghdr *hdr = extra; int ret = 0; int op = ntohl(hdr->op); log_debug("enter lease_accepted"); assert(OP_START_LEASE == op || OP_STOP_LEASE == op); switch (op) { case OP_START_LEASE: ret = start_lease_accepted(handle, extra, round, value); break; case OP_STOP_LEASE: ret = stop_lease_accepted(handle, extra, round, value); break; } log_debug("exit lease_accepted"); return ret; } static int start_lease_commit(pi_handle_t handle, void *extra, int round) { struct paxos_lease *pl; struct paxos_lease_result plr; log_debug("enter start_lease_commit"); if (!find_paxos_lease(handle, &pl)) return -1; if (round != pl->proposer.round) { log_error("current round is not the proposer round, " "current round: %d, proposer round: %d", round, pl->proposer.round); return -1; } pl->release = LEASE_STARTED; pl->owner = pl->proposer.plv->owner; pl->expiry = pl->proposer.plv->expiry; if (pl->acceptor.timer2 != pl->acceptor.timer1) { if (pl->acceptor.timer2) del_timer(&pl->acceptor.timer2); pl->acceptor.timer2 = pl->acceptor.timer1; } strcpy(plr.name, pl->proposer.plv->name); plr.owner = pl->proposer.plv->owner; plr.expires = current_time() + pl->proposer.plv->expiry; plr.ballot = round; p_l_op->notify((pl_handle_t)pl, &plr); log_debug("exit start_lease_commit"); return 0; } static int stop_lease_commit(pi_handle_t handle, void *extra __attribute__((unused)), int round) { struct paxos_lease *pl; struct paxos_lease_result plr; log_debug("enter stop_lease_commit"); if (!find_paxos_lease(handle, &pl)) return -1; if (round != pl->proposer.round) { log_error("current round is not the proposer round, " "current round: %d, proposer round: %d", round, pl->proposer.round); return -1; } if (pl->acceptor.timer2) del_timer(&pl->acceptor.timer2); if (pl->acceptor.timer1) del_timer(&pl->acceptor.timer1); if (pl->proposer.timer2) del_timer(&pl->proposer.timer2); if (pl->proposer.timer1) del_timer(&pl->proposer.timer1); pl->release = LEASE_STOPPED; strcpy(plr.name, pl->proposer.plv->name); - plr.owner = pl->owner = -1; + plr.owner = pl->owner = NO_OWNER; plr.ballot = round; plr.expires = 0; p_l_op->notify((pl_handle_t)pl, &plr); log_debug("exit stop_lease_commit"); return 0; } static int lease_commit(pi_handle_t handle, void *extra, int round) { struct paxos_lease_msghdr *hdr = extra; int ret = 0; int op = ntohl(hdr->op); log_debug("enter lease_commit"); assert(OP_START_LEASE == op || OP_STOP_LEASE == op); switch (op) { case OP_START_LEASE: ret = start_lease_commit(handle, extra, round); break; case OP_STOP_LEASE: ret = stop_lease_commit(handle, extra, round); break; } log_debug("exit lease_commit"); return ret; } static int start_lease_learned(pi_handle_t handle, void *extra, int round) { struct paxos_lease *pl; struct paxos_lease_result plr; log_debug("enter start_lease_learned"); if (!find_paxos_lease(handle, &pl)) return -1; if (round != pl->acceptor.round) { log_error("current round is not the acceptor round, " "current round: %d, acceptor round: %d", round, pl->acceptor.round); return -1; } if (!pl->acceptor.plv) return -1; pl->release = LEASE_STARTED; pl->owner = pl->acceptor.plv->owner; pl->expiry = pl->acceptor.plv->expiry; if (pl->acceptor.timer2 != pl->acceptor.timer1) { if (pl->acceptor.timer2) del_timer(&pl->acceptor.timer2); pl->acceptor.timer2 = pl->acceptor.timer1; } strcpy(plr.name, pl->acceptor.plv->name); plr.owner = pl->acceptor.plv->owner; plr.expires = current_time() + pl->acceptor.plv->expiry; plr.ballot = round; p_l_op->notify((pl_handle_t)pl, &plr); log_debug("exit start_lease_learned"); return 0; } static int stop_lease_learned(pi_handle_t handle, void *extra __attribute__((unused)), int round) { struct paxos_lease *pl; struct paxos_lease_result plr; log_debug("enter stop_lease_learned"); if (!find_paxos_lease(handle, &pl)) return -1; if (round != pl->acceptor.round) { log_error("current round is not the acceptor round, " "current round: %d, acceptor round: %d", round, pl->acceptor.round); return -1; } if (!pl->acceptor.plv) return -1; if (pl->acceptor.timer2) del_timer(&pl->acceptor.timer2); if (pl->acceptor.timer1) del_timer(&pl->acceptor.timer1); pl->release = LEASE_STOPPED; strcpy(plr.name, pl->acceptor.plv->name); - plr.owner = pl->owner = -1; + plr.owner = pl->owner = NO_OWNER; plr.ballot = round; plr.expires = 0; p_l_op->notify((pl_handle_t)pl, &plr); log_debug("exit stop_lease_learned"); return 0; } static int lease_learned(pi_handle_t handle, void *extra, int round) { struct paxos_lease_msghdr *hdr = extra; int ret = 0; int op = ntohl(hdr->op); log_debug("enter lease_learned"); assert(OP_START_LEASE == op || OP_STOP_LEASE == op); switch (op) { case OP_START_LEASE: ret = start_lease_learned(handle, extra, round); break; case OP_STOP_LEASE: ret = stop_lease_learned(handle, extra, round); break; } log_debug("exit lease_learned"); return ret; } pl_handle_t paxos_lease_init(const void *name, unsigned int namelen, int expiry, int number, int failover, unsigned char *role, int *prio, const struct paxos_lease_operations *pl_op) { ps_handle_t psh; pi_handle_t pih; struct paxos_lease *lease; if (namelen > PAXOS_NAME_LEN) { log_error("length of paxos name is too long (%u)", namelen); return -EINVAL; } if (myid == -1) myid = pl_op->get_myid(); if (!ps_handle) { px_op = malloc(sizeof(struct paxos_operations)); if (!px_op) { log_error("could not alloc for paxos operations"); return -ENOMEM; } memset(px_op, 0, sizeof(struct paxos_operations)); px_op->get_myid = pl_op->get_myid; px_op->send = pl_op->send; px_op->broadcast = pl_op->broadcast; px_op->catchup = lease_catchup; px_op->prepare = lease_prepare; px_op->is_prepared = lease_is_prepared; px_op->promise = lease_promise; px_op->propose = lease_propose; px_op->accepted = lease_accepted; px_op->commit = lease_commit; px_op->learned = lease_learned; p_l_op = pl_op; psh = paxos_space_init(PAXOS_LEASE_SPACE, number, sizeof(struct paxos_lease_msghdr), PLEASE_VALUE_LEN, role, px_op); if (psh <= 0) { log_error("failed to initialize paxos space: %ld", psh); free(px_op); px_op = NULL; return psh; } ps_handle = psh; } lease = malloc(sizeof(struct paxos_lease)); if (!lease) { log_error("cound not alloc for paxos lease"); return -ENOMEM; } memset(lease, 0, sizeof(struct paxos_lease)); strncpy(lease->name, name, PAXOS_NAME_LEN + 1); - lease->owner = -1; + lease->owner = NO_OWNER; lease->expiry = expiry; lease->failover = failover; list_add_tail(&lease->list, &lease_head); pih = paxos_instance_init(ps_handle, name, prio); if (pih <= 0) { log_error("failed to initialize paxos instance: %ld", pih); free(lease); return pih; } lease->pih = pih; return (pl_handle_t)lease; } int paxos_lease_status_recovery(pl_handle_t handle) { struct paxos_lease *pl = (struct paxos_lease *)handle; if (paxos_recovery_status_get(pl->pih) == 1) { pl->renew = 1; if (paxos_catchup(pl->pih) == 0) paxos_recovery_status_set(pl->pih, 0); } return 0; } -int paxos_lease_on_receive(void *msg, int msglen) -{ - return paxos_recvmsg(msg, msglen); -} - +//int paxos_lease_on_receive(void *msg, int msglen) +//{ +// return paxos_recvmsg(msg, msglen); +//} +// int paxos_lease_exit(pl_handle_t handle) { struct paxos_lease *pl = (struct paxos_lease *)handle; if (px_op) free(px_op); if (pl->proposer.plv) free(pl->proposer.plv); if (pl->proposer.timer1) del_timer(&pl->proposer.timer1); if (pl->proposer.timer2) del_timer(&pl->proposer.timer2); if (pl->acceptor.plv) free(pl->acceptor.plv); if (pl->acceptor.timer1) del_timer(&pl->acceptor.timer1); if (pl->acceptor.timer2) del_timer(&pl->acceptor.timer2); return 0; } diff --git a/src/paxos_lease.h b/src/paxos_lease.h index e541b0e..321f4f4 100644 --- a/src/paxos_lease.h +++ b/src/paxos_lease.h @@ -1,74 +1,80 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _PAXOS_LEASE_H #define _PAXOS_LEASE_H +#include "booth.h" +#include "paxos.h" + #define PLEASE_NAME_LEN 63 #define NOT_CLEAR_RELEASE 0 #define CLEAR_RELEASE 1 typedef long pl_handle_t; struct paxos_lease_result { char name[PLEASE_NAME_LEN+1]; int owner; int ballot; unsigned long long expires; }; struct paxos_lease_operations { int (*get_myid) (void); int (*send) (unsigned long id, void *value, int len); int (*broadcast) (void *value, int len); int (*catchup) (const void *name, int *owner, int *ballot, unsigned long long *expires); int (*notify) (pl_handle_t handle, struct paxos_lease_result *result); }; pl_handle_t paxos_lease_init(const void *name, unsigned int namelen, int expiry, int number, int failover, unsigned char *role, int *prio, const struct paxos_lease_operations *pl_op); int paxos_lease_on_receive(void *msg, int msglen); int paxos_lease_acquire(pl_handle_t handle, int clear, int renew, void (*end_acquire) (pl_handle_t handle, int result)); /* int paxos_lease_owner_get(const void *name); int paxos_lease_epoch_get(const void *name); int paxos_lease_timeout(const void *name); */ int paxos_lease_status_recovery(pl_handle_t handle); int paxos_lease_release(pl_handle_t handle, void (*end_release) (pl_handle_t handle, int result)); int paxos_lease_exit(pl_handle_t handle); +int lease_prepare(struct paxos_instance *pi, struct boothc_ticket_msg *msg); +int lease_is_prepared(struct paxos_instance *pi, struct boothc_ticket_msg *msg); + #endif /* _PAXOS_LEASE_H */ diff --git a/src/ticket.c b/src/ticket.c index 2a6d23e..c0cc33b 100644 --- a/src/ticket.c +++ b/src/ticket.c @@ -1,626 +1,595 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include "ticket.h" #include "config.h" #include "pacemaker.h" #include "list.h" #include "log.h" #include "booth.h" #include "timer.h" #include "paxos_lease.h" #include "paxos.h" #define PAXOS_MAGIC 0xDB12 #define TK_LINE 256 #define CATCHED_VALID_TMSG 1 +#if 0 struct booth_msghdr { uint16_t magic; uint16_t checksum; uint32_t len; char data[0]; } __attribute__((packed)); -struct ticket_msg { - char id[BOOTH_NAME_LEN+1]; - uint32_t owner; - uint32_t expiry; - uint32_t ballot; - uint32_t result; -} __attribute__((packed)); - struct ticket { char id[BOOTH_NAME_LEN+1]; pl_handle_t handle; int owner; int expiry; int ballot; unsigned long long expires; struct list_head list; }; static LIST_HEAD(ticket_list); +/* Put into node data */ static unsigned char *role; +#endif + +#define foreach_ticket(i_,t_) for(i=0; (t_=booth_conf->ticket+i, iticket_count); i++) +#define foreach_node(i_,n_) for(i=0; (n_=booth_conf->node+i, inode_count); i++) /* Untrusted input, must fit (incl. \0) in a buffer of max chars. */ int check_max_len_valid(char *s, int max) { int i; for(i=0; iticket[0].name))) - return 0; + for (i = 0; i < booth_conf->ticket_count; i++) { + if (!strcmp(booth_conf->ticket[i].name, ticket)) { + if (found) + *found = booth_conf->ticket + i; + return 1; + } + } + + return 0; +} + +int find_ticket_by_handle(pl_handle_t handle, struct ticket_config **found) +{ + int i; + + if (found) + *found = NULL; for (i = 0; i < booth_conf->ticket_count; i++) { - if (!strcmp(booth_conf->ticket[i].name, ticket)) + if (booth_conf->ticket[i].handle == handle) { + if (found) + *found = booth_conf->ticket + i; return 1; + } } return 0; } + +int check_ticket(char *ticket, struct ticket_config **found) +{ + if (found) + *found = NULL; + if (!booth_conf) + return 0; + + if (!check_max_len_valid(ticket, sizeof(booth_conf->ticket[0].name))) + return 0; + return find_ticket_by_name(ticket, found); +} + int check_site(char *site, int *is_local) { struct booth_node *node; if (!check_max_len_valid(site, sizeof(node->addr_string))) return 0; if (find_site_in_config(site, &node)) { *is_local = node->local; return 1; } return 0; } static int * ticket_priority(int i) { int j; /* TODO: need more precise check */ + /* WHAT???? node_count ticket?*/ for (j = 0; j < booth_conf->node_count; j++) { if (booth_conf->ticket[i].weight[j] == 0) return NULL; } return booth_conf->ticket[i].weight; } static int ticket_get_myid(void) { return transport()->get_myid(); } static void end_acquire(pl_handle_t handle, int error) { - struct ticket *tk; - int found = 0; + struct ticket_config *tk; log_debug("enter end_acquire"); - list_for_each_entry(tk, &ticket_list, list) { - if (tk->handle == handle) { - found = 1; - break; - } - } - - if (!found) { + if (!find_ticket_by_handle(handle, &tk)) { log_error("BUG: ticket handle %ld does not exist", handle); return; } if (error) - log_info("ticket %s was granted failed (site %d), error:%s", - tk->id, ticket_get_myid(), strerror(error)); + log_info("ticket %s failed granting for site %s, error:%s", + tk->name, local->addr_string, strerror(error)); else - log_info("ticket %s was granted successfully (site %d)", - tk->id, ticket_get_myid()); + log_info("ticket %s was granted successfully for site %s", + tk->name, local->addr_string); log_debug("exit end_acquire"); } static void end_release(pl_handle_t handle, int error) { - struct ticket *tk; - int found = 0; + struct ticket_config *tk; log_debug("enter end_release"); - list_for_each_entry(tk, &ticket_list, list) { - if (tk->handle == handle) { - found = 1; - break; - } - } - - if (!found) { + if (!find_ticket_by_handle(handle, &tk)) { log_error("BUG: ticket handle %ld does not exist", handle); return; } if (error) - log_info("ticket %s was reovked failed (site %d), error:%s", - tk->id, ticket_get_myid(), strerror(error)); + log_info("ticket %s failed revoking on site %s, error:%s", + tk->name, local->addr_string, strerror(error)); else - log_info("ticket %s was reovked successfully (site %d)", - tk->id, ticket_get_myid()); + log_info("ticket %s was revoked successfully on site %s", + tk->name, local->addr_string); log_debug("exit end_release"); } static int ticket_send(unsigned long id, void *value, int len) { +#if 0 int i, rv = -1; struct booth_node *to = NULL; - struct booth_msghdr *hdr; - void *buf; + struct boothc_ticket_msg msg; - for (i = 0; i < booth_conf->node_count; i++) { - if (booth_conf->node[i].nodeid == id) + foreach_node(i, to) + if (booth_conf->node[i].nodeid == id) { to = booth_conf->node+i; - } + break; + } if (!to) return rv; - buf = malloc(sizeof(struct booth_msghdr) + len); - if (!buf) - return -ENOMEM; - memset(buf, 0, sizeof(struct booth_msghdr) + len); - hdr = buf; + memset(&msg, 0, sizeof(msg)); hdr->magic = htons(PAXOS_MAGIC); hdr->len = htonl(sizeof(struct booth_msghdr) + len); memcpy((char *)buf + sizeof(struct booth_msghdr), value, len); rv = transport()->send(to, buf, sizeof(struct booth_msghdr) + len); - free(buf); + frdee(buf); + */ return rv; +#endif + assert(0); } static int ticket_broadcast(void *value, int vlen) { struct booth_msghdr *hdr; - int tlen = sizeof(*hdr) + vlen; + int tlen ; +#if 0 + = sizeof(*hdr) + vlen; char buf[tlen]; hdr = (void*)buf; hdr->magic = htons(PAXOS_MAGIC); hdr->len = htonl(tlen); memcpy(hdr->data, value, vlen); +#endif return transport()->broadcast(hdr, tlen); } #if 0 static int ticket_read(const void *name, int *owner, int *ballot, unsigned long long *expires) { struct ticket *tk; int found = 0; list_for_each_entry(tk, &ticket_list, list) { if (!strcmp(tk->id, name)) { found = 1; break; } } if (!found) { log_error("BUG: ticket_read failed (ticket %s does not exist)", (char *)name); return -1; } pcmk_handler.load_ticket(tk->id, &tk->owner, &tk->ballot, &tk->expires); *owner = tk->owner; *expires = tk->expires; *ballot = tk->ballot; return 0; } #endif -static int ticket_parse(struct ticket_msg *tmsg) +static int ticket_parse(struct boothc_ticket_msg *tmsg) { - struct ticket *tk; - int found = 0; + struct ticket_config *tk; - list_for_each_entry(tk, &ticket_list, list) { - if (!strcmp(tk->id, tmsg->id)) { - found = 1; - if (tk->ballot < tmsg->ballot) - tk->ballot = tmsg->ballot; - if (CATCHED_VALID_TMSG == tmsg->result) { - tk->owner = tmsg->owner; - tk->expires = current_time() + tmsg->expiry; - } - break; - } - } - - if (!found) + if (!find_ticket_by_name(tmsg->ticket.id, &tk)) return -1; - else - return 0; + + if (tk->ballot < ntohl(tmsg->ticket.ballot)) + tk->ballot = ntohl(tmsg->ticket.ballot); + if (CATCHED_VALID_TMSG == ntohl(tmsg->header.result)) { + tk->owner = ntohl(tmsg->ticket.owner); + tk->expires = current_time() + ntohl(tmsg->header.expiry); + } } static int ticket_catchup(const void *name, int *owner, int *ballot, unsigned long long *expires) -{ - struct ticket *tk; - int i, buflen, rv = 0; - char *buf = NULL; - struct boothc_header *h; +{ + struct ticket_config *tk; + int i, rv = 0; struct booth_node *node; - struct ticket_msg *tmsg; - int myid = ticket_get_myid(); - - if (booth_conf->node[myid].type != ARBITRATOR) { - list_for_each_entry(tk, &ticket_list, list) { - if (!strcmp(tk->id, name)) { - pcmk_handler.load_ticket(tk->id, - &tk->owner, - &tk->ballot, - &tk->expires); - if (current_time() >= tk->expires) { - tk->owner = -1; - tk->expires = 0; - } - } + struct boothc_ticket_msg msg; + time_t now; + + time(&now); + if (local->type != ARBITRATOR && + find_ticket_by_name(name, &tk)) { + pcmk_handler.load_ticket(tk->name, + &tk->owner, + &tk->ballot, + &tk->expires); + if (now >= tk->expires) { + tk->owner = NO_OWNER; + tk->expires = 0; } } - buflen = sizeof(struct boothc_header) + sizeof(struct ticket_msg); - buf = malloc(buflen); - if (!buf) - return -ENOMEM; - memset(buf, 0, buflen); - - h = (struct boothc_header *)buf; - h->magic = BOOTHC_MAGIC; - h->version = BOOTHC_VERSION; - h->cmd = BOOTHC_CMD_CATCHUP; - h->len = sizeof(struct ticket_msg); - tmsg = (struct ticket_msg *)(buf + sizeof(struct boothc_header)); - - for (i = 0; i < booth_conf->node_count; i++) { - node = booth_conf->node + i; + foreach_node(i, node) { if (node->type == SITE && - !(node->local)) { - strncpy(tmsg->id, name, BOOTH_NAME_LEN + 1); + !(node->local)) { + init_ticket_msg(&msg, BOOTHC_CMD_CATCHUP); + strncpy(msg.ticket.id, name, BOOTH_NAME_LEN + 1); + log_debug("attempting catchup from %s", node->addr_string); + rv = booth_transport[TCP].open(node); if (rv < 0) continue; log_debug("connected to %s", node->addr_string); - rv = booth_transport[TCP].send(node, buf, buflen); - if (rv < 0) { + + rv = booth_transport[TCP].send(node, &msg, sizeof(msg)); + if (rv != sizeof(msg)) goto close; - } + log_debug("sent catchup command to %s", node->addr_string); - memset(tmsg, 0, sizeof(struct ticket_msg)); - rv = booth_transport[TCP].recv(node, buf, buflen); - if (rv < 0) { - booth_transport[TCP].close(node); - continue; - } - ticket_parse(tmsg); + + rv = booth_transport[TCP].recv(node, &msg, sizeof(&msg)); + if (rv != sizeof(msg)) + goto close; + + log_debug("got catchup result from %s", node->addr_string); + ticket_parse(&msg); close: booth_transport[TCP].close(node); - memset(tmsg, 0, sizeof(struct ticket_msg)); } } - - list_for_each_entry(tk, &ticket_list, list) { - if (!strcmp(tk->id, name)) { - if (booth_conf->node[myid].type != ARBITRATOR) { - if (current_time() >= tk->expires) { - tk->owner = -1; - tk->expires = 0; - } - pcmk_handler.store_ticket(tk->id, - tk->owner, - tk->ballot, - tk->expires); - if (tk->owner == myid) - pcmk_handler.grant_ticket(tk->id); - else - pcmk_handler.revoke_ticket(tk->id); + + + if (find_ticket_by_name(name, &tk)) { + if (local->type != ARBITRATOR) { + if (current_time() >= tk->expires) { + tk->owner = NO_OWNER; + tk->expires = 0; } - *owner = tk->owner; - *expires = tk->expires; - *ballot = tk->ballot; + pcmk_handler.store_ticket(tk->name, + tk->owner, + tk->ballot, + tk->expires); + if (tk->owner == ticket_get_myid()) + pcmk_handler.grant_ticket(tk->name); + else + pcmk_handler.revoke_ticket(tk->name); } + *owner = tk->owner; + *expires = tk->expires; + *ballot = tk->ballot; } - free(buf); return rv; } static int ticket_write(pl_handle_t handle, struct paxos_lease_result *result) { - struct ticket *tk; - int found = 0; - - list_for_each_entry(tk, &ticket_list, list) { - if (tk->handle == handle) { - found = 1; - break; - } - } - if (!found) { + struct ticket_config *tk; + + if (!find_ticket_by_handle(handle, &tk)) { log_error("BUG: ticket_write failed " "(ticket handle %ld does not exist)", handle); return -1; } + /* TODO: ntohl? */ tk->owner = result->owner; tk->expires = result->expires; tk->ballot = result->ballot; + pcmk_handler.store_ticket(tk->name, tk->owner, tk->ballot, tk->expires); if (tk->owner == ticket_get_myid()) { - pcmk_handler.store_ticket(tk->id, tk->owner, tk->ballot, tk->expires); - pcmk_handler.grant_ticket(tk->id); - } else if (tk->owner == -1) { - pcmk_handler.store_ticket(tk->id, tk->owner, tk->ballot, tk->expires); - pcmk_handler.revoke_ticket(tk->id); - } else - pcmk_handler.store_ticket(tk->id, tk->owner, tk->ballot, tk->expires); - - return 0; + pcmk_handler.grant_ticket(tk->name); + } else if (tk->owner == NO_OWNER) { + pcmk_handler.revoke_ticket(tk->name); + } + + return 0; } static void ticket_status_recovery(pl_handle_t handle) { paxos_lease_status_recovery(handle); } -int ticket_recv(void *msg, int msglen) +int ticket_recv(struct boothc_ticket_msg *msg, int msglen) { struct booth_msghdr *hdr; char *data; - hdr = msg; - if (ntohs(hdr->magic) != PAXOS_MAGIC || - ntohl(hdr->len) != msglen) { - log_error("message received error"); + if (check_boothc_header(hdr, sizeof(*msg)) < 0 || + msglen != sizeof(*msg)) { + log_error("message receive error"); return -1; } - data = (char *)msg + sizeof(struct booth_msghdr); - - return paxos_lease_on_receive(data, - msglen - sizeof(struct booth_msghdr)); + return paxos_recvmsg(msg); } int grant_ticket(char *ticket) { struct ticket *tk; int found = 0; list_for_each_entry(tk, &ticket_list, list) { if (!strcmp(tk->id, ticket)) { found = 1; break; } } if (!found) { log_error("ticket %s does not exist", ticket); return BOOTHC_RLT_SYNC_FAIL; } if (tk->owner == ticket_get_myid()) return BOOTHC_RLT_SYNC_SUCC; else { int ret = paxos_lease_acquire(tk->handle, CLEAR_RELEASE, 1, end_acquire); if (ret >= 0) tk->ballot = ret; return (ret < 0)? BOOTHC_RLT_SYNC_FAIL: BOOTHC_RLT_ASYNC; } } int revoke_ticket(char *ticket) { struct ticket *tk; int found = 0; list_for_each_entry(tk, &ticket_list, list) { if (!strcmp(tk->id, ticket)) { found = 1; break; } } if (!found) { log_error("ticket %s does not exist", ticket); return BOOTHC_RLT_SYNC_FAIL; } - if (tk->owner == -1) + if (tk->owner == NO_OWNER) return BOOTHC_RLT_SYNC_SUCC; else { int ret = paxos_lease_release(tk->handle, end_release); if (ret >= 0) tk->ballot = ret; return (ret < 0)? BOOTHC_RLT_SYNC_FAIL: BOOTHC_RLT_ASYNC; } } int get_ticket_info(char *name, int *owner, int *expires) { struct ticket *tk; list_for_each_entry(tk, &ticket_list, list) { if (!strncmp(tk->id, name, BOOTH_NAME_LEN + 1)) { if(owner) *owner = tk->owner; if(expires) *expires = tk->expires; return 0; } } return -1; } int list_ticket(char **pdata, unsigned int *len) { struct ticket *tk; char timeout_str[100]; char node_name[BOOTH_NAME_LEN]; char tmp[TK_LINE]; *pdata = NULL; *len = 0; list_for_each_entry(tk, &ticket_list, list) { memset(tmp, 0, TK_LINE); strncpy(timeout_str, "INF", sizeof(timeout_str)); strncpy(node_name, "None", sizeof(node_name)); - if (tk->owner < MAX_NODES && tk->owner > -1) + if (tk->owner < MAX_NODES && tk->owner > NO_OWNER) strncpy(node_name, booth_conf->node[tk->owner].addr_string, sizeof(node_name)); if (tk->expires != 0) strftime(timeout_str, sizeof(timeout_str), "%Y/%m/%d %H:%M:%S", localtime((time_t *)&tk->expires)); snprintf(tmp, TK_LINE, "ticket: %s, owner: %s, expires: %s\n", tk->id, node_name, timeout_str); *pdata = realloc(*pdata, *len + TK_LINE); if (*pdata == NULL) return -ENOMEM; memset(*pdata + *len, 0, TK_LINE); memcpy(*pdata + *len, tmp, TK_LINE); *len += TK_LINE; } return 0; } -int catchup_ticket(char **pdata, unsigned int len) +int catchup_ticket(struct ticket_msg *msg, struct ticket_config *tc) { - struct ticket_msg *tmsg; - struct ticket *tk; - - assert(len == sizeof(struct ticket_msg)); - tmsg = (struct ticket_msg *)(*pdata); - list_for_each_entry(tk, &ticket_list, list) { - if (strcmp(tk->id, tmsg->id)) - continue; - - tmsg->ballot = tk->ballot; - if (tk->owner == ticket_get_myid() - && current_time() < tk->expires) { - tmsg->result = CATCHED_VALID_TMSG; - tmsg->expiry = tk->expires - current_time(); - tmsg->owner = tk->owner; - } + msg->ballot = htonl(tk->ballot); + if (tk->owner == ticket_get_myid() + && current_time() < tk->expires) { + msg->expiry = htonl(tk->expires - current_time()); + msg->owner = htonl(tk->owner); + return CATCHED_VALID_TMSG; } - return 0; + return -1; } const struct paxos_lease_operations ticket_operations = { .get_myid = ticket_get_myid, .send = ticket_send, .broadcast = ticket_broadcast, .catchup = ticket_catchup, .notify = ticket_write, }; int setup_ticket(void) { struct ticket *tk, *tmp; int i, rv; pl_handle_t plh; int myid; role = malloc(booth_conf->node_count * sizeof(unsigned char)); if (!role) return -ENOMEM; memset(role, 0, booth_conf->node_count * sizeof(unsigned char)); for (i = 0; i < booth_conf->node_count; i++) { if (booth_conf->node[i].type == SITE) role[i] = PROPOSER | ACCEPTOR | LEARNER; else if (booth_conf->node[i].type == ARBITRATOR) role[i] = ACCEPTOR | LEARNER; } for (i = 0; i < booth_conf->ticket_count; i++) { tk = malloc(sizeof(struct ticket)); if (!tk) { rv = -ENOMEM; goto out; } memset(tk, 0, sizeof(struct ticket)); strcpy(tk->id, booth_conf->ticket[i].name); - tk->owner = -1; + tk->owner = NO_OWNER; tk->expiry = booth_conf->ticket[i].expiry; list_add_tail(&tk->list, &ticket_list); plh = paxos_lease_init(tk->id, BOOTH_NAME_LEN, tk->expiry, booth_conf->node_count, 1, role, ticket_priority(i), &ticket_operations); if (plh <= 0) { log_error("paxos lease initialization failed"); rv = plh; goto out; } tk->handle = plh; } myid = ticket_get_myid(); assert(myid < booth_conf->node_count); if (role[myid] & ACCEPTOR) { list_for_each_entry(tk, &ticket_list, list) { ticket_status_recovery(tk->handle); } } return 0; out: list_for_each_entry_safe(tk, tmp, &ticket_list, list) { list_del(&tk->list); } free(role); return rv; } diff --git a/src/ticket.h b/src/ticket.h index 87327ef..245930a 100644 --- a/src/ticket.h +++ b/src/ticket.h @@ -1,35 +1,40 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _TICKET_H #define _TICKET_H +#include "config.h" + #define DEFAULT_TICKET_EXPIRY 600 -int check_ticket(char *ticket); +int check_ticket(char *ticket, struct ticket_config **tc); int check_site(char *site, int *local); int grant_ticket(char *ticket); int revoke_ticket(char *ticket); int list_ticket(char **pdata, unsigned int *len); -int catchup_ticket(char **pdata, unsigned int len); +int catchup_ticket(struct ticket_msg *msg, struct ticket_config *tc); int ticket_recv(void *msg, int msglen); int setup_ticket(void); int get_ticket_info(char *name, int *owner, int *expires); int check_max_len_valid(char *s, int max); +int find_ticket_by_name(const char *ticket, struct ticket_config **found); +int find_ticket_by_handle(pl_handle_t handle, struct ticket_config **found); + #endif /* _TICKET_H */ diff --git a/src/timer.c b/src/timer.c index dbc8f3b..5871983 100644 --- a/src/timer.c +++ b/src/timer.c @@ -1,103 +1,94 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include "log.h" #include "timer.h" #define MSEC_IN_SEC 1000 extern int poll_timeout; static LIST_HEAD(timer_head); -unsigned long long current_time(void) -{ - struct timeval tv; - - gettimeofday(&tv, NULL); - - return tv.tv_sec; -} - struct timerlist * add_timer(unsigned long expires, unsigned long data, void (*function) (unsigned long data)) { struct timerlist *timer; timer = malloc(sizeof(struct timerlist)); if (!timer) { log_error("failed to alloc mem for timer"); return NULL; } memset(timer, 0, sizeof(struct timerlist)); timer->expires = current_time() + expires; timer->data = data; timer->function = function; list_add_tail(&timer->entry, &timer_head); return timer; } int del_timer(struct timerlist **timer) { (*timer)->expires = -2; (*timer)->data = 0; (*timer)->function = NULL; *timer = NULL; return 0; } void process_timerlist(void) { struct timerlist *timer, *safe; if (list_empty(&timer_head)) return; list_for_each_entry_safe(timer, safe, &timer_head, entry) { if (timer->expires == -2) { list_del(&timer->entry); free(timer); } else if (current_time() >= timer->expires) { timer->expires = -1; timer->function(timer->data); } } } int timerlist_init(void) { poll_timeout = MSEC_IN_SEC; return 0; } void timerlist_exit(void) { struct timerlist *timer, *safe; list_for_each_entry_safe(timer, safe, &timer_head, entry) { list_del(&timer->entry); free(timer); } } diff --git a/src/transport.c b/src/transport.c index 65c38c9..4da2a6d 100644 --- a/src/transport.c +++ b/src/transport.c @@ -1,640 +1,682 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include "list.h" #include "booth.h" #include "log.h" #include "config.h" #include "paxos_lease.h" #include "transport.h" #define BOOTH_IPADDR_LEN (sizeof(struct in6_addr)) #define NETLINK_BUFSIZE 16384 #define SOCKET_BUFFER_SIZE 160000 #define FRAME_SIZE_MAX 10000 extern struct client *client; extern struct pollfd *pollfd; struct booth_node *local = NULL; struct tcp_conn { int s; struct sockaddr to; struct list_head list; }; static LIST_HEAD(tcp); struct udp_context { int s; struct iovec iov_recv; char iov_buffer[FRAME_SIZE_MAX]; } udp; static int (*deliver_fn) (void *msg, int msglen); static void parse_rtattr(struct rtattr *tb[], int max, struct rtattr *rta, int len) { while (RTA_OK(rta, len)) { if (rta->rta_type <= max) tb[rta->rta_type] = rta; rta = RTA_NEXT(rta,len); } } static int find_address(unsigned char ipaddr[BOOTH_IPADDR_LEN], int family, int prefixlen, int fuzzy_allowed, struct booth_node **me) { int i; struct booth_node *node; int bytes, bits_left, mask; unsigned char node_bits, ip_bits; uint8_t *n_a; bytes = prefixlen / 8; bits_left = prefixlen % 8; /* One bit left to check means ignore 7 lowest bits. */ mask = ~( (1 << (8 - bits_left)) -1); for (i = 0; i < booth_conf->node_count; i++) { node = booth_conf->node + i; if (family != node->family) continue; n_a = node_to_addr_pointer(node); if (memcmp(ipaddr, n_a, node->addrlen) == 0) { found: *me = node; return 1; } if (!fuzzy_allowed) continue; // assert(bytes <= node->addrlen); //#include // printf("node->addr %s, fam %d, prefix %d; %llx vs %llx, bytes %d\n", node->addr, node->family, prefixlen, *((long long*)&node->in6), *((long long*)ipaddr), bytes); /* Check prefix, whole bytes */ if (memcmp(ipaddr, n_a, bytes) != 0) continue; //printf("bits %d\n", bits_left); if (!bits_left) goto found; node_bits = n_a[bytes]; ip_bits = ipaddr[bytes]; //printf("nodebits %x ip %x mask %x\n", node_bits, ip_bits, mask); if (((node_bits ^ ip_bits) & mask) == 0) goto found; } return 0; } int _find_myself(int family, struct booth_node **mep, int fuzzy_allowed); int _find_myself(int family, struct booth_node **mep, int fuzzy_allowed) { int fd; struct sockaddr_nl nladdr; struct booth_node *me; unsigned char ipaddr[BOOTH_IPADDR_LEN]; static char rcvbuf[NETLINK_BUFSIZE]; struct { struct nlmsghdr nlh; struct rtgenmsg g; } req; if (local) goto found; me = NULL; if (mep) *mep = NULL; fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE); if (fd < 0) { log_error("failed to create netlink socket"); return 0; } setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); memset(&nladdr, 0, sizeof(nladdr)); nladdr.nl_family = AF_NETLINK; memset(&req, 0, sizeof(req)); req.nlh.nlmsg_len = sizeof(req); req.nlh.nlmsg_type = RTM_GETADDR; req.nlh.nlmsg_flags = NLM_F_ROOT|NLM_F_MATCH|NLM_F_REQUEST; req.nlh.nlmsg_pid = 0; req.nlh.nlmsg_seq = 1; req.g.rtgen_family = family; if (sendto(fd, (void *)&req, sizeof(req), 0, (struct sockaddr*)&nladdr, sizeof(nladdr)) < 0) { close(fd); log_error("failed to send data to netlink socket"); return 0; } while (1) { int status; struct nlmsghdr *h; struct iovec iov = { rcvbuf, sizeof(rcvbuf) }; struct msghdr msg = { (void *)&nladdr, sizeof(nladdr), &iov, 1, NULL, 0, 0 }; status = recvmsg(fd, &msg, 0); if (!status) { close(fd); log_error("failed to recvmsg from netlink socket"); return 0; } h = (struct nlmsghdr *)rcvbuf; if (h->nlmsg_type == NLMSG_DONE) break; if (h->nlmsg_type == NLMSG_ERROR) { close(fd); log_error("netlink socket recvmsg error"); return 0; } while (NLMSG_OK(h, status)) { if (h->nlmsg_type == RTM_NEWADDR) { struct ifaddrmsg *ifa = NLMSG_DATA(h); struct rtattr *tb[IFA_MAX+1]; int len = h->nlmsg_len - NLMSG_LENGTH(sizeof(*ifa)); memset(tb, 0, sizeof(tb)); parse_rtattr(tb, IFA_MAX, IFA_RTA(ifa), len); memset(ipaddr, 0, BOOTH_IPADDR_LEN); memcpy(ipaddr, RTA_DATA(tb[IFA_ADDRESS]), BOOTH_IPADDR_LEN); if (find_address(ipaddr, ifa->ifa_family, ifa->ifa_prefixlen, fuzzy_allowed, &me)) goto out; } h = NLMSG_NEXT(h, status); } } out: close(fd); if (!me) return 0; me->local = 1; local = me; found: if (mep) *mep = local; return 1; } int find_myself(struct booth_node **mep, int fuzzy_allowed) { return _find_myself(AF_INET6, mep, fuzzy_allowed) || _find_myself(AF_INET, mep, fuzzy_allowed); } -static int booth_get_myid(void) + +/** Checks the header fields for validity. + * cf. init_header(). + * For @len_incl_data < 0 the length is not checked. + * Return <0 if error, else bytes read. */ +int check_boothc_header(struct boothc_header *h, int len_incl_data) { - return local ? local->nodeid : -1; + int l; + + if (h->magic != htonl(BOOTHC_MAGIC)) { + log_error("magic error %x", ntohl(h->magic)); + return -EINVAL; + } + if (h->version != htonl(BOOTHC_VERSION)) { + log_error("version error %x", ntohl(h->version)); + return -EINVAL; + } + + + l = ntohl(h->len); + if (l < sizeof(*h) || + l > sizeof(boothc_ticket_site_msg)) { + log_error("length %d out of range", l); + return -EINVAL; + + + if (len_incl_data < 0) + return 0; + + if (l != len_incl_data) { + log_error("length error - got %d, wanted %d", + l, len_incl_data); + return -EINVAL; + } + + return len_incl_data; } static void process_dead(int ci) { struct tcp_conn *conn, *safe; list_for_each_entry_safe(conn, safe, &tcp, list) { if (conn->s == client[ci].fd) { list_del(&conn->list); free(conn); break; } } close(client[ci].fd); client[ci].workfn = NULL; client[ci].fd = -1; pollfd[ci].fd = -1; } static void process_tcp_listener(int ci) { int fd, i, one = 1; socklen_t addrlen = sizeof(struct sockaddr); struct sockaddr addr; struct tcp_conn *conn; fd = accept(client[ci].fd, &addr, &addrlen); if (fd < 0) { log_error("process_tcp_listener: accept error %d %d", fd, errno); return; } setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof(one)); conn = malloc(sizeof(struct tcp_conn)); if (!conn) { log_error("failed to alloc mem"); return; } memset(conn, 0, sizeof(struct tcp_conn)); conn->s = fd; memcpy(&conn->to, &addr, sizeof(struct sockaddr)); list_add_tail(&conn->list, &tcp); i = client_add(fd, process_connection, process_dead); log_debug("client connection %d fd %d", i, fd); } static int setup_tcp_listener(void) { int s, rv; s = socket(local->family, SOCK_STREAM, 0); if (s == -1) { log_error("failed to create tcp socket %s", strerror(errno)); return s; } rv = bind(s, &local->sa6, local->saddrlen); if (rv == -1) { log_error("failed to bind socket %s", strerror(errno)); return rv; } rv = listen(s, 5); if (rv == -1) { log_error("failed to listen on socket %s", strerror(errno)); return rv; } return s; } static int booth_tcp_init(void * unused __attribute__((unused))) { int rv; if (booth_get_myid() < 0) return -1; rv = setup_tcp_listener(); if (rv < 0) return rv; client_add(rv, process_tcp_listener, NULL); return 0; } static int connect_nonb(int sockfd, const struct sockaddr *saptr, socklen_t salen, int sec) { int flags, n, error; socklen_t len; fd_set rset, wset; struct timeval tval; flags = fcntl(sockfd, F_GETFL, 0); fcntl(sockfd, F_SETFL, flags | O_NONBLOCK); error = 0; if ( (n = connect(sockfd, saptr, salen)) < 0) if (errno != EINPROGRESS) return -1; if (n == 0) goto done; /* connect completed immediately */ FD_ZERO(&rset); FD_SET(sockfd, &rset); wset = rset; tval.tv_sec = sec; tval.tv_usec = 0; if ((n = select(sockfd + 1, &rset, &wset, NULL, sec ? &tval : NULL)) == 0) { /* leave outside function to close */ /* timeout */ /* close(sockfd); */ errno = ETIMEDOUT; return -1; } if (FD_ISSET(sockfd, &rset) || FD_ISSET(sockfd, &wset)) { len = sizeof(error); if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) return -1; /* Solaris pending error */ } else { log_error("select error: sockfd not set"); return -1; } done: fcntl(sockfd, F_SETFL, flags); /* restore file status flags */ if (error) { /* leave outside function to close */ /* close(sockfd); */ errno = error; return -1; } return 0; } int booth_tcp_open(struct booth_node *to) { int s, rv; - if (to->tcp_fd >= 0) + if (to->tcp_fd >= STDERR_FILENO) goto found; s = socket(to->family, SOCK_STREAM, 0); if (s == -1) { log_error("cannot create socket of family %d", to->family); return -1; -} + } rv = connect_nonb(s, (struct sockaddr *)&to->sa6, to->saddrlen, 10); if (rv == -1) { if( errno == ETIMEDOUT) log_error("connection to %s timeout", to->addr_string); else log_error("connection to %s error %s", to->addr_string, strerror(errno)); goto error; } to->tcp_fd = s; found: return 1; error: if (s >= 0) close(s); return -1; } int booth_tcp_send(struct booth_node *to, void *buf, int len) { return do_write(to->tcp_fd, buf, len); } static int booth_tcp_recv(struct booth_node *from, void *buf, int len) { - return do_read(from->tcp_fd, buf, len); + int got; + /* Needs timeouts! */ + got = do_read(from->tcp_fd, buf, len); + if (got < 0) + return got; + if (got != len) + return -EINVAL; + return len; } static int booth_tcp_close(struct booth_node *to) { - if (to && to->tcp_fd >= 0) { - close(to->tcp_fd); + if (to) { + if (to->tcp_fd > STDERR_FILENO) + close(to->tcp_fd); to->tcp_fd = -1; } return 0; } static int booth_tcp_exit(void) { return 0; } int setup_udp_server(int try_only) { int rv; unsigned int recvbuf_size; udp.s = socket(local->family, SOCK_DGRAM, 0); if (udp.s == -1) { log_error("failed to create udp socket %s", strerror(errno)); return -1; } rv = fcntl(udp.s, F_SETFL, O_NONBLOCK); if (rv == -1) { log_error("failed to set non-blocking operation " "on udp socket: %s", strerror(errno)); close(udp.s); return -1; } rv = bind(udp.s, (struct sockaddr *)&local->sa6, local->saddrlen); if (try_only) { rv = (rv == -1) ? errno : 0; close(udp.s); return rv; } if (rv == -1) { log_error("failed to bind socket %s", strerror(errno)); close(udp.s); return -1; } recvbuf_size = SOCKET_BUFFER_SIZE; rv = setsockopt(udp.s, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, sizeof(recvbuf_size)); if (rv == -1) { log_error("failed to set recvbuf size"); close(udp.s); return -1; } return udp.s; } static void process_recv(int ci) { struct msghdr msg_recv; struct sockaddr_storage system_from; int received; unsigned char *msg_offset; + /* TODO: allocate on stack? */ msg_recv.msg_name = &system_from; msg_recv.msg_namelen = sizeof (struct sockaddr_storage); msg_recv.msg_iov = &udp.iov_recv; msg_recv.msg_iovlen = 1; msg_recv.msg_control = 0; msg_recv.msg_controllen = 0; msg_recv.msg_flags = 0; received = recvmsg(client[ci].fd, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT); if (received == -1) return; msg_offset = udp.iov_recv.iov_base; deliver_fn(msg_offset, received); } static int booth_udp_init(void *f) { memset(&udp, 0, sizeof(struct udp_context)); udp.iov_recv.iov_base = udp.iov_buffer; udp.iov_recv.iov_len = FRAME_SIZE_MAX; udp.s = setup_udp_server(0); if (udp.s == -1) return -1; deliver_fn = f; client_add(udp.s, process_recv, NULL); return 0; } static int booth_udp_send(struct booth_node *to, void *buf, int len) { struct msghdr msg; struct iovec iovec; unsigned int iov_len; int rv; iovec.iov_base = (void *)buf; iovec.iov_len = len; iov_len = 1; msg.msg_name = &to->sa6; msg.msg_namelen = to->addrlen; msg.msg_iov = (void *)&iovec; msg.msg_iovlen = iov_len; msg.msg_control = 0; msg.msg_controllen = 0; msg.msg_flags = 0; rv = sendmsg(udp.s, &msg, MSG_NOSIGNAL); if (rv < 0) return rv; return 0; } static int booth_udp_broadcast(void *buf, int len) { int i; if (!booth_conf || !booth_conf->node_count) return -1; for (i = 0; i < booth_conf->node_count; i++) booth_udp_send(booth_conf->node+i, buf, len); return 0; } static int booth_udp_exit(void) { return 0; } /* SCTP transport layer has not been developed yet */ static int booth_sctp_init(void *f __attribute__((unused))) { return 0; } static int booth_sctp_send(struct booth_node * to __attribute__((unused)), void *buf __attribute__((unused)), int len __attribute__((unused))) { return 0; } static int booth_sctp_broadcast(void *buf __attribute__((unused)), int len __attribute__((unused))) { return 0; } static int booth_sctp_exit(void) { return 0; } const struct booth_transport booth_transport[TRANSPORT_ENTRIES] = { [TCP] = { .name = "TCP", .init = booth_tcp_init, .get_myid = booth_get_myid, .open = booth_tcp_open, .send = booth_tcp_send, .recv = booth_tcp_recv, .close = booth_tcp_close, .exit = booth_tcp_exit }, [UDP] = { .name = "UDP", .init = booth_udp_init, .get_myid = booth_get_myid, .send = booth_udp_send, .broadcast = booth_udp_broadcast, .exit = booth_udp_exit }, [SCTP] = { .name = "SCTP", .init = booth_sctp_init, .get_myid = booth_get_myid, .send = booth_sctp_send, .broadcast = booth_sctp_broadcast, .exit = booth_sctp_exit } }; const struct booth_transport *local_transport = booth_transport+TCP; diff --git a/src/transport.h b/src/transport.h index f14aacf..1319568 100644 --- a/src/transport.h +++ b/src/transport.h @@ -1,88 +1,72 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _TRANSPORT_H #define _TRANSPORT_H #include "booth.h" -struct booth_node { - int nodeid; - int type; - int local; - - char addr_string[BOOTH_NAME_LEN]; - - int tcp_fd; - - unsigned short family; - union { - struct sockaddr_in sa4; - struct sockaddr_in6 sa6; - }; - int saddrlen; - int addrlen; -} __attribute__((packed)); - typedef enum { - TCP = 0, - UDP = 1, - SCTP = 2, - TRANSPORT_ENTRIES = 3, + TCP = 1, + UDP, + SCTP, + TRANSPORT_ENTRIES, } transport_layer_t; typedef enum { - ARBITRATOR = 1, + ARBITRATOR = 0x50, SITE, CLIENT, DAEMON, STATUS, } action_t; struct booth_transport { const char *name; int (*init) (void *); int (*get_myid) (void); int (*open) (struct booth_node *); int (*send) (struct booth_node *, void *, int); int (*recv) (struct booth_node *, void *, int); int (*broadcast) (void *, int); int (*close) (struct booth_node *); int (*exit) (void); }; const struct booth_transport booth_transport[TRANSPORT_ENTRIES]; int find_myself(struct booth_node **me, int fuzzy_allowed); +int check_boothc_header(struct boothc_header *data, int len_incl_data); + int setup_udp_server(int try_only); int booth_tcp_open(struct booth_node *to); int booth_tcp_send(struct booth_node *to, void *buf, int len); inline static void * node_to_addr_pointer(struct booth_node *node) { switch (node->family) { case AF_INET: return &node->sa4.sin_addr; case AF_INET6: return &node->sa6.sin6_addr; } return NULL; } extern const struct booth_transport *local_transport; -extern struct booth_node *local; + #endif /* _TRANSPORT_H */ diff --git a/tools/booth_resource_monitord.c b/tools/booth_resource_monitord.c index 3120740..0a12db9 100644 --- a/tools/booth_resource_monitord.c +++ b/tools/booth_resource_monitord.c @@ -1,2042 +1,2042 @@ /* ------------------------------------------------------------------------- * booth_resource_monitord --- The monitoring of the resources which depended on the ticket. * This program watches the resource that depended on the ticket. * When abnormality occurs in a resource, move a ticket to other sites using booth. * * Copyright (c) 2012 NIPPON TELEGRAPH AND TELEPHONE CORPORATION * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * * ------------------------------------------------------------------------- */ #include "b_config.h" #include #include #include #include #include #include #include #ifdef HAVE_GETOPT_H # include #endif /* booth find myself */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "booth_resource_monitord.h" GMainLoop *mainloop; char *booth_config_file; char *pid_file; int max_failures = 30; GHashTable *booth_tickets; GHashTable *tickets; GHashTable *tmp_tickets; GList *sites; GList *exclude_tickets; cib_t *cib; crm_ipc_t *crmd_channel; char *booth_resource_monitord_uuid; int crmd_message_timer_id = -1; int revoke_check_timeout = 5; gboolean do_crmd_query = FALSE; gboolean need_shutdown = FALSE; void clean_up(int rc) { crm_debug("Clean up to %s.", crm_system_name); if (cib != NULL) { crm_info("Clean up to CIB session."); cib->cmds->signoff(cib); cib_delete(cib); cib = NULL; } if (booth_config_file != NULL) { crm_trace("free() booth_config_file."); free(booth_config_file); booth_config_file = NULL; } if (pid_file != NULL) { crm_trace("free() pid_file."); free(pid_file); pid_file = NULL; } if (rc > 0) { crm_exit(rc); } } void free_ticket(gpointer data) { GListPtr gIter = NULL; ticket_info_t *ticket = (ticket_info_t *) data; crm_debug("Free ticket name[%s]", ticket->name); free(ticket->name); for (gIter = ticket->resources; gIter != NULL; gIter = gIter->next) { resource_info_t *resource = (resource_info_t *) gIter->data; if (resource->id != NULL) { free(resource->id); } free(resource); } g_list_free(ticket->resources); free(ticket); return; } void shutdown_called(int nsig) { need_shutdown = TRUE; crm_info("Shutdown was called. signal[%d]", nsig); if (mainloop != NULL && g_main_is_running(mainloop)) { g_main_quit(mainloop); } else { clean_up(EX_OK); crm_exit(0); } return; } void print_ticket_summary(gpointer key, gpointer value, gpointer user_data) { GListPtr gIter = NULL; ticket_info_t *ticket = (ticket_info_t *) value; crm_debug ("Ticket name[%s] monitored[%s] grant[%s] standby[%s] expected[%d].", ticket->name, ticket->monitored ? "TRUE" : "FALSE", ticket->granted ? "granted" : "revoked", ticket->standby ? "TRUE" : "FALSE", ticket->expected_count); for (gIter = ticket->resources; gIter != NULL; gIter = gIter->next) { resource_info_t *rsc = (resource_info_t *) gIter->data; crm_debug("resource[%s] target-role[%s]", rsc->id, role2text(rsc->target_role)); } return; } void unpack_cluster_status(pe_working_set_t *data_set) { xmlNode *current_cib = NULL; crm_trace("Unpack cluster status."); qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE); set_working_set_defaults(data_set); current_cib = get_cib_copy(cib); data_set->input = copy_xml(current_cib); cluster_status(data_set); free_xml(current_cib); qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_TRUE); return; } resource_t *find_resource_from_list(const char *search_rsc_id, GListPtr list) { GListPtr gIter = NULL; resource_t *faund_rsc = NULL; crm_trace("Find rsc[%s]", search_rsc_id); for (gIter = list; gIter != NULL; gIter = gIter->next) { resource_t *rsc = (resource_t *) gIter->data; crm_trace("Resource id[%s].", rsc->id); if (g_list_length(rsc->children) > 0) { crm_trace("Resource id[%s] have a children.", rsc->id); faund_rsc = find_resource_from_list(search_rsc_id, rsc->children); } if (safe_str_eq(search_rsc_id, rsc->id)) { faund_rsc = rsc; } if (faund_rsc != NULL) { crm_trace("Faund resource id[%s].", faund_rsc->id); break; } } return faund_rsc; } void print_info_summary(int nsig) { GListPtr gIter = NULL; for (gIter = sites; gIter != NULL; gIter = gIter->next) { site_info_t *site = (site_info_t *) gIter->data; crm_debug("Site address[%s] local[%s].", site->addr, site->local ? "TRUE" : "FALSE"); } g_hash_table_foreach(tickets, print_ticket_summary, NULL); return; } int grant_ticket(ticket_info_t *ticket) { FILE *p; int rc; char cmd[COMMAND_MAX]; char new_owner_ip[IPADDR_LEN]; site_info_t *site = NULL; exclude_ticket_info_t *exclude_ticket = NULL; GListPtr gIter = NULL; GListPtr gIter2 = NULL; memset(new_owner_ip, 0, IPADDR_LEN); /* Set the IP of the failover destination */ for (gIter = sites; gIter != NULL; gIter = gIter->next) { gboolean exclude_flag = FALSE; site = (site_info_t *) gIter->data; if (site->local) continue; for (gIter2 = exclude_tickets; gIter2 != NULL; gIter2 = gIter2->next) { exclude_ticket = (exclude_ticket_info_t *) gIter2->data; if (safe_str_eq(ticket->name, exclude_ticket->ticket) && safe_str_eq(site->addr, exclude_ticket->site)) exclude_flag = TRUE; } if (exclude_flag) { crm_debug("Site address[%s] is exclude site", site->addr); continue; } crm_debug ("The site[%s] was chosen as the movement place of a ticket.", site->addr); strcpy(new_owner_ip, site->addr); break; } if (strlen(new_owner_ip) == 0) { crm_err("Failed to select the destination of the ticket."); return -1; } /* used site is turned back */ sites = g_list_remove(sites, site); sites = g_list_append(sites, site); snprintf(cmd, COMMAND_MAX, "booth client grant -t %s -s %s", ticket->name, new_owner_ip); crm_info("Command: '%s' was executed", cmd); p = popen(cmd, "r"); if (p == NULL) { crm_perror(LOG_ERR, "popen() call failed"); return -1; } rc = pclose(p); if (rc == -1) { crm_perror(LOG_ERR, "pclose() call failed"); return -1; } else if (rc > 0) { crm_err("Failed to execute booth command. exit code %d", WEXITSTATUS(rc)); return -1; } crm_info("Ticket[%s] was granted to %s.", ticket->name, new_owner_ip); return 0; } int revoke_ticket(ticket_info_t *ticket) { FILE *p; int rc; char cmd[COMMAND_MAX]; char owner_ip[IPADDR_LEN]; GListPtr gIter = NULL; memset(owner_ip, 0, IPADDR_LEN); /* use own site ip */ for (gIter = sites; gIter != NULL; gIter = gIter->next) { site_info_t *site = (site_info_t *) gIter->data; crm_trace("site address[%s].", site->addr); if (site->local) { crm_info("%s is own site address.", site->addr); strcpy(owner_ip, site->addr); break; } } if (strlen(owner_ip) == 0) { crm_err("Failed to pick the holder of the ticket."); return -1; } snprintf(cmd, COMMAND_MAX, "booth client revoke -t %s -s %s", ticket->name, owner_ip); crm_info("Command: '%s' was executed", cmd); p = popen(cmd, "r"); if (p == NULL) { crm_perror(LOG_ERR, "popen() call failed"); return -1; } rc = pclose(p); if (rc == -1) { crm_perror(LOG_ERR, "pclose() call failed"); return -1; } else if (rc > 0) { crm_err("Failed to execute booth command. exit code %d", WEXITSTATUS(rc)); return -1; } crm_info("Ticket[%s] was revoked by %s.", ticket->name, owner_ip); return 0; } void update_tickets_info(gpointer key, gpointer value, gpointer user_data) { ticket_info_t *manage_ticket = (ticket_info_t *) value; ticket_t *cluster_ticket = NULL; pe_working_set_t *data_set = NULL; if (user_data == NULL) { crm_err("Failed to unpack cluster status."); return; } data_set = (pe_working_set_t *) user_data; cluster_ticket = g_hash_table_lookup(data_set->tickets, manage_ticket->name); if (cluster_ticket == NULL) { crm_info("State of the ticket[%s] is not yet in the cluster.", manage_ticket->name); return; } manage_ticket->granted = cluster_ticket->granted; manage_ticket->standby = cluster_ticket->standby; crm_trace ("Ticket name[%s] monitored[%s] grant[%s] standby[%s] expected[%d].", manage_ticket->name, manage_ticket->monitored ? "TRUE" : "FALSE", manage_ticket->granted ? "granted" : "revoked", manage_ticket->standby ? "TRUE" : "FALSE", manage_ticket->expected_count); return; } void failover_ticket(gpointer key, gpointer value, gpointer user_data) { int rc, i; gboolean revoke_succeed = FALSE; pe_working_set_t data_set; ticket_info_t *manage_ticket = (ticket_info_t *) value; ticket_t *cluster_ticket = NULL; if (manage_ticket->failover != TRUE) { crm_trace("Ticket[%s] does not have to failover it.", manage_ticket->name); return; } crm_info("Failover ticket[%s].", manage_ticket->name); rc = revoke_ticket(manage_ticket); if (rc != 0) { crm_err("Failed in revoke of ticket[%s].", manage_ticket->name); manage_ticket->failover = FALSE; return; } /* check the completion of the revoke */ for (i = 0; i <= revoke_check_timeout; i++) { const char *owner = NULL; const char *expires = NULL; unpack_cluster_status(&data_set); cluster_ticket = g_hash_table_lookup(data_set.tickets, manage_ticket->name); if (cluster_ticket == NULL) { crm_err("Failed to get information for the ticket[%s], " "can not confirm the success of the revoke.", manage_ticket->name); cleanup_calculations(&data_set); manage_ticket->failover = FALSE; return; } owner = g_hash_table_lookup(cluster_ticket->state, "owner"); expires = g_hash_table_lookup(cluster_ticket->state, "expires"); crm_debug("ticket[%s] granted=%s owner=%s expires=%s", cluster_ticket->id, cluster_ticket->granted ? "true" : "false", owner, expires); if (cluster_ticket->granted == FALSE && - safe_str_eq(owner, "-1") && safe_str_eq(expires, "0")) { + safe_str_eq(owner, "NO_OWNER") && safe_str_eq(expires, "0")) { revoke_succeed = TRUE; cleanup_calculations(&data_set); break; } cleanup_calculations(&data_set); sleep(1); } if (revoke_succeed == FALSE) { crm_err("Failed in revoke of ticket[%s]. Reason: Timeout.", manage_ticket->name); manage_ticket->failover = FALSE; return; } rc = grant_ticket(manage_ticket); if (rc != 0) { crm_err("Failed in grant of ticket[%s].", manage_ticket->name); manage_ticket->failover = FALSE; return; } crm_info("Ticket[%s] failover succeeded.", manage_ticket->name); manage_ticket->failover = FALSE; return; } int check_ticket_condition(ticket_info_t *manage_ticket) { crm_trace ("Ticket name[%s] monitored[%s] grant[%s] standby[%s] expected[%d].", manage_ticket->name, manage_ticket->monitored ? "TRUE" : "FALSE", manage_ticket->granted ? "granted" : "revoked", manage_ticket->standby ? "TRUE" : "FALSE", manage_ticket->expected_count); /* The state of a ticket checks in "revoke" or "standby" */ if (manage_ticket->granted == FALSE || manage_ticket->standby == TRUE) { crm_debug("Ticket[%s] is revoked or standby.", manage_ticket->name); return 1; } return 0; } int check_resource_role(resource_t *cluster_resource, resource_info_t *manage_resource) { gboolean flag; enum rsc_role_e rsc_target_role; crm_trace("Cluster resource id[%s] is role[%s].", cluster_resource->id, role2text(cluster_resource->role)); /* If the role of the resource is specified by the user */ flag = get_target_role(cluster_resource, &rsc_target_role); if (flag && manage_resource->target_role != rsc_target_role) { crm_trace("Cluster resource id[%s] target-role[%s].", cluster_resource->id, role2text(rsc_target_role)); return -1; } /* When role of the resource becomes prospective role */ if (cluster_resource->role == manage_resource->target_role) { crm_trace("Role[%s] of the resource[%s] is expected role.", role2text(cluster_resource->role), cluster_resource->id); return 1; } crm_trace("Role[%s] of the resource[%s] is not expected role[%s].", role2text(cluster_resource->role), cluster_resource->id, role2text(manage_resource->target_role)); /* When role of the resource does not become prospective role */ return 0; } void check_ticket_failover_need(gpointer key, gpointer value, gpointer user_data) { int rc; int count = 0; GListPtr gIter = NULL; ticket_info_t *manage_ticket = (ticket_info_t *) value; GListPtr cluster_resource_list = (GListPtr) user_data; /* Determine whether there is a need for failover */ rc = check_ticket_condition(manage_ticket); if (rc != 0) { crm_info("Ticket name[%s] is not a condition to be monitored.", manage_ticket->name); manage_ticket->monitored = FALSE; manage_ticket->failover = FALSE; return; } for (gIter = manage_ticket->resources; gIter != NULL; gIter = gIter->next) { resource_info_t *manage_resource = (resource_info_t *) gIter->data; resource_t *cluster_resource = NULL; crm_trace("Ticket[%s] find resource[%s].", manage_ticket->name, manage_resource->id); cluster_resource = find_resource_from_list(manage_resource->id, cluster_resource_list); if (cluster_resource == NULL) { crm_err("Resource[%s] is not found in the cluster." " This resource is ignored.", manage_resource->id); continue; } /* check role of the resource */ rc = check_resource_role(cluster_resource, manage_resource); if (rc == 1) { crm_debug("Role of resources[%s] is expected role.", cluster_resource->id); count = count + 1; } else if (rc == -1) { crm_warn("Role of resources[%s] was changed explicitly." " Stop the monitoring of the ticket[%s].", cluster_resource->id, manage_ticket->name); manage_ticket->monitored = FALSE; return; } else { crm_warn("Role of resources[%s] is not expected role.", cluster_resource->id); } } crm_trace("expected count[%d] vs real count[%d].", manage_ticket->expected_count, count); if (manage_ticket->expected_count == count) { crm_info ("All the resources depending on ticket name[%s] started or promoted.", manage_ticket->name); manage_ticket->monitored = TRUE; } else if (manage_ticket->monitored && manage_ticket->expected_count != count) { crm_info("Ticket name[%s] is required for failover.", manage_ticket->name); manage_ticket->monitored = FALSE; manage_ticket->failover = TRUE; } return; } void start_resource_monitor(void) { pe_working_set_t data_set; crm_trace("Start resource monitor."); unpack_cluster_status(&data_set); /* update a ticket in the latest CIB information */ g_hash_table_foreach(tickets, update_tickets_info, &data_set); /* determine whether failover of the ticket is necessary */ g_hash_table_foreach(tickets, check_ticket_failover_need, data_set.resources); /* failover of a ticket is performed */ g_hash_table_foreach(tickets, failover_ticket, NULL); print_info_summary(0); cleanup_calculations(&data_set); crm_trace("End resource monitor."); return; } gboolean docHasTag(xmlNode *root, const char *tag) { xmlNode *child = NULL; crm_trace("Find tag[%s]", tag); for (child = __xml_first_child(root); child != NULL; child = __xml_next(child)) { if (safe_str_eq((const char *)child->name, tag)) { crm_trace("Faund tag[%s]", (const char *)child->name); return TRUE; } if (child->children) { crm_trace("xmlNode[%s] has children", (const char *)child->name); if (docHasTag(child, tag)) { return TRUE; } } } return FALSE; } int search_xml_children(GListPtr *children, xmlNode *root, const char *tag, const char *field, const char *value, gboolean search_matches) { int match_found = 0; CRM_CHECK(root != NULL, return FALSE); CRM_CHECK(children != NULL, return FALSE); if (tag != NULL && safe_str_neq(tag, crm_element_name(root))) { } else if (value != NULL && safe_str_neq(value, crm_element_value(root, field))) { } else { *children = g_list_append(*children, root); match_found = 1; } if (search_matches || match_found == 0) { xmlNode *child = NULL; for (child = __xml_first_child(root); child; child = __xml_next(child)) { match_found += search_xml_children(children, child, tag, field, value, search_matches); } } return match_found; } void register_monitor_resource(ticket_info_t *ticket, resource_t *resource, enum rsc_role_e target_role) { resource_info_t *manage_resource = NULL; if (resource->variant != pe_native) { crm_debug ("Resource id[%s] type is not primitive, does not register.", resource->id); return; } if (is_set(resource->flags, pe_rsc_orphan)) { crm_notice("Resource id[%s] is ORPHAN, does not register.", resource->id); return; } manage_resource = calloc(1, sizeof(resource_info_t)); memset(manage_resource, 0, sizeof(resource_info_t)); manage_resource->id = strdup(resource->id); manage_resource->target_role = target_role; manage_resource->variant = resource->variant; crm_info ("Registered ticket[%s] monitor resource[%s] set target role [%s].", ticket->name, manage_resource->id, role2text(target_role)); ticket->resources = g_list_append(ticket->resources, manage_resource); return; } void register_monitor_resource_children(ticket_info_t *ticket, GListPtr list, enum rsc_role_e target_role) { GListPtr gIter = NULL; for (gIter = list; gIter != NULL; gIter = gIter->next) { resource_t *rsc = (resource_t *) gIter->data; if (g_list_length(rsc->children) > 0) { crm_trace("Resource id[%s] have a children.", rsc->id); register_monitor_resource_children(ticket, rsc->children, target_role); } register_monitor_resource(ticket, rsc, target_role); } return; } void create_ticket_info(xmlNode *rsc_ticket, gboolean update, pe_working_set_t *data_set) { ticket_info_t *ticket = NULL; const char *id = NULL; const char *ticket_name = NULL; const char *ticket_target_role = NULL; enum rsc_role_e target_role; int child_count = 1; resource_t *child = NULL; id = crm_element_value(rsc_ticket, XML_ATTR_ID); ticket_name = crm_element_value(rsc_ticket, XML_TICKET_ATTR_TICKET); crm_debug("rsc_ticket id[%s] name[%s]", id, ticket_name); /* check whether it is a ticket managed in booth */ if (g_hash_table_lookup(booth_tickets, ticket_name) == NULL) { crm_info("Ticket name[%s] is not managed in booth.", ticket_name); return; } crm_trace("Ticket name[%s] is managed in booth.", ticket_name); ticket = g_hash_table_lookup(tmp_tickets, ticket_name); if (ticket == NULL) { ticket = calloc(1, sizeof(ticket_info_t)); memset(ticket, 0, sizeof(ticket_info_t)); ticket->name = strdup(ticket_name); ticket->need_delete = FALSE; g_hash_table_insert(tmp_tickets, ticket->name, ticket); } /* acquire the value of the rsc-role attribute of rsc_ticket */ ticket_target_role = crm_element_value(rsc_ticket, XML_COLOC_ATTR_SOURCE_ROLE); if (ticket_target_role == NULL) { target_role = RSC_ROLE_STARTED; } else { target_role = text2role(ticket_target_role); } /* When rsc_ticket has rsc attribute */ if (xmlHasProp(rsc_ticket, (const xmlChar*)"rsc")) { resource_t *cluster_resource = NULL; const char *rsc_id = NULL; const char *clone_max = NULL; const char *master_max = NULL; rsc_id = crm_element_value(rsc_ticket, "rsc"); cluster_resource = find_resource_from_list(rsc_id, data_set->resources); if (cluster_resource == NULL) { crm_err ("Resource id[%s] to depend on the ticket was not found.", rsc_id); return; } /* The number of resources is registered into management information */ switch (cluster_resource->variant) { case pe_native: crm_trace("Resource[%s] is native.", cluster_resource->id); register_monitor_resource(ticket, cluster_resource, target_role); ticket->expected_count = ticket->expected_count + 1; break; case pe_group: crm_trace("Resource[%s] is group.", cluster_resource->id); register_monitor_resource_children(ticket, cluster_resource->children, target_role); ticket->expected_count = ticket->expected_count + g_list_length(cluster_resource->children); break; case pe_clone: crm_trace("Resource[%s] is clone.", cluster_resource->id); register_monitor_resource_children(ticket, cluster_resource->children, target_role); clone_max = g_hash_table_lookup(cluster_resource->meta, XML_RSC_ATTR_INCARNATION_MAX); child = (resource_t *) g_list_nth_data(cluster_resource->children, 0); if (child != NULL && child->variant == pe_group) { child_count = g_list_length(child->children); } if (clone_max != NULL) { crm_trace("Clone resource[%s] clone_max[%s].", cluster_resource->id, clone_max); ticket->expected_count = ticket->expected_count + crm_parse_int(clone_max, NULL) * child_count; } else { /* When there is no setup of clone_max, the number of nodes is set up */ crm_trace("Clone resource[%s] node num[%d].", cluster_resource->id, g_list_length(data_set->nodes)); ticket->expected_count = ticket->expected_count + g_list_length(data_set->nodes) * child_count; } break; case pe_master: crm_trace("Resource[%s] is master.", cluster_resource->id); register_monitor_resource_children(ticket, cluster_resource->children, target_role); child = (resource_t *) g_list_nth_data(cluster_resource->children, 0); if (child != NULL && child->variant == pe_group) { child_count = g_list_length(child->children); } if (target_role == RSC_ROLE_STARTED) { clone_max = g_hash_table_lookup(cluster_resource->meta, XML_RSC_ATTR_INCARNATION_MAX); if (clone_max != NULL) { crm_trace ("Clone resource[%s] clone_max[%s].", cluster_resource->id, clone_max); ticket->expected_count = ticket->expected_count + crm_parse_int(clone_max, NULL) * child_count; } else { /* When there is no setup of clone_max, the number of nodes is set up */ crm_trace ("Clone resource[%s] node num[%d].", cluster_resource->id, g_list_length(data_set->nodes)); ticket->expected_count = ticket->expected_count + g_list_length(data_set->nodes) * child_count; } } else { master_max = g_hash_table_lookup(cluster_resource->meta, XML_RSC_ATTR_MASTER_MAX); if (master_max != NULL) { crm_trace ("Master resource[%s] master_max[%s].", cluster_resource->id, master_max); ticket->expected_count = ticket->expected_count + crm_parse_int(master_max, NULL) * child_count; } else { /* When there is no setup of master_max, 1 is set up as a default value */ crm_trace ("Master resource[%s] master_max is 1.", cluster_resource->id); ticket->expected_count = ticket->expected_count + 1 * child_count; } } break; default: crm_warn("Unknown type resource[%s].", cluster_resource->id); break; } } else { /* TODO: At a present stage, it does not correspond to the notation of resource_set */ crm_warn("rsc_ticket(id=%s) is notation which does not support." " Ignore this rsc_ticket constraint.", id); return; } crm_debug("Ticket name[%s] expected count[%d].", ticket->name, ticket->expected_count); return; } ticket_info_t *copy_ticket_info(ticket_info_t *copy_ticket) { GListPtr gIter = NULL; ticket_info_t *ticket = NULL; ticket = calloc(1, sizeof(ticket_info_t)); ticket = memcpy(ticket, copy_ticket, sizeof(ticket_info_t)); ticket->name = strdup(copy_ticket->name); ticket->resources = NULL; for (gIter = copy_ticket->resources; gIter != NULL; gIter = gIter->next) { resource_info_t *copy_resource = (resource_info_t *) gIter->data; resource_info_t *resource = NULL; resource = calloc(1, sizeof(resource_info_t)); resource = memcpy(resource, copy_resource, sizeof(resource_info_t)); resource->id = strdup(copy_resource->id); crm_debug("Copy resource [%s]", resource->id); ticket->resources = g_list_append(ticket->resources, resource); } return ticket; } int compare_resource(GListPtr rsc_list, resource_info_t *rsc) { GListPtr gIter = NULL; crm_trace("Compare resource id[%s]", rsc->id); for (gIter = rsc_list; gIter != NULL; gIter = gIter->next) { resource_info_t *register_rsc = (resource_info_t *) gIter->data; crm_trace("Compare resource id[%s]", register_rsc->id); if (safe_str_eq(rsc->id, register_rsc->id)) { crm_debug("Match resource id[%s]", register_rsc->id); if (rsc->target_role == register_rsc->target_role) { crm_debug ("Role of resource id[%s] not changed.", rsc->id); return 0; } } } return 1; } int compare_ticket_info(ticket_info_t *tmp_ticket, ticket_info_t *register_ticket) { int rc; GListPtr gIter = NULL; crm_trace("Compare ticket name[%s]", tmp_ticket->name); if (g_list_length(tmp_ticket->resources) != g_list_length(register_ticket->resources)) { crm_debug ("The number of the resources to watch of ticket name[%s] changed.", tmp_ticket->name); return 1; } for (gIter = tmp_ticket->resources; gIter != NULL; gIter = gIter->next) { resource_info_t *rsc = (resource_info_t *) gIter->data; rc = compare_resource(register_ticket->resources, rsc); if (rc != 0) { crm_debug("Resource of ticket name[%s] changed.", tmp_ticket->name); return 1; } } return 0; } void register_tickets(gpointer key, gpointer value, gpointer user_data) { int rc; gboolean complete; ticket_info_t *tmp_ticket = (ticket_info_t *) value; ticket_info_t *register_ticket = NULL; crm_debug("Register ticket name[%s]", tmp_ticket->name); register_ticket = g_hash_table_lookup(tickets, tmp_ticket->name); if (register_ticket == NULL) { register_ticket = copy_ticket_info(tmp_ticket); crm_info("Register new information ticket name[%s].", register_ticket->name); g_hash_table_insert(tickets, register_ticket->name, register_ticket); return; } rc = compare_ticket_info(tmp_ticket, register_ticket); if (rc != 0) { crm_info("Ticket name[%s] was changed.", register_ticket->name); tmp_ticket->monitored = register_ticket->monitored; complete = g_hash_table_remove(tickets, register_ticket->name); if (complete != TRUE) { crm_err("Failed to delete registered ticket"); return; } register_ticket = copy_ticket_info(tmp_ticket); g_hash_table_insert(tickets, register_ticket->name, register_ticket); } else { crm_info("Ticket name[%s] was no changed.", register_ticket->name); register_ticket->need_delete = FALSE; } return; } void delete_unnecessary_ticket(gpointer key, gpointer value, gpointer user_data) { gboolean complete; ticket_info_t *ticket = (ticket_info_t *) value; if (ticket->need_delete == FALSE) { ticket->need_delete = TRUE; return; } /* The information on the ticket deleted from cib * information is deleted from management information */ crm_info("Delete unnecessary ticket name[%s].", ticket->name); complete = g_hash_table_remove(tickets, ticket->name); if (complete != TRUE) { crm_err("Failed to delete registered ticket"); } return; } void create_information(void) { int match_num; GListPtr match_list = NULL; GListPtr gIter = NULL; pe_working_set_t data_set; xmlNode *current_cib = NULL; tmp_tickets = g_hash_table_new_full(crm_str_hash, g_str_equal, NULL, free_ticket); if (tickets == NULL) { tickets = g_hash_table_new_full(crm_str_hash, g_str_equal, NULL, free_ticket); } current_cib = get_cib_copy(cib); /* find rsc_ticket from CIB information */ match_num = search_xml_children(&match_list, current_cib, XML_CONS_TAG_RSC_TICKET, NULL, NULL, TRUE); if (match_num == 0) { crm_warn("CIB does not have information of <%s>.", XML_CONS_TAG_RSC_TICKET); goto out; } unpack_cluster_status(&data_set); for (gIter = match_list; gIter != NULL; gIter = gIter->next) { xmlNode *match = NULL; match = (xmlNode *) gIter->data; create_ticket_info(match, FALSE, &data_set); } free_xml(current_cib); cleanup_calculations(&data_set); out: g_hash_table_foreach(tmp_tickets, register_tickets, NULL); g_hash_table_destroy(tmp_tickets); g_hash_table_foreach(tickets, delete_unnecessary_ticket, NULL); return; } int do_dc_health(void) { gboolean rc; const char *sys_to = NULL; const char *crmd_operation = NULL; xmlNode *msg_data = NULL; xmlNode *cmd = NULL; sys_to = CRM_SYSTEM_DC; crmd_operation = CRM_OP_PING; crm_trace("Do dc health."); if (crmd_channel == NULL) { crm_err ("The IPC connection is not valid, cannot send anything"); return 1; } cmd = create_request(crmd_operation, msg_data, NULL, sys_to, crm_system_name, booth_resource_monitord_uuid); /* send it */ crm_trace("Send health check message."); rc = crm_ipc_send(crmd_channel, cmd, 0, 0, NULL); if (rc == FALSE) { crm_err("Failed to send ipc messege to CRMd."); return 1; } free_xml(cmd); return 0; } gboolean do_dc_health_start(gpointer data) { int rc = 0; rc = do_dc_health(); if (rc != 0) { crm_err("Failed in a state inquiry of DC."); clean_up(1); } return FALSE; } void cib_diff_notify(const char *event, xmlNode *msg) { int rc = -1; const char *op = NULL; const char *value = NULL; unsigned int log_level = LOG_INFO; int crmd_transition_delay = 0; pe_working_set_t data_set; xmlNode *diff = NULL; xmlNode *update = get_message_xml(msg, F_CIB_UPDATE); if (msg == NULL) { crm_err("NULL update"); return; } crm_element_value_int(msg, F_CIB_RC, &rc); op = crm_element_value(msg, F_CIB_OPERATION); diff = get_message_xml(msg, F_CIB_UPDATE_RESULT); if (rc < pcmk_ok) { log_level = LOG_WARNING; do_crm_log(log_level, "[%s] %s ABORTED: %s", event, op, pcmk_strerror(rc)); return; } if (diff) { crm_log_xml_trace(diff, "cib_diff"); /* It is checked whether change has been in configuration */ if (docHasTag(diff, XML_CIB_TAG_CONFIGURATION)) { crm_trace("Change configuration."); create_information(); } else { crm_trace("Not change configuration."); } log_cib_diff(LOG_TRACE, diff, op); } if (update != NULL) { crm_log_xml_trace(update, "raw_update"); } unpack_cluster_status(&data_set); value = g_hash_table_lookup(data_set.config_hash, "crmd-transition-delay"); crmd_transition_delay = crm_get_msec(value); if (crmd_transition_delay < 0) { crmd_transition_delay = 0; } cleanup_calculations(&data_set); crm_trace("Set crmd-transition-delay is %d msec", crmd_transition_delay); /* Nothing will be done if it has asked DC */ if (do_crmd_query) { crm_trace("Already queried crmd."); goto out; } /* The state of DC is checked */ crm_trace("Query for dc health."); do_crmd_query = TRUE; g_timeout_add(crmd_transition_delay + 1000, do_dc_health_start, NULL); out: return; } void usage(const char *cmd, int exit_status) { FILE *stream; stream = exit_status ? stderr : stdout; fprintf(stream, "usage: %s [-%s]\n", cmd, OPTARGS); fprintf(stream, " Basic options\n"); fprintf(stream, "\t--%s (-%c) \t\tFile in which to store the process' PID\n" "\t\t\t\t\t\t* Default=%s\n", "pid-file", 'p', PID_FILE); fprintf(stream, "\t--%s (-%c) \t\tAppoint a place with booth.conf.\n" "\t\t\t\t\t\t* Default=%s\n", "booth-config", 'b', BOOTH_CONFIG_FILE); fprintf(stream, "\t--%s (-%c) \t\t\tRun in daemon mode\n", "daemonize", 'D'); fprintf(stream, "\t--%s (-%c) \t\t\t\tRun in verbose mode\n", "verbose", 'V'); fprintf(stream, "\t--%s (-%c) \t\t\t\tThis text\n", "help", 'h'); fflush(stream); clean_up(exit_status); } void cib_connection_destroy(gpointer user_data) { cib_t *conn = user_data; /* Ensure IPC is cleaned up */ conn->cmds->signoff(conn); if (need_shutdown) { crm_info("Connection to the CIB terminated."); } else { crm_err("Connection to the CIB terminated."); clean_up(1); } return; } int cib_connect(void) { int rc = -ENOTCONN; int attempts = 0; cib = cib_new(); while (rc != pcmk_ok && attempts++ < max_failures) { crm_trace("Connecting to CIB. attempt %d", attempts); rc = cib->cmds->signon(cib, crm_system_name, cib_query); if (rc != pcmk_ok) { crm_trace("Waiting signing on to the CIB service\n"); sleep(1); } } if (rc != pcmk_ok) { crm_err("Signon to CIB failed: %s", pcmk_strerror(rc)); return rc; } if (rc == pcmk_ok) { /* set a function called at the time of CIB cutting */ crm_trace("Setting dnotify."); rc = cib->cmds->set_connection_dnotify(cib, cib_connection_destroy); if (rc != pcmk_ok) { crm_err("Failed to setting dnotify: %s", pcmk_strerror(rc)); return rc; } /* set a function called at the time of CIB change */ crm_trace("Setting notify callback."); rc = cib->cmds->add_notify_callback(cib, T_CIB_DIFF_NOTIFY, cib_diff_notify); if (rc != pcmk_ok) { crm_err("Failed to setting notify callback: %s", pcmk_strerror(rc)); return rc; } } return rc; } void parse_rtattr(struct rtattr *tb[], int max, struct rtattr *rta, int len) { while (RTA_OK(rta, len)) { if (rta->rta_type <= max) { tb[rta->rta_type] = rta; } rta = RTA_NEXT(rta, len); } } int search_self_node_ip(site_info_t *site) { int fd, addrlen, found = 0; struct sockaddr_nl nladdr; unsigned char ndaddr[IPADDR_LEN]; unsigned char ipaddr[IPADDR_LEN]; static char rcvbuf[NETLINK_BUFSIZE]; struct { struct nlmsghdr nlh; struct rtgenmsg g; } req; memset(ipaddr, 0, IPADDR_LEN); memset(ndaddr, 0, IPADDR_LEN); if (site->family == AF_INET) { inet_pton(AF_INET, site->addr, ndaddr); addrlen = sizeof(struct in_addr); } else if (site->family == AF_INET6) { inet_pton(AF_INET6, site->addr, ndaddr); addrlen = sizeof(struct in6_addr); } else { crm_err("Invalid INET family"); return 0; } fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE); if (fd < 0) { crm_err("Failed to create netlink socket"); return 0; } setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); memset(&nladdr, 0, sizeof(nladdr)); nladdr.nl_family = AF_NETLINK; memset(&req, 0, sizeof(req)); req.nlh.nlmsg_len = sizeof(req); req.nlh.nlmsg_type = RTM_GETADDR; req.nlh.nlmsg_flags = NLM_F_ROOT | NLM_F_MATCH | NLM_F_REQUEST; req.nlh.nlmsg_pid = 0; req.nlh.nlmsg_seq = 1; req.g.rtgen_family = AF_INET; if (sendto(fd, (void *)&req, sizeof(req), 0, (struct sockaddr *)&nladdr, sizeof(nladdr)) < 0) { close(fd); crm_err("Failed to send data to netlink socket"); return 0; } while (1) { int status; struct nlmsghdr *h; struct iovec iov = { rcvbuf, sizeof(rcvbuf) }; struct msghdr msg = { (void *)&nladdr, sizeof(nladdr), &iov, 1, NULL, 0, 0 }; status = recvmsg(fd, &msg, 0); if (status <= 0) { close(fd); crm_err("Failed to recvmsg from netlink socket"); return 0; } h = (struct nlmsghdr *)rcvbuf; if (h->nlmsg_type == NLMSG_DONE) break; if (h->nlmsg_type == NLMSG_ERROR) { close(fd); crm_err("Netlink socket recvmsg error"); return 0; } while (NLMSG_OK(h, status)) { if (h->nlmsg_type == RTM_NEWADDR) { struct ifaddrmsg *ifa = NLMSG_DATA(h); struct rtattr *tb[IFA_MAX + 1]; int len = h->nlmsg_len - NLMSG_LENGTH(sizeof(*ifa)); memset(tb, 0, sizeof(tb)); parse_rtattr(tb, IFA_MAX, IFA_RTA(ifa), len); memcpy(ipaddr, RTA_DATA(tb[IFA_ADDRESS]), IPADDR_LEN); if (!memcmp(ipaddr, ndaddr, addrlen)) { found = 1; goto out; } } h = NLMSG_NEXT(h, status); } } out: close(fd); return found; } int read_booth_config(void) { char line[1024]; FILE *fp; char *s, *key, *val, *expiry, *weight; int in_quotes, got_equals, got_quotes; int lineno = 0; int rc = 0; int fclose_rc; booth_tickets = g_hash_table_new(crm_str_hash, g_str_equal); fp = fopen(booth_config_file, "r"); if (!fp) { crm_err("Failed to open %s: %s", booth_config_file, strerror(errno)); return -1; } while (fgets(line, sizeof(line), fp)) { lineno++; s = line; while (*s == ' ') s++; if (*s == '#' || *s == '\n') continue; if (*s == '-' || *s == '.' || *s == '/' || *s == '+' || *s == '(' || *s == ')' || *s == ':' || *s == ',' || *s == '@' || *s == '=' || *s == '"') { crm_err("Invalid key name in config file " "('%c', line %d char %ld)", *s, lineno, (long)(s - line)); rc = -1; goto out; } key = s; /* will point to the key on the left hand side */ val = NULL; /* will point to the value on the right hand side */ in_quotes = 0; /* true iff we're inside a double-quoted string */ got_equals = 0; /* true iff we're on the RHS of the = assignment */ got_quotes = 0; /* true iff the RHS is quoted */ while (*s != '\n' && *s != '\0') { if (!(*s >= 'a' && *s <= 'z') && !(*s >= 'A' && *s <= 'Z') && !(*s >= '0' && *s <= '9') && !(*s == '_') && !(*s == '-') && !(*s == '.') && !(*s == '/') && !(*s == ' ') && !(*s == '+') && !(*s == '(') && !(*s == ')') && !(*s == ':') && !(*s == ';') && !(*s == ',') && !(*s == '@') && !(*s == '=') && !(*s == '"')) { crm_err ("Invalid character ('%c', line %d char %ld)" " in config file", *s, lineno, (long)(s - line)); rc = -1; goto out; } if (*s == '=' && !got_equals) { got_equals = 1; *s = '\0'; val = s + 1; } else if ((*s == '=' || *s == '_' || *s == '-' || *s == '.') && got_equals && !in_quotes) { crm_err ("Invalid config file format: unquoted '%c' " "(line %d char %ld)", *s, lineno, (long)(s - line)); rc = -1; goto out; } else if ((*s == '/' || *s == '+' || *s == '(' || *s == ')' || *s == ':' || *s == ',' || *s == '@') && !in_quotes) { crm_err ("Invalid config file format: unquoted '%c' " "(line %d char %ld)", *s, lineno, (long)(s - line)); rc = -1; goto out; } else if ((*s == ' ') && !in_quotes && !got_quotes) { crm_err ("Invalid config file format: unquoted whitespace " "(line %d char %ld)", lineno, (long)(s - line)); rc = -1; goto out; } else if (*s == '"' && !got_equals) { crm_err ("Invalid config file format: unexpected quotes " "(line %d char %ld)", lineno, (long)(s - line)); rc = -1; goto out; } else if (*s == '"' && !in_quotes) { in_quotes = 1; if (val) { val++; got_quotes = 1; } } else if (*s == '"' && in_quotes) { in_quotes = 0; *s = '\0'; } s++; } if (!got_equals) { crm_err ("Invalid config file format: missing '=' (lineno %d)", lineno); rc = -1; goto out; } if (in_quotes) { crm_err ("Invalid config file format: unterminated quotes (lineno %d)", lineno); rc = -1; goto out; } if (!got_quotes) *s = '\0'; if (strlen(key) > BOOTH_NAME_LEN || strlen(val) > BOOTH_NAME_LEN) { crm_err("key/value too long"); rc = -1; goto out; } if (!strcmp(key, "site")) { site_info_t *site = calloc(1, sizeof(site_info_t)); memset(site, 0, sizeof(site_info_t)); strcpy(site->addr, val); site->family = AF_INET; crm_trace("Site address[%s].", site->addr); if (search_self_node_ip(site) == 1) { crm_trace("Site[%s] is local site.", site->addr); site->local = TRUE; } sites = g_list_append(sites, site); } if (!strcmp(key, "ticket")) { char *ticket_name = calloc(1, BOOTH_NAME_LEN); memset(ticket_name, 0, BOOTH_NAME_LEN); expiry = index(val, ';'); weight = rindex(val, ';'); if (!expiry) { crm_trace("Not expire"); strcpy(ticket_name, val); } else if (expiry && expiry == weight) { crm_trace("Expire only"); *expiry++ = '\0'; while (*expiry == ' ') expiry++; strcpy(ticket_name, val); } else { crm_trace("Expire and weight"); *expiry++ = '\0'; *weight++ = '\0'; while (*expiry == ' ') expiry++; while (*weight == ' ') weight++; strcpy(ticket_name, val); } crm_info("Registered booth managed ticket[%s].", ticket_name); g_hash_table_insert(booth_tickets, ticket_name, ticket_name); } if (!strcmp(key, "exclude_ticket")) { exclude_ticket_info_t *exclude_ticket = calloc(1, sizeof (exclude_ticket_info_t)); char *ticket_name = NULL; ticket_name = index(val, ';'); if (ticket_name == NULL) { crm_err("exclude ticket format error. " "there is no ';'"); rc = -1; goto out; } *ticket_name++ = '\0'; exclude_ticket->site = strdup(val); exclude_ticket->ticket = strdup(ticket_name); exclude_tickets = g_list_append(exclude_tickets, exclude_ticket); } } out: fclose_rc = fclose(fp); if (fclose_rc != 0) { crm_perror(LOG_ERR, "fclose() call failed"); rc = -1; } return rc; } gboolean validate_crm_message(xmlNode *msg, const char *sys, const char *uuid, const char *msg_type) { const char *type = NULL; const char *crm_msg_reference = NULL; if (msg == NULL) { return FALSE; } type = crm_element_value(msg, F_CRM_MSG_TYPE); crm_msg_reference = crm_element_value(msg, XML_ATTR_REFERENCE); if (type == NULL) { crm_info("No message type defined."); return FALSE; } else if (msg_type != NULL && strcasecmp(msg_type, type) != 0) { crm_info("Expecting a (%s) message but received a (%s).", msg_type, type); return FALSE; } if (crm_msg_reference == NULL) { crm_info("No message crm_msg_reference defined."); return FALSE; } return TRUE; } int crmd_ipc_msg_callback(const char *buffer, ssize_t length, gpointer user_data) { xmlNode *msg = string2xml(buffer); xmlNode *data = NULL; const char *dc = NULL; const char *state = NULL; const char *result = NULL; g_source_remove(crmd_message_timer_id); crmd_message_timer_id = -1; if (msg == NULL) { crm_info("XML in IPC message was not valid... " "discarding."); } else if (validate_crm_message (msg, crm_system_name, booth_resource_monitord_uuid, XML_ATTR_RESPONSE) == FALSE) { crm_trace("Message was not a CRM response. Discarding."); } else { result = crm_element_value(msg, XML_ATTR_RESULT); if (result == NULL || strcasecmp(result, "ok") == 0) { result = "pass"; } else { result = "fail"; } dc = crm_element_value(msg, F_CRM_HOST_FROM); data = get_message_xml(msg, F_CRM_DATA); state = crm_element_value(data, "crmd_state"); crm_trace("Cluster status of %s@%s: %s (%s).", crm_element_value(data, XML_PING_ATTR_SYSFROM), dc, state, crm_element_value(data, XML_PING_ATTR_STATUS)); if (safe_str_eq(state, "S_IDLE")) { /* Since the state is S_IDLE, the resource for surveillance is checked */ crm_info ("Cluster status is %s: resource monitoring start.", state); start_resource_monitor(); do_crmd_query = FALSE; } else { /* When a state is except S_IDLE, a state is checked again */ crm_trace("State of the DC is not S_IDLE."); crmd_message_timer_id = g_timeout_add(1 * 1000, do_dc_health_start, NULL); } } free_xml(msg); msg = NULL; return 0; } void crmd_ipc_connection_destroy(gpointer user_data) { crm_err("Connection to CRMd was terminated"); if (mainloop) { g_main_quit(mainloop); } else { crm_exit(1); } } struct ipc_client_callbacks crm_callbacks = { .dispatch = crmd_ipc_msg_callback, .destroy = crmd_ipc_connection_destroy }; int crmd_connect(void) { xmlNode *xml = NULL; mainloop_io_t *src = NULL; int attempts = 0; booth_resource_monitord_uuid = calloc(1, 11); if (booth_resource_monitord_uuid == NULL) { crm_err("Failed to allocate memory."); return -1; } snprintf(booth_resource_monitord_uuid, 10, "%d", getpid()); booth_resource_monitord_uuid[10] = '\0'; crm_trace("uuid[%s]", booth_resource_monitord_uuid); while (src == NULL && attempts++ < max_failures) { crm_trace("Connecting to CRMd. attempt %d", attempts); src = mainloop_add_ipc_client(CRM_SYSTEM_CRMD, G_PRIORITY_DEFAULT, 0, NULL, &crm_callbacks); if (src == NULL) { crm_trace("Waiting signing on to the CRMd service."); sleep(1); } } crmd_channel = mainloop_get_ipc_client(src); if (crmd_channel == NULL) { crm_err("Failed in a connection trial with CRMd."); return -1; } xml = create_hello_message(booth_resource_monitord_uuid, crm_system_name, "0", "1"); crm_ipc_send(crmd_channel, xml, 0, 0, NULL); free_xml(xml); crm_debug("Signing on to the CRMd service."); return 0; } int main(int argc, char **argv) { int rc; int argerr = 0; int flag; gboolean daemonize = FALSE; #ifdef HAVE_GETOPT_H int option_index = 0; static struct option long_options[] = { /* Top-level Options */ {"verbose", 0, 0, 'V'}, {"help", 0, 0, 'h'}, {"pid-file", 1, 0, 'p'}, {"booth-config", 1, 0, 'b'}, {"daemonize", 0, 0, 'D'}, {0, 0, 0, 0} }; #endif signal(SIGTERM, shutdown_called); signal(SIGINT, shutdown_called); signal(SIGPIPE, SIG_IGN); booth_config_file = strdup(BOOTH_CONFIG_FILE); pid_file = strdup(PID_FILE); crm_log_init(basename(argv[0]), LOG_INFO, TRUE, FALSE, argc, argv, FALSE); while (1) { #ifdef HAVE_GETOPT_H flag = getopt_long(argc, argv, OPTARGS, long_options, &option_index); #else flag = getopt(argc, argv, OPTARGS); #endif if (flag == -1) break; switch (flag) { case 'V': crm_bump_log_level(argc, argv); break; case 'p': free(pid_file); pid_file = strdup(optarg); break; case 'b': free(booth_config_file); booth_config_file = strdup(optarg); break; case 'D': daemonize = TRUE; break; case 'h': usage(crm_system_name, EX_USAGE); break; default: ++argerr; break; } } if (optind < argc) { printf("non-option ARGV-elements: "); while (optind < argc) { crm_err("%s ", argv[optind]); printf("%s ", argv[optind]); optind++; } printf("\n"); argerr++; } if (argerr > 0) { printf("Options exist that can not be processed.\n"); usage(crm_system_name, EX_USAGE); } crm_make_daemon(crm_system_name, daemonize, pid_file); crm_info("Initializing %s.", crm_system_name); crm_trace("connect to CRMd."); rc = crmd_connect(); if (rc == 0) { crm_info("Succeeded to connect CRMd."); } else { crm_err("Failed to connect CRMd."); clean_up(1); } crm_trace("connect to CIB."); rc = cib_connect(); if (rc == pcmk_ok) { crm_info("Succeeded to connect CIB."); } else { crm_err("Failed to connect CIB."); clean_up(1); } rc = read_booth_config(); if (rc != 0) { crm_err("Failed to reading of %s.", booth_config_file); clean_up(1); } create_information(); start_resource_monitor(); crm_info("Starting %s.", crm_system_name); mainloop = g_main_new(FALSE); mainloop_add_signal(SIGTERM, shutdown_called); mainloop_add_signal(SIGINT, shutdown_called); mainloop_add_signal(SIGHUP, print_info_summary); g_main_run(mainloop); crm_info("Exiting %s.", crm_system_name); clean_up(EX_OK); return 0; }