/*
 * Read exactly `count` bytes from `fd` into `buf`.
 *
 * Short reads are continued and reads interrupted by a signal (EINTR) are
 * retried.
 *
 * Returns 0 once all `count` bytes have been stored, or -1 if read() fails
 * or the peer closes the descriptor before `count` bytes arrived.  In the
 * end-of-file case errno is left untouched.
 */
int do_read(int fd, void *buf, size_t count)
{
	/* Offset and result use the types of read() itself, so that large
	 * counts are neither truncated nor compared signed-vs-unsigned. */
	size_t off = 0;
	ssize_t rv;

	while (off < count) {
		rv = read(fd, (char *)buf + off, count - off);
		if (rv == 0)
			return -1;		/* premature end of file */
		if (rv < 0) {
			if (errno == EINTR)
				continue;	/* interrupted, just try again */
			return -1;
		}
		off += (size_t)rv;
	}
	return 0;
}
pollfd[i].fd = fd; pollfd[i].events = POLLIN; if (i > client_maxi) client_maxi = i; return i; } } client_alloc(); goto again; } void process_connection(int ci) { struct boothc_ticket_site_msg msg; struct ticket_config *tc; int is_local, rv, len, exp, olen; void (*deadfn) (int ci); char *data; rv = do_read(client[ci].fd, &msg.header, sizeof(msg.header)); if (rv < 0) { if (errno == ECONNRESET) log_debug("client %d connection reset for fd %d", ci, client[ci].fd); goto kill; } if (check_boothc_header(&msg.header, -1) < 0) goto kill; /* Basic sanity checks already done. */ len = ntohl(msg.header.length); if (len) { if (len != sizeof(msg)) { bad_len: log_error("got wrong length %u", len); return; } exp = len - sizeof(msg.header); rv = do_read(client[ci].fd, msg.header.data, exp); if (rv != exp) { log_error("connection %d read data error %d, wanted %d", ci, rv, exp); goto kill; } } olen = 0; /* Commands have input msg; * and output rv, data and olen (excluding header). */ switch (ntohl(msg.header.cmd)) { case BOOTHC_CMD_LIST: assert(!data); rv = list_ticket(&data, &olen); goto reply; case BOOTHC_CMD_GRANT: /* Expect boothc_ticket_site_msg. */ if (len != sizeof(msg)) goto bad_len; /* Need to return ticket name etc. */ olen = len; data = msg.header.data; if (!check_ticket(msg.ticket.id, &tc)) { rv = BOOTHC_RLT_INVALID_ARG; goto reply; } if (tc->owner != NO_OWNER) { log_error("client want to get an granted " "ticket %s", msg.ticket.id); rv = BOOTHC_RLT_OVERGRANT; goto reply; } if (!check_site(msg.site.site, &is_local)) { rv = BOOTHC_RLT_INVALID_ARG; goto reply; } if (is_local) rv = grant_ticket(msg.ticket.id); else rv = BOOTHC_RLT_REMOTE_OP; break; case BOOTHC_CMD_REVOKE: /* Expect boothc_ticket_site_msg. 
*/ if (len != sizeof(msg)) goto bad_len; olen = len; data = msg.header.data; if (!check_ticket(msg.ticket.id, &tc)) { msg.header.result = BOOTHC_RLT_INVALID_ARG; goto reply; } if (!check_site(msg.site.site, &is_local)) { msg.header.result = BOOTHC_RLT_INVALID_ARG; goto reply; } if (is_local) msg.header.result = revoke_ticket(msg.ticket.id); else msg.header.result = BOOTHC_RLT_REMOTE_OP; break; case BOOTHC_CMD_CATCHUP: /* Expect boothc_ticket_site_msg. */ if (len != sizeof(msg)) goto bad_len; /* Need to return ticket name etc. */ olen = len; data = msg.header.data; if (!check_ticket(msg.ticket.id, &tc)) { rv = BOOTHC_RLT_INVALID_ARG; goto reply; } rv = catchup_ticket(&msg.ticket, tc); /* Only answer if we're the owner. */ if (rv == -1) goto kill; break; default: log_error("connection %d cmd %x unknown", ci, ntohl(msg.header.cmd)); goto kill; } reply: msg.header.result = htonl(rv); msg.header.length = htonl(olen + sizeof(msg.header)); rv = do_write(client[ci].fd, &msg.header, sizeof(msg.header)); if (rv < 0) log_error("connection %d write error %d", ci, rv); if (len) { rv = do_write(client[ci].fd, data, olen); if (rv < 0) log_error("connection %d write error %d", ci, rv); } return; kill: deadfn = client[ci].deadfn; if(deadfn) { deadfn(ci); } return; } static void process_listener(int ci) { int fd, i; fd = accept(client[ci].fd, NULL, NULL); if (fd < 0) { log_error("process_listener: accept error for fd %d: %s (%d)", client[ci].fd, strerror(errno), errno); if (client[ci].deadfn) client[ci].deadfn(ci); return; } i = client_add(fd, process_connection, NULL); log_debug("add client connection %d fd %d", i, fd); } static int setup_config(int type) { int rv; rv = read_config(cl.configfile); if (rv < 0) goto out; /* Set "local" pointer, ignoring errors. */ find_myself(NULL, 0); rv = check_config(type); if (rv < 0) goto out; /* Per default the PID file name is derived from the * configuration name. 
*/ if (!cl.lockfile[0]) { snprintf(cl.lockfile, sizeof(cl.lockfile)-1, "%s/%s.pid", BOOTH_RUN_DIR, booth_conf->name); } out: return rv; } static int setup_transport(void) { int rv; rv = transport()->init(ticket_recv); if (rv < 0) { log_error("failed to init booth_transport %s", transport()->name); goto out; } rv = booth_transport[TCP].init(NULL); if (rv < 0) { log_error("failed to init booth_transport[TCP]"); goto out; } out: return rv; } static int setup_timer(void) { return timerlist_init(); } static int write_daemon_state(int fd, int state) { char buffer[1024]; int rv, size; size = sizeof(buffer) - 1; rv = snprintf(buffer, size, "booth_pid=%d " "booth_state=%s " "booth_type=%s " "booth_cfg_name='%s' " "booth_addr_string='%s' " "booth_port=%d\n", getpid(), ( state == BOOTHD_STARTED ? "started" : state == BOOTHD_STARTING ? "starting" : "invalid"), type_to_string(local->type), booth_conf->name, local->addr_string, booth_conf->port); if (rv < 0 || rv == size) { log_error("Buffer filled up in write_daemon_state()."); return -1; } size = rv; rv = ftruncate(fd, 0); if (rv < 0) { log_error("lockfile %s truncate error %d: %s", cl.lockfile, errno, strerror(errno)); return rv; } rv = lseek(fd, 0, SEEK_SET); if (rv < 0) { log_error("lseek set fd(%d) offset to 0 error, return(%d), message(%s)", fd, rv, strerror(errno)); rv = -1; return rv; } rv = write(fd, buffer, size); if (rv != size) { log_error("write to fd(%d, %d) returned %d, errno %d, message(%s)", fd, size, rv, errno, strerror(errno)); return -1; } return 0; } static int loop(int fd) { void (*workfn) (int ci); void (*deadfn) (int ci); int rv, i; rv = setup_timer(); if (rv < 0) goto fail; rv = setup_transport(); if (rv < 0) goto fail; rv = setup_ticket(); if (rv < 0) goto fail; client_add(rv, process_listener, NULL); rv = write_daemon_state(fd, BOOTHD_STARTED); if (rv != 0) { log_error("write daemon state %d to lockfile error %s: %s", BOOTHD_STARTED, cl.lockfile, strerror(errno)); goto fail; } if (cl.type == 
ARBITRATOR) log_info("BOOTH arbitrator daemon started"); else if (cl.type == SITE) log_info("BOOTH cluster site daemon started"); while (1) { rv = poll(pollfd, client_maxi + 1, poll_timeout); if (rv == -1 && errno == EINTR) continue; if (rv < 0) { log_error("poll failed: %s (%d)", strerror(errno), errno); goto fail; } for (i = 0; i <= client_maxi; i++) { if (client[i].fd < 0) continue; if (pollfd[i].revents & POLLIN) { workfn = client[i].workfn; if (workfn) workfn(i); } if (pollfd[i].revents & (POLLERR | POLLHUP | POLLNVAL)) { deadfn = client[i].deadfn; if (deadfn) deadfn(i); } } process_timerlist(); } return 0; fail: return -1; } static int query_get_string_answer(cmd_request_t cmd) { struct booth_node *node; struct boothc_header h, reply; char *data; int data_len; int rv; data = NULL; init_header(&h, cmd, 0, 0); rv = do_local_connect_and_write(&h, sizeof(h), &node); if (rv < 0) goto out; rv = local_transport->recv(node, &reply, sizeof(reply)); if (rv < 0) goto out_free; data_len = ntohl(reply.length) - sizeof(reply); data = malloc(data_len); if (!data) { rv = -ENOMEM; goto out_free; } rv = local_transport->recv(node, data, data_len); if (rv < 0) goto out_free; do_write(STDOUT_FILENO, data, data_len); rv = 0; out_free: free(data); local_transport->close(node); out: return rv; } static int do_command(cmd_request_t cmd) { struct booth_node *node, *to; struct boothc_header reply; int rv; node = NULL; to = NULL; init_ticket_site_header(&cl.msg, cmd); rv = do_local_connect_and_write(&cl.msg, sizeof(cl.msg), &node); if (rv < 0) goto out_close; rv = local_transport->recv(node, &reply, sizeof(reply)); if (rv < 0) goto out_close; if (reply.result == BOOTHC_RLT_INVALID_ARG) { log_info("invalid argument!"); rv = -1; goto out_close; } if (reply.result == BOOTHC_RLT_OVERGRANT) { log_info("You're granting a granted ticket " "If you wanted to migrate a ticket," "use revoke first, then use grant"); rv = -1; goto out_close; } if (reply.result == BOOTHC_RLT_REMOTE_OP) { if 
(!find_site_in_config(cl.msg.site.site, &to)) { log_error("Redirected to unknown site %s.", cl.msg.site.site); rv = -1; goto out_close; } rv = booth_transport[TCP].open(to); if (rv < 0) { goto out_close; } rv = booth_transport[TCP].send(to, &cl.msg, sizeof(cl.msg)); if (rv < 0) { booth_transport[TCP].close(to); goto out_close; } rv = booth_transport[TCP].recv(to, &reply, sizeof(struct boothc_header)); if (rv < 0) { booth_transport[TCP].close(to); goto out_close; } booth_transport[TCP].close(to); } if (reply.result == BOOTHC_RLT_ASYNC) { if (cmd == BOOTHC_CMD_GRANT) log_info("grant command sent, result will be returned " "asynchronously, you can get the result from " "the log files"); else if (cmd == BOOTHC_CMD_REVOKE) log_info("revoke command sent, result will be returned " "asynchronously, you can get the result from " "the log files."); else log_error("internal error reading reply result!"); rv = 0; } else if (reply.result == BOOTHC_RLT_SYNC_SUCC) { if (cmd == BOOTHC_CMD_GRANT) log_info("grant succeeded!"); else if (cmd == BOOTHC_CMD_REVOKE) log_info("revoke succeeded!"); rv = 0; } else if (reply.result == BOOTHC_RLT_SYNC_FAIL) { if (cmd == BOOTHC_CMD_GRANT) log_info("grant failed!"); else if (cmd == BOOTHC_CMD_REVOKE) log_info("revoke failed!"); rv = -1; } else { log_error("internal error!"); rv = -1; } out_close: if (node) local_transport->close(node); if (to) booth_transport[TCP].close(to); return rv; } static int do_grant(void) { return do_command(BOOTHC_CMD_GRANT); } static int do_revoke(void) { return do_command(BOOTHC_CMD_REVOKE); } static int _lockfile(int mode, int *fdp, pid_t *locked_by) { struct flock lock; int fd, rv; /* After reboot the directory may not yet exist. * Try to create it, but ignore errors. 
*/ if (strncmp(cl.lockfile, BOOTH_RUN_DIR, strlen(BOOTH_RUN_DIR)) == 0) mkdir(BOOTH_RUN_DIR, 0775); if (locked_by) *locked_by = 0; *fdp = -1; fd = open(cl.lockfile, mode, 0664); if (fd < 0) return errno; *fdp = fd; lock.l_type = F_WRLCK; lock.l_start = 0; lock.l_whence = SEEK_SET; lock.l_len = 0; lock.l_pid = 0; if (fcntl(fd, F_SETLK, &lock) == 0) return 0; rv = errno; if (locked_by) if (fcntl(fd, F_GETLK, &lock) == 0) *locked_by = lock.l_pid; return rv; } static int lockfile(void) { int rv, fd; fd = -1; rv = _lockfile(O_CREAT | O_WRONLY, &fd, NULL); if (fd == -1) { log_error("lockfile %s open error %d: %s", cl.lockfile, rv, strerror(rv)); return -1; } if (rv < 0) { log_error("lockfile %s setlk error %d: %s", cl.lockfile, rv, strerror(rv)); goto fail; } rv = write_daemon_state(fd, BOOTHD_STARTING); if (rv != 0) { log_error("write daemon state %d to lockfile error %s: %s", BOOTHD_STARTING, cl.lockfile, strerror(errno)); goto fail; } return fd; fail: close(fd); return -1; } static void unlink_lockfile(int fd) { unlink(cl.lockfile); close(fd); } static void print_usage(void) { printf("Usage:\n"); printf("booth [options]\n"); printf("\n"); printf("Types:\n"); printf(" arbitrator: daemon running on arbitrator\n"); printf(" site: daemon running on cluster site\n"); printf(" client: command running from client\n"); printf("\n"); printf("Operations:\n"); printf("Please note that operations are valid iff type is client!\n"); printf(" list: List all the tickets\n"); printf(" grant: Grant ticket T(-t T) to site S(-s S)\n"); printf(" revoke: Revoke ticket T(-t T) from site S(-s S)\n"); printf("\n"); printf("Options:\n"); printf(" -c FILE Specify config file [default " BOOTH_DEFAULT_CONF "]\n"); printf(" -l LOCKFILE Specify lock file path\n"); printf(" -D Enable debugging to stderr and don't fork\n"); printf(" -t ticket name\n"); printf(" -S report local daemon status (for site and arbitrator)\n"); printf(" RA script compliant return codes.\n"); printf(" -s site name\n"); 
/*
 * Copy the NUL-terminated string `value` into `dest`, a buffer of `buflen`
 * bytes.
 *
 * A value that does not fit (terminator included) is a fatal usage error:
 * a message naming `description` is printed to stderr and the process
 * exits with EXIT_FAILURE.  On return `dest` is NUL-terminated and the
 * unused remainder of the buffer is zero-filled.
 */
void safe_copy(char *dest, char *value, size_t buflen, const char *description)
{
	/* Number of characters that fit in front of the terminator. */
	size_t content_len = buflen ? buflen - 1 : 0;

	/* A string of exactly content_len characters still fits; only longer
	 * ones (or a buffer without room for the terminator) are rejected. */
	if (buflen == 0 || strlen(value) > content_len) {
		fprintf(stderr, "'%s' exceeds maximum %s length of %d\n",
				value, description, (int)content_len);
		exit(EXIT_FAILURE);
	}

	/* strncpy() pads the rest of the buffer with zero bytes. */
	strncpy(dest, value, content_len);
	dest[content_len] = 0;
}
"revoke")) cl.op = OP_REVOKE; else { fprintf(stderr, "client operation \"%s\" is unknown\n", op); exit(EXIT_FAILURE); } break; } while (optind < argc) { optchar = getopt(argc, argv, OPTION_STRING); switch (optchar) { case 'c': safe_copy(cl.configfile, optarg, sizeof(cl.configfile), "config file"); break; case 'D': daemonize = 1; debug_level++; break; case 'l': safe_copy(cl.lockfile, optarg, sizeof(cl.lockfile), "lock file"); break; case 't': if (cl.op == OP_GRANT || cl.op == OP_REVOKE) { safe_copy(cl.msg.ticket.id, optarg, sizeof(cl.msg.ticket.id), "ticket name"); } else { print_usage(); exit(EXIT_FAILURE); } break; case 's': if (cl.op == OP_GRANT || cl.op == OP_REVOKE) { int re = host_convert(optarg, site_arg, INET_ADDRSTRLEN); if (re == 0) { safe_copy(cl.msg.site.site, site_arg, sizeof(cl.msg.ticket), "site name"); } else { safe_copy(cl.msg.site.site, optarg, sizeof(cl.msg.ticket), "site name"); } } else { print_usage(); exit(EXIT_FAILURE); } break; case 'h': print_usage(); exit(EXIT_SUCCESS); break; case ':': case '?': fprintf(stderr, "Please use '-h' for usage.\n"); exit(EXIT_FAILURE); break; default: fprintf(stderr, "unknown option: %s\n", argv[optind]); exit(EXIT_FAILURE); break; }; } return 0; } static void set_scheduler(void) { struct sched_param sched_param; struct rlimit rlimit; int rv; rlimit.rlim_cur = RLIM_INFINITY; rlimit.rlim_max = RLIM_INFINITY; setrlimit(RLIMIT_MEMLOCK, &rlimit); rv = mlockall(MCL_CURRENT | MCL_FUTURE); if (rv < 0) { log_error("mlockall failed"); } rv = sched_get_priority_max(SCHED_RR); if (rv != -1) { sched_param.sched_priority = rv; rv = sched_setscheduler(0, SCHED_RR, &sched_param); if (rv == -1) log_error("could not set SCHED_RR priority %d: %s (%d)", sched_param.sched_priority, strerror(errno), errno); } else { log_error("could not get maximum scheduler priority err %d", errno); } } static void set_oom_adj(int val) { FILE *fp; fp = fopen("/proc/self/oom_adj", "w"); if (!fp) return; fprintf(fp, "%i", val); fclose(fp); } static 
int do_status(int type) { pid_t pid; int rv, lock_fd, ret; const char *reason = NULL; char lockfile_data[1024], *cp; ret = PCMK_OCF_NOT_RUNNING; /* TODO: query all, and return quit only if it's _cleanly_ not * running, ie. _neither_ of port/lockfile/process is available? * * Currently a single failure says "not running", even if "only" the * lockfile has been removed. */ rv = setup_config(type); if (rv) { reason = "Error reading configuration."; ret = PCMK_LSB_UNKNOWN_ERROR; goto quit; } if (!local) { reason = "No Service IP active here."; goto quit; } rv = _lockfile(O_RDWR, &lock_fd, &pid); if (rv == 0) { reason = "PID file not locked."; goto quit; } if (lock_fd == -1) { reason = "No PID file."; goto quit; } if (pid) { fprintf(stdout, "booth_lockpid=%d ", pid); fflush(stdout); } rv = read(lock_fd, lockfile_data, sizeof(lockfile_data) - 1); if (rv < 4) { reason = "Cannot read lockfile data."; ret = PCMK_LSB_UNKNOWN_ERROR; goto quit; } lockfile_data[rv] = 0; if (lock_fd != -1) close(lock_fd); /* Make sure it's only a single line */ cp = strchr(lockfile_data, '\r'); if (cp) *cp = 0; cp = strchr(lockfile_data, '\n'); if (cp) *cp = 0; rv = setup_udp_server(1); if (rv == 0) { reason = "UDP port not in use."; goto quit; } fprintf(stdout, "booth_lockfile='%s' %s\n", cl.lockfile, lockfile_data); if (daemonize) fprintf(stderr, "Booth at %s port %d seems to be running.\n", local->addr_string, booth_conf->port); return 0; quit: log_debug("not running: %s", reason); /* Ie. 
"DEBUG" */ if (daemonize) fprintf(stderr, "not running: %s\n", reason); return ret; } static int do_server(int type) { int lock_fd = -1; int rv = -1; static char log_ent[128] = DAEMON_NAME "-"; rv = setup_config(type); if (rv < 0) goto out; if (!local) { log_error("Cannot find myself in the configuration."); exit(EXIT_FAILURE); } if (!daemonize) { if (daemon(0, 0) < 0) { perror("daemon error"); exit(EXIT_FAILURE); } } /* The lockfile must be written to _after_ the call to daemon(), so * that the lockfile contains the pid of the daemon, not the parent. */ lock_fd = lockfile(); if (lock_fd < 0) return lock_fd; strcat(log_ent, type_to_string(local->type)); cl_log_set_entity(log_ent); cl_log_enable_stderr(debug_level ? TRUE : FALSE); cl_log_set_facility(HA_LOG_FACILITY); cl_inherit_logging_environment(0); if (local->type == ARBITRATOR) log_info("BOOTH arbitrator daemon is starting."); else if (local->type == SITE) log_info("BOOTH cluster site daemon is starting."); set_scheduler(); set_oom_adj(-16); set_proc_title("%s %s for [%s]:%d", DAEMON_NAME, type_to_string(local->type), local->addr_string, booth_conf->port); rv = loop(lock_fd); out: if (lock_fd >= 0) unlink_lockfile(lock_fd); return rv; } static int do_client(void) { int rv = -1; rv = setup_config(CLIENT); if (rv < 0) { log_error("cannot read config"); goto out; } switch (cl.op) { case OP_LIST: - rv = query_get_string_answer(BOOTHC_CMD_LIST); + rv = query_get_string_answer(CMD_LIST); break; case OP_GRANT: rv = do_grant(); break; case OP_REVOKE: rv = do_revoke(); break; } out: return rv; } int main(int argc, char *argv[], char *envp[]) { int rv; init_set_proc_title(argc, argv, envp); memset(&cl, 0, sizeof(cl)); strncpy(cl.configfile, BOOTH_DEFAULT_CONF, BOOTH_PATH_LEN - 1); cl.lockfile[0] = 0; debug_level = 0; cl_log_set_entity("booth"); cl_log_enable_stderr(TRUE); cl_log_set_facility(0); rv = read_arguments(argc, argv); if (rv < 0) goto out; switch (cl.type) { case STATUS: rv = do_status(cl.type); break; case 
ARBITRATOR: case DAEMON: case SITE: rv = do_server(cl.type); break; case CLIENT: rv = do_client(); break; } out: /* Normalize values. 0x100 would be seen as "OK" by waitpid(). */ return rv >= 0 && rv < 0x70 ? rv : 1; } diff --git a/src/paxos.c b/src/paxos.c index 3c86052..ba263fe 100644 --- a/src/paxos.c +++ b/src/paxos.c @@ -1,816 +1,816 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include "list.h" #include "booth.h" #include "config.h" #include "paxos.h" #include "log.h" /* Use numbers that are unlikely to conflict with other enums. 
*/ typedef enum { - INIT = 0x5104, - PREPARING, - PROMISING, - PROPOSING, - ACCEPTING, - RECOVERY, - COMMITTED, - REJECTED, + OP_INIT = 0x5104, + OP_PREPARING, + OP_PROMISING, + OP_PROPOSING, + OP_ACCEPTING, + OP_RECOVERY, + OP_COMMITTED, + OP_REJECTED, } paxos_state_t; struct proposal { int ballot_number; char value[0]; }; struct learned { int ballot; int number; }; struct paxos_msghdr { paxos_state_t state; int from; char psname[PAXOS_NAME_LEN+1]; char piname[PAXOS_NAME_LEN+1]; int ballot_number; int proposer_id; unsigned int extralen; unsigned int valuelen; }; struct proposer { int state; int ballot; int open_number; int accepted_number; int proposed; struct proposal *proposal; }; struct acceptor { int state; int highest_promised; struct proposal *accepted_proposal; }; struct learner { int state; int learned_max; int learned_ballot; struct learned learned[0]; }; struct paxos_space; struct paxos_instance; struct proposer_operations { void (*prepare) (struct paxos_instance *, int *); void (*propose) (struct paxos_space *, struct paxos_instance *, void *, int); void (*commit) (struct paxos_space *, struct paxos_instance *, void *, int); }; struct acceptor_operations { void (*promise) (struct paxos_space *, struct paxos_instance *, void *, int); void (*accepted) (struct paxos_space *, struct paxos_instance *, void *, int); }; struct learner_operations { void (*response) (struct paxos_space *, struct paxos_instance *, void *, int); }; struct paxos_space { char name[PAXOS_NAME_LEN+1]; unsigned int number; unsigned int extralen; unsigned int valuelen; const unsigned char *role; const struct paxos_operations *p_op; const struct proposer_operations *r_op; const struct acceptor_operations *a_op; const struct learner_operations *l_op; struct list_head list; struct list_head pi_head; }; struct paxos_instance { char name[PAXOS_NAME_LEN+1]; int round; int *prio; struct proposer *proposer; struct acceptor *acceptor; struct learner *learner; void (*end) (pi_handle_t pih, int 
round, int result); struct list_head list; struct paxos_space *ps; }; static LIST_HEAD(ps_head); static int have_quorum(struct paxos_space *ps, int member) { int i, sum = 0; for (i = 0; i < ps->number; i++) { if (ps->role[i] & ACCEPTOR) sum++; } if (member * 2 > sum) return 1; else return 0; } static int next_ballot_number(struct paxos_instance *pi) { int ballot; int myid = pi->ps->p_op->get_myid(); if (pi->prio) ballot = pi->prio[myid]; else ballot = myid; while (ballot <= pi->round) ballot += pi->ps->number; return ballot; } static void prepare_a_message(struct boothc_ticket_msg *msg, int state, struct paxos_instance *pax_inst) { msg->ticket.state = htonl(state); msg->ticket.proposer_id = msg->header.from = htonl(booth_get_myid()); // strcpy(hdr->psname, pax_inst->ps->name); // strcpy(hdr->piname, pax_inst->name); msg->ticket.ballot = htonl(pax_inst->round); // hdr->extralen = htonl(pax_inst->ps->extralen); // extra = (char *)msg + sizeof(struct paxos_msghdr); // memcpy((char *)msg + sizeof(struct paxos_msghdr) + pi->ps->extralen, // value, pax_inst->ps->valuelen); } static void proposer_prepare(struct paxos_instance *pi, int *round) { struct boothc_ticket_msg msg; int ballot; log_debug("preposer prepare ..."); if (*round > pi->round) pi->round = *round; ballot = next_ballot_number(pi); pi->proposer->ballot = ballot; - prepare_a_message(&msg, PREPARING, pi); + prepare_a_message(&msg, OP_PREPARING, pi); if (lease_prepare(pi, &msg) < 0) return; transport()->broadcast(&msg, sizeof(msg)); *round = ballot; } static void proposer_propose(struct paxos_space *ps, struct paxos_instance *pi, struct boothc_ticket_msg *msg, int msglen) { struct paxos_msghdr *hdr; int ballot; log_debug("proposer propose ..."); ballot = ntohl(hdr->ballot_number); if (pi->proposer->ballot != ballot) { log_debug("not the same ballot, proposer ballot: %d, " "received ballot: %d", pi->proposer->ballot, ballot); return; } if (lease_is_prepared(pi, msg)) pi->proposer->open_number++; if 
(!have_quorum(ps, pi->proposer->open_number)) return; if (pi->proposer->proposed) return; pi->proposer->proposed = 1; value = pi->proposer->proposal->value; if (lease_propose(pih, &msg, ballot, value) < 0) return; - prepare_a_message(&msg, PROPOSING, pi); + prepare_a_message(&msg, OP_PROPOSING, pi); transport()->broadcast(&msg, sizeof(msg)) } static void proposer_commit(struct paxos_space *ps, struct paxos_instance *pi, void *msg, int msglen) { struct paxos_msghdr *hdr; pi_handle_t pih = (pi_handle_t)pi; void *extra; int ballot; log_debug("proposer commit ..."); if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) { log_error("message length incorrect, " "msglen: %d, msghdr len: %lu, extralen: %u", msglen, (long)sizeof(struct paxos_msghdr), ps->extralen); return; } extra = (char *)msg + sizeof(struct paxos_msghdr); hdr = msg; ballot = ntohl(hdr->ballot_number); if (pi->proposer->ballot != ballot) { log_debug("not the same ballot, proposer ballot: %d, " "received ballot: %d", pi->proposer->ballot, ballot); return; } pi->proposer->accepted_number++; if (!have_quorum(ps, pi->proposer->accepted_number)) return; - if (pi->proposer->state == COMMITTED) + if (pi->proposer->state == OP_COMMITTED) return; pi->round = ballot; if (ps->p_op->commit && ps->p_op->commit(pih, extra, pi->round) < 0) return; - pi->proposer->state = COMMITTED; + pi->proposer->state = OP_COMMITTED; if (pi->end) pi->end(pih, pi->round, 0); } static void acceptor_promise(struct paxos_space *ps, struct paxos_instance *pi, void *msg, int msglen) { struct paxos_msghdr *hdr; unsigned long to; pi_handle_t pih = (pi_handle_t)pi; void *extra; log_debug("acceptor promise ..."); - if (pi->acceptor->state == RECOVERY) { + if (pi->acceptor->state == OP_RECOVERY) { log_debug("still in recovery"); return; } if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) { log_error("message length incorrect, " "msglen: %d, msghdr len: %lu, extralen: %u", msglen, (long)sizeof(struct paxos_msghdr), ps->extralen); 
return; } hdr = msg; extra = (char *)msg + sizeof(struct paxos_msghdr); if (ntohl(hdr->ballot_number) < pi->acceptor->highest_promised) { log_debug("ballot number: %d, highest promised: %d", ntohl(hdr->ballot_number), pi->acceptor->highest_promised); return; } if (ps->p_op->promise && ps->p_op->promise(pih, extra) < 0) return; pi->acceptor->highest_promised = ntohl(hdr->ballot_number); - pi->acceptor->state = PROMISING; + pi->acceptor->state = OP_PROMISING; to = ntohl(hdr->from); hdr->from = htonl(ps->p_op->get_myid()); - hdr->state = htonl(PROMISING); + hdr->state = htonl(OP_PROMISING); ps->p_op->send(to, msg, msglen); } static void acceptor_accepted(struct paxos_space *ps, struct paxos_instance *pi, void *msg, int msglen) { struct paxos_msghdr *hdr; unsigned long to; pi_handle_t pih = (pi_handle_t)pi; void *extra, *value; int myid = ps->p_op->get_myid(); int ballot; log_debug("acceptor accepted ..."); - if (pi->acceptor->state == RECOVERY) { + if (pi->acceptor->state == OP_RECOVERY) { log_debug("still in recovery"); return; } if (msglen != sizeof(struct paxos_msghdr) + ps->extralen + ps->valuelen) { log_error("message length incorrect, msglen: " "%d, msghdr len: %lu, extralen: %u, valuelen: %u", msglen, (long)sizeof(struct paxos_msghdr), ps->extralen, ps->valuelen); return; } hdr = msg; extra = (char *)msg + sizeof(struct paxos_msghdr); ballot = ntohl(hdr->ballot_number); if (ballot < pi->acceptor->highest_promised) { log_debug("ballot: %d, highest promised: %d", ballot, pi->acceptor->highest_promised); return; } value = pi->acceptor->accepted_proposal->value; memcpy(value, (char *)msg + sizeof(struct paxos_msghdr) + ps->extralen, ps->valuelen); if (ps->p_op->accepted && ps->p_op->accepted(pih, extra, ballot, value) < 0) return; - pi->acceptor->state = ACCEPTING; + pi->acceptor->state = OP_ACCEPTING; to = ntohl(hdr->from); hdr->from = htonl(myid); - hdr->state = htonl(ACCEPTING); + hdr->state = htonl(OP_ACCEPTING); if (ps->p_op->broadcast) 
ps->p_op->broadcast(msg, sizeof(struct paxos_msghdr) + ps->extralen); else { int i; for (i = 0; i < ps->number; i++) { if (ps->role[i] & LEARNER) ps->p_op->send(i, msg, sizeof(struct paxos_msghdr) + ps->extralen); } if (!(ps->role[to] & LEARNER)) ps->p_op->send(to, msg, sizeof(struct paxos_msghdr) + ps->extralen); } } static void learner_response(struct paxos_space *ps, struct paxos_instance *pi, void *msg, int msglen) { struct paxos_msghdr *hdr; pi_handle_t pih = (pi_handle_t)pi; void *extra; int i, unused = 0, found = 0; int ballot; log_debug("learner response ..."); if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) { log_error("message length incorrect, " "msglen: %d, msghdr len: %lu, extralen: %u", msglen, (long)sizeof(struct paxos_msghdr), ps->extralen); return; } hdr = msg; extra = (char *)msg + sizeof(struct paxos_msghdr); ballot = ntohl(hdr->ballot_number); for (i = 0; i < ps->number; i++) { if (!pi->learner->learned[i].ballot) { unused = i; break; } if (pi->learner->learned[i].ballot == ballot) { pi->learner->learned[i].number++; if (pi->learner->learned[i].number > pi->learner->learned_max) pi->learner->learned_max = pi->learner->learned[i].number; found = 1; break; } } if (!found) { pi->learner->learned[unused].ballot = ntohl(hdr->ballot_number); pi->learner->learned[unused].number = 1; } if (!have_quorum(ps, pi->learner->learned_max)) return; if (ps->p_op->learned) ps->p_op->learned(pih, extra, ballot); } const struct proposer_operations generic_proposer_operations = { .prepare = proposer_prepare, .propose = proposer_propose, .commit = proposer_commit, }; const struct acceptor_operations generic_acceptor_operations = { .promise = acceptor_promise, .accepted = acceptor_accepted, }; const struct learner_operations generic_learner_operations = { .response = learner_response, }; ps_handle_t paxos_space_init(const void *name, unsigned int number, unsigned int extralen, unsigned int valuelen, const unsigned char *role, const struct paxos_operations 
*p_op) { struct paxos_space *ps; list_for_each_entry(ps, &ps_head, list) { if (!strcmp(ps->name, name)) { log_info("paxos space (%s) has already been " "initialized", (char *)name); return -EEXIST; } } if (!number || !valuelen || !p_op || !p_op->get_myid || !p_op->send) { log_error("invalid agruments"); return -EINVAL; } ps = malloc(sizeof(struct paxos_space)); if (!ps) { log_error("no mem for paxos space"); return -ENOMEM; } memset(ps, 0, sizeof(struct paxos_space)); strncpy(ps->name, name, PAXOS_NAME_LEN + 1); ps->number = number; ps->extralen = extralen; ps->valuelen = valuelen; ps->role = role; ps->p_op = p_op; ps->r_op = &generic_proposer_operations; ps->a_op = &generic_acceptor_operations; ps->l_op = &generic_learner_operations; list_add_tail(&ps->list, &ps_head); - INIT_LIST_HEAD(&ps->pi_head); + OP_INIT_LIST_HEAD(&ps->pi_head); return (ps_handle_t)ps; } pi_handle_t paxos_instance_init(ps_handle_t handle, const void *name, int *prio) { struct paxos_space *ps = (struct paxos_space *)handle; struct paxos_instance *pi; struct proposer *proposer = NULL; struct acceptor *acceptor = NULL; struct learner *learner = NULL; int myid, valuelen, rv; list_for_each_entry(pi, &ps->pi_head, list) { if (!strcmp(pi->name, name)) return (pi_handle_t)pi; } if (handle <= 0 || !ps->p_op || !ps->p_op->get_myid) { log_error("invalid agruments"); rv = -EINVAL; goto out; } myid = ps->p_op->get_myid(); valuelen = ps->valuelen; pi = malloc(sizeof(struct paxos_instance)); if (!pi) { log_error("no mem for paxos instance"); rv = -ENOMEM; goto out; } memset(pi, 0, sizeof(struct paxos_instance)); strncpy(pi->name, name, PAXOS_NAME_LEN + 1); if (prio) { pi->prio = malloc(ps->number * sizeof(int)); if (!pi->prio) { log_error("no mem for prio"); rv = -ENOMEM; goto out_pi; } memcpy(pi->prio, prio, ps->number * sizeof(int)); } if (ps->role[myid] & PROPOSER) { proposer = malloc(sizeof(struct proposer)); if (!proposer) { log_error("no mem for proposer"); rv = -ENOMEM; goto out_prio; } 
memset(proposer, 0, sizeof(struct proposer)); - proposer->state = INIT; + proposer->state = OP_INIT; proposer->proposal = malloc(sizeof(struct proposal) + valuelen); if (!proposer->proposal) { log_error("no mem for proposal"); rv = -ENOMEM; goto out_proposer; } memset(proposer->proposal, 0, sizeof(struct proposal) + valuelen); pi->proposer = proposer; } if (ps->role[myid] & ACCEPTOR) { acceptor = malloc(sizeof(struct acceptor)); if (!acceptor) { log_error("no mem for acceptor"); rv = -ENOMEM; goto out_proposal; } memset(acceptor, 0, sizeof(struct acceptor)); - acceptor->state = INIT; + acceptor->state = OP_INIT; acceptor->accepted_proposal = malloc(sizeof(struct proposal) + valuelen); if (!acceptor->accepted_proposal) { log_error("no mem for accepted proposal"); rv = -ENOMEM; goto out_acceptor; } memset(acceptor->accepted_proposal, 0, sizeof(struct proposal) + valuelen); pi->acceptor = acceptor; if (ps->p_op->catchup) - pi->acceptor->state = RECOVERY; + pi->acceptor->state = OP_RECOVERY; else - pi->acceptor->state = INIT; + pi->acceptor->state = OP_INIT; } if (ps->role[myid] & LEARNER) { learner = malloc(sizeof(struct learner) + ps->number * sizeof(struct learned)); if (!learner) { log_error("no mem for learner"); rv = -ENOMEM; goto out_accepted_proposal; } memset(learner, 0, sizeof(struct learner) + ps->number * sizeof(struct learned)); - learner->state = INIT; + learner->state = OP_INIT; pi->learner = learner; } pi->ps = ps; list_add_tail(&pi->list, &ps->pi_head); return (pi_handle_t)pi; out_accepted_proposal: if (ps->role[myid] & ACCEPTOR) free(acceptor->accepted_proposal); out_acceptor: if (ps->role[myid] & ACCEPTOR) free(acceptor); out_proposal: if (ps->role[myid] & PROPOSER) free(proposer->proposal); out_proposer: if (ps->role[myid] & PROPOSER) free(proposer); out_prio: if (pi->prio) free(pi->prio); out_pi: free(pi); out: return rv; } int paxos_round_request(pi_handle_t handle, void *value, int *round, void (*end_request) (pi_handle_t handle, int round, int 
result)) { struct paxos_instance *pi = (struct paxos_instance *)handle; int myid = pi->ps->p_op->get_myid(); int rv = *round; if (!(pi->ps->role[myid] & PROPOSER)) { log_debug("only proposer can do this"); return -EOPNOTSUPP; } - pi->proposer->state = PREPARING; + pi->proposer->state = OP_PREPARING; pi->proposer->open_number = 0; pi->proposer->accepted_number = 0; pi->proposer->proposed = 0; memcpy(pi->proposer->proposal->value, value, pi->ps->valuelen); pi->end = end_request; pi->ps->r_op->prepare(pi, &rv); return rv; } int paxos_recovery_status_get(pi_handle_t handle) { struct paxos_instance *pi = (struct paxos_instance *)handle; int myid = pi->ps->p_op->get_myid(); if (!(pi->ps->role[myid] & ACCEPTOR)) return -EOPNOTSUPP; - if (pi->acceptor->state == RECOVERY) + if (pi->acceptor->state == OP_RECOVERY) return 1; else return 0; } int paxos_recovery_status_set(pi_handle_t handle, int recovery) { struct paxos_instance *pi = (struct paxos_instance *)handle; int myid = pi->ps->p_op->get_myid(); if (!(pi->ps->role[myid] & ACCEPTOR)) return -EOPNOTSUPP; if (recovery) - pi->acceptor->state = RECOVERY; + pi->acceptor->state = OP_RECOVERY; else - pi->acceptor->state = INIT; + pi->acceptor->state = OP_INIT; return 0; } int paxos_propose(pi_handle_t handle, void *value, int round) { struct paxos_instance *pi = (struct paxos_instance *)handle; struct paxos_msghdr *hdr; void *extra, *msg; int len = sizeof(struct paxos_msghdr) + pi->ps->extralen + pi->ps->valuelen; if (!pi->proposer->ballot) pi->proposer->ballot = round; if (round != pi->proposer->ballot) { log_debug("round: %d, proposer ballot: %d", round, pi->proposer->ballot); return -EINVAL; } msg = malloc(len); if (!msg) { log_error("no mem for msg"); return -ENOMEM; } - pi->proposer->state = PROPOSING; + pi->proposer->state = OP_PROPOSING; strcpy(pi->proposer->proposal->value, value); pi->proposer->accepted_number = 0; pi->round = round; memset(msg, 0, len); hdr = msg; - hdr->state = htonl(PROPOSING); + hdr->state = 
htonl(OP_PROPOSING); hdr->from = htonl(pi->ps->p_op->get_myid()); hdr->proposer_id = hdr->from; strcpy(hdr->psname, pi->ps->name); strcpy(hdr->piname, pi->name); hdr->ballot_number = htonl(pi->round); hdr->extralen = htonl(pi->ps->extralen); extra = (char *)msg + sizeof(struct paxos_msghdr); memcpy((char *)msg + sizeof(struct paxos_msghdr) + pi->ps->extralen, value, pi->ps->valuelen); if (pi->ps->p_op->propose) pi->ps->p_op->propose(handle, extra, round, value); if (pi->ps->p_op->broadcast) pi->ps->p_op->broadcast(msg, len); else { int i; for (i = 0; i < pi->ps->number; i++) { if (pi->ps->role[i] & ACCEPTOR) pi->ps->p_op->send(i, msg, len); } } free(msg); return 0; } int paxos_catchup(pi_handle_t handle) { struct paxos_instance *pi = (struct paxos_instance *)handle; return pi->ps->p_op->catchup(handle); } int paxos_recvmsg(struct boothc_ticket_msg *msg) { struct paxos_msghdr *hdr = msg; struct paxos_space *ps; struct paxos_instance *pi; int found = 0; int myid; list_for_each_entry(ps, &ps_head, list) { if (!strcmp(ps->name, hdr->psname)) { found = 1; break; } } if (!found) { log_error("could not find the received ps name (%s) " "in registered list", hdr->psname); return -EINVAL; } myid = ps->p_op->get_myid(); found = 0; list_for_each_entry(pi, &ps->pi_head, list) { if (!strcmp(pi->name, hdr->piname)) { found = 1; break; } } if (!found) paxos_instance_init((ps_handle_t)ps, hdr->piname, NULL); switch (ntohl(hdr->state)) { - case PREPARING: + case OP_PREPARING: if (ps->role[myid] & ACCEPTOR) ps->a_op->promise(ps, pi, msg, msglen); break; - case PROMISING: + case OP_PROMISING: ps->r_op->propose(ps, pi, msg, msglen); break; - case PROPOSING: + case OP_PROPOSING: if (ps->role[myid] & ACCEPTOR) ps->a_op->accepted(ps, pi, msg, msglen); break; - case ACCEPTING: + case OP_ACCEPTING: if (ntohl(hdr->proposer_id) == myid) ps->r_op->commit(ps, pi, msg, msglen); else if (ps->role[myid] & LEARNER) ps->l_op->response(ps, pi, msg, msglen); break; default: log_debug("invalid message 
type: %d", ntohl(hdr->state)); break; }; return 0; }