diff --git a/src/main.c b/src/main.c index 9c7165b..1aa2668 100644 --- a/src/main.c +++ b/src/main.c @@ -1,1013 +1,1067 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "log.h" #include "booth.h" #include "config.h" #include "transport.h" #include "timer.h" #include "pacemaker.h" #include "ticket.h" #define RELEASE_VERSION "1.0" #define CLIENT_NALLOC 32 int log_logfile_priority = LOG_INFO; int log_syslog_priority = LOG_ERR; int log_stderr_priority = LOG_ERR; static int client_maxi; static int client_size = 0; struct client *client = NULL; struct pollfd *pollfd = NULL; int poll_timeout = -1; typedef enum { ACT_ARBITRATOR = 1, ACT_SITE, ACT_CLIENT, } booth_role_t; typedef enum { OP_LIST = 1, OP_GRANT, OP_REVOKE, } operation_t; struct command_line { int type; /* ACT_ */ int op; /* OP_ */ int debug; int force; int expiry; char site[BOOTH_NAME_LEN]; char ticket[BOOTH_NAME_LEN]; }; static struct command_line cl; int do_read(int fd, void *buf, size_t count) { int rv, off = 0; while (off < count) { rv = read(fd, (char *)buf + off, count - off); if (rv == 0) return -1; if (rv == -1 && errno == EINTR) continue; if (rv == -1) return -1; off += rv; } return 0; } int do_write(int fd, void *buf, size_t count) { int rv, off = 0; retry: rv = write(fd, (char *)buf + off, count); if (rv == -1 && errno == EINTR) goto retry; if (rv < 0) { log_error("write errno %d", errno); return rv; } if (rv != count) { count -= rv; off += rv; goto retry; } return 0; } static int do_connect(const char *sock_path) { struct sockaddr_un sun; socklen_t addrlen; int rv, fd; fd = socket(PF_UNIX, SOCK_STREAM, 0); if (fd < 0) goto out; memset(&sun, 0, sizeof(sun)); sun.sun_family = AF_UNIX; strcpy(&sun.sun_path[1], sock_path); addrlen = sizeof(sa_family_t) + strlen(sun.sun_path+1) + 1; rv = connect(fd, (struct sockaddr *) &sun, addrlen); if (rv < 0) { close(fd); fd = rv; } out: return fd; } static void init_header(struct boothc_header *h, int cmd, int option, int expiry, int result, int data_len) { memset(h, 0, sizeof(struct boothc_header)); h->magic = BOOTHC_MAGIC; h->version = BOOTHC_VERSION; h->len = data_len; h->cmd = cmd; h->option = option; h->expiry = expiry; h->result = result; } static void client_alloc(void) { int i; if (!client) { client = malloc(CLIENT_NALLOC * sizeof(struct client)); pollfd = malloc(CLIENT_NALLOC * sizeof(struct pollfd)); } else { client = realloc(client, (client_size + CLIENT_NALLOC) * sizeof(struct client)); pollfd = realloc(pollfd, (client_size + CLIENT_NALLOC) * sizeof(struct pollfd)); if (!pollfd) log_error("can't alloc for pollfd"); } if (!client || !pollfd) log_error("can't alloc for client array"); for (i = client_size; i < client_size + CLIENT_NALLOC; i++) { client[i].workfn = NULL; client[i].deadfn = NULL; client[i].fd = -1; pollfd[i].fd = -1; pollfd[i].revents = 0; } client_size += CLIENT_NALLOC; } static void client_dead(int ci) { close(client[ci].fd); client[ci].workfn = NULL; client[ci].fd = -1; pollfd[ci].fd = -1; } int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci)) { int i; if (!client) client_alloc(); again: for (i = 0; i < client_size; i++) { if (client[i].fd == -1) { client[i].workfn = workfn; if (deadfn) client[i].deadfn = deadfn; else client[i].deadfn = client_dead; client[i].fd = fd; pollfd[i].fd = fd; pollfd[i].events = POLLIN; if (i > client_maxi) client_maxi = i; return i; } } client_alloc(); goto again; } static int setup_listener(const char *sock_path) { struct sockaddr_un addr; socklen_t addrlen; int rv, s; s = socket(AF_LOCAL, SOCK_STREAM, 0); if (s < 0) { log_error("socket error %d %d", s, errno); return s; } memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_LOCAL; strcpy(&addr.sun_path[1], sock_path); addrlen = sizeof(sa_family_t) + strlen(addr.sun_path+1) + 1; rv = bind(s, (struct sockaddr *) &addr, addrlen); if (rv < 0) { log_error("bind error %d %d", rv, errno); close(s); return rv; } rv = listen(s, 5); if (rv < 0) { log_error("listen error %d %d", rv, errno); close(s); return rv; } return s; } void process_connection(int ci) { struct boothc_header h; char *data = NULL; char *site, *ticket; int local, rv; rv = do_read(client[ci].fd, &h, sizeof(h)); if (rv < 0) { log_error("connection %d read error %d", ci, rv); return; } if (h.magic != BOOTHC_MAGIC) { log_error("connection %d magic error %x", ci, h.magic); return; } if (h.version != BOOTHC_VERSION) { log_error("connection %d version error %x", ci, h.version); return; } if (h.len) { data = malloc(h.len); if (!data) { log_error("process_connection no mem %u", h.len); return; } memset(data, 0, h.len); rv = do_read(client[ci].fd, data, h.len); if (rv < 0) { log_error("connection %d read data error %d", ci, rv); goto out; } } switch (h.cmd) { case BOOTHC_CMD_LIST: break; case BOOTHC_CMD_GRANT: site = data; ticket = data + BOOTH_NAME_LEN; if (!check_ticket(ticket)) { h.result = BOOTHC_RLT_INVALID_ARG; goto reply; } if (!check_site(site, &local)) { h.result = BOOTHC_RLT_INVALID_ARG; goto reply; } if (local) h.result = grant_ticket(ticket, h.option, h.expiry); else h.result = BOOTHC_RLT_REMOTE_OP; break; case BOOTHC_CMD_REVOKE: + site = data; + ticket = data + BOOTH_NAME_LEN; + if (!check_ticket(ticket)) { + h.result = BOOTHC_RLT_INVALID_ARG; + goto reply; + } + if (!check_site(site, &local)) { + h.result = BOOTHC_RLT_INVALID_ARG; + goto reply; + } + if (local) + h.result = revoke_ticket(ticket, h.option); + else + h.result = BOOTHC_RLT_REMOTE_OP; break; default: log_error("connection %d cmd %x unknown", ci, h.cmd); break; } reply: rv = do_write(client[ci].fd, &h, sizeof(h)); if (rv < 0) log_error("connection %d write error %d", ci, rv); out: free(data); } static void process_listener(int ci) { int fd, i; fd = accept(client[ci].fd, NULL, NULL); if (fd < 0) { log_error("process_listener: accept error %d %d", fd, errno); return; } i = client_add(fd, process_connection, NULL); log_debug("client connection %d fd %d", i, fd); } static int setup_config(int type) { int rv; rv = read_config(BOOTH_DEFAULT_CONF); if (rv < 0) goto out; rv = check_config(type); if (rv < 0) goto out; out: return rv; } static int setup_transport(void) { int rv; rv = booth_transport[booth_conf->proto].init(ticket_recv); if (rv < 0) goto out; rv = booth_transport[TCP].init(NULL); if (rv < 0) goto out; out: return rv; } static int setup_timer(void) { return timerlist_init(); } static int loop(int type) { void (*workfn) (int ci); void (*deadfn) (int ci); int rv, i; rv = setup_config(type); if (rv < 0) goto fail; rv = setup_timer(); if (rv < 0) goto fail; rv = setup_transport(); if (rv < 0) goto fail; rv = setup_ticket(); if (rv < 0) goto fail; rv = setup_listener(BOOTHC_SOCK_PATH); if (rv < 0) goto fail; client_add(rv, process_listener, NULL); while (1) { rv = poll(pollfd, client_maxi + 1, poll_timeout); if (rv == -1 && errno == EINTR) continue; if (rv < 0) { log_error("poll errno %d", errno); goto fail; } for (i = 0; i <= client_maxi; i++) { if (client[i].fd < 0) continue; if (pollfd[i].revents & POLLIN) { workfn = client[i].workfn; if (workfn) workfn(i); } if (pollfd[i].revents & (POLLERR | POLLHUP | POLLNVAL)) { deadfn = client[i].deadfn; if (deadfn) deadfn(i); } } process_timerlist(); } return 0; fail: return -1; } static int do_list(void) { struct boothc_header h, *rh; char *reply = NULL, *data; int data_len; int fd, rv; init_header(&h, BOOTHC_CMD_LIST, 0, 0, 0, 0); fd = do_connect(BOOTHC_SOCK_PATH); if (fd < 0) { rv = fd; goto out; } rv = do_write(fd, &h, sizeof(h)); if (rv < 0) goto out_close; reply = malloc(sizeof(struct boothc_header)); if (!reply) { rv = -ENOMEM; goto out_close; } rv = do_read(fd, reply, sizeof(struct boothc_header)); if (rv < 0) goto out_free; rh = (struct boothc_header *)reply; data_len = rh->len; reply = realloc(reply, sizeof(struct boothc_header) + data_len); if (!reply) { rv = -ENOMEM; goto out_free; } data = reply + sizeof(struct boothc_header); rv = do_read(fd, data, data_len); if (rv < 0) goto out_free; do_write(STDOUT_FILENO, data, data_len); rv = 0; out_free: free(reply); out_close: close(fd); out: return rv; } static int do_grant(void) { char *buf; struct boothc_header *h, reply; int buflen; uint32_t force = 0; int fd, rv; buflen = sizeof(struct boothc_header) + sizeof(cl.site) + sizeof(cl.ticket); buf = malloc(buflen); if (!buf) { rv = -ENOMEM; goto out; } h = (struct boothc_header *)buf; if (cl.force) force = BOOTHC_OPT_FORCE; init_header(h, BOOTHC_CMD_GRANT, force, cl.expiry, 0, sizeof(cl.site) + sizeof(cl.ticket)); strcpy(buf + sizeof(struct boothc_header), cl.site); strcpy(buf + sizeof(struct boothc_header) + sizeof(cl.site), cl.ticket); fd = do_connect(BOOTHC_SOCK_PATH); if (fd < 0) { rv = fd; goto out_free; } rv = do_write(fd, buf, buflen); if (rv < 0) goto out_close; rv = do_read(fd, &reply, sizeof(struct boothc_header)); if (rv < 0) goto out_close; if (reply.result == BOOTHC_RLT_INVALID_ARG) { log_info("invalid argument!"); rv = -1; goto out_close; } if (reply.result == BOOTHC_RLT_REMOTE_OP) { struct booth_node to; int s; memset(&to, 0, sizeof(struct booth_node)); to.family = BOOTH_PROTO_FAMILY; strcpy(to.addr, cl.site); s = booth_transport[TCP].open(&to); if (s < 0) goto out_close; rv = booth_transport[TCP].send(s, buf, buflen); if (rv < 0) { booth_transport[TCP].close(s); goto out_close; } rv = booth_transport[TCP].recv(s, &reply, - sizeof(struct boothc_header)); + sizeof(struct boothc_header)); if (rv < 0) { booth_transport[TCP].close(s); goto out_close; } booth_transport[TCP].close(s); } if (reply.result == BOOTHC_RLT_ASYNC) { log_info("grant command sent, but result is async."); rv = 0; } else if (reply.result == BOOTHC_RLT_SYNC_SUCC) { log_info("grant succeeded!"); rv = 0; } else if (reply.result == BOOTHC_RLT_SYNC_FAIL) { log_info("grant failed!"); rv = 0; } else { log_error("internal error!"); rv = -1; } out_close: close(fd); out_free: free(buf); out: return rv; } static int do_revoke(void) { char *buf; struct boothc_header *h, reply; int buflen; + uint32_t force = 0; int fd, rv; - buflen = sizeof(struct boothc_header) + sizeof(cl.ticket); + buflen = sizeof(struct boothc_header) + + sizeof(cl.site) + sizeof(cl.ticket); buf = malloc(buflen); if (!buf) { rv = -ENOMEM; goto out; } h = (struct boothc_header *)buf; - init_header(h, BOOTHC_CMD_REVOKE, 0, 0, 0, sizeof(cl.ticket)); - strcpy(buf + sizeof(struct boothc_header), cl.ticket); + if (cl.force) + force = BOOTHC_OPT_FORCE; + init_header(h, BOOTHC_CMD_REVOKE, force, 0, 0, + sizeof(cl.site) + sizeof(cl.ticket)); + strcpy(buf + sizeof(struct boothc_header), cl.site); + strcpy(buf + sizeof(struct boothc_header) + sizeof(cl.site), cl.ticket); + fd = do_connect(BOOTHC_SOCK_PATH); if (fd < 0) { rv = fd; goto out_free; } rv = do_write(fd, buf, buflen); if (rv < 0) goto out_close; rv = do_read(fd, &reply, sizeof(struct boothc_header)); if (rv < 0) goto out_close; + + if (reply.result == BOOTHC_RLT_INVALID_ARG) { + log_info("invalid argument!"); + rv = -1; + goto out_close; + } + + if (reply.result == BOOTHC_RLT_REMOTE_OP) { + struct booth_node to; + int s; + + memset(&to, 0, sizeof(struct booth_node)); + to.family = BOOTH_PROTO_FAMILY; + strcpy(to.addr, cl.site); + + s = booth_transport[TCP].open(&to); + if (s < 0) + goto out_close; + + rv = booth_transport[TCP].send(s, buf, buflen); + if (rv < 0) { + booth_transport[TCP].close(s); + goto out_close; + } + rv = booth_transport[TCP].recv(s, &reply, + sizeof(struct boothc_header)); + if (rv < 0) { + booth_transport[TCP].close(s); + goto out_close; + } + booth_transport[TCP].close(s); + } + if (reply.result == BOOTHC_RLT_ASYNC) { log_info("revoke command sent, but result is async."); rv = 0; } else if (reply.result == BOOTHC_RLT_SYNC_SUCC) { log_info("revoke succeeded!"); rv = 0; } else if (reply.result == BOOTHC_RLT_SYNC_FAIL) { log_info("revoke failed!"); rv = 0; } else { log_error("internal error!"); rv = -1; } out_close: close(fd); out_free: free(buf); out: return rv; } static int lockfile(void) { char path[PATH_MAX]; char buf[16]; struct flock lock; int fd, rv; snprintf(path, PATH_MAX, "%s/%s", BOOTH_RUN_DIR, BOOTH_LOCKFILE_NAME); fd = open(path, O_CREAT|O_WRONLY, 0666); if (fd < 0) { log_error("lockfile open error %s: %s", path, strerror(errno)); return -1; } lock.l_type = F_WRLCK; lock.l_start = 0; lock.l_whence = SEEK_SET; lock.l_len = 0; rv = fcntl(fd, F_SETLK, &lock); if (rv < 0) { log_error("lockfile setlk error %s: %s", path, strerror(errno)); goto fail; } rv = ftruncate(fd, 0); if (rv < 0) { log_error("lockfile truncate error %s: %s", path, strerror(errno)); goto fail; } memset(buf, 0, sizeof(buf)); snprintf(buf, sizeof(buf), "%d\n", getpid()); rv = write(fd, buf, strlen(buf)); if (rv <= 0) { log_error("lockfile write error %s: %s", path, strerror(errno)); goto fail; } return fd; fail: close(fd); return -1; } static void unlink_lockfile(int fd) { char path[PATH_MAX]; snprintf(path, PATH_MAX, "%s/%s", BOOTH_RUN_DIR, BOOTH_LOCKFILE_NAME); unlink(path); close(fd); } static void print_usage(void) { printf("Usage:\n"); printf("booth [options]\n"); printf("\n"); printf("Types:\n"); printf(" arbitrator: daemon running on arbitrator\n"); printf(" site: daemon running on cluster site\n"); printf(" client: command running from client\n"); printf("\n"); printf("Operations:\n"); printf("Please note that operations are valid iff type is client!\n"); printf("list: List all the tickets\n"); printf("grant: Grant ticket T(-t T) to site S(-s S)\n"); - printf("revoke: Revoke ticket T(-t T) from local site\n"); + printf("revoke: Revoke ticket T(-t T) from site S(-s S)\n"); printf("\n"); printf("Options:\n"); printf(" -D Enable debugging to stderr and don't fork\n"); printf(" -t ticket name\n"); printf(" -s site name\n"); printf(" -f ticket attribute: force, only valid when " "granting\n"); printf(" -e ticket will failover after the expiry time if " "not being renewed,\n\t\tset it while " "granting. default: 600sec\n"); printf(" -h Print this help, then exit\n"); } #define OPTION_STRING "Dt:s:fe:h" static int read_arguments(int argc, char **argv) { int optchar; char *arg1 = argv[1]; char *op; if (argc < 2 || !strcmp(arg1, "help") || !strcmp(arg1, "--help") || !strcmp(arg1, "-h")) { print_usage(); exit(EXIT_SUCCESS); } if (!strcmp(arg1, "version") || !strcmp(arg1, "--version") || !strcmp(arg1, "-V")) { printf("%s %s (built %s %s)\n", argv[0], RELEASE_VERSION, __DATE__, __TIME__); exit(EXIT_SUCCESS); } if (!strcmp(arg1, "arbitrator")) { cl.type = ACT_ARBITRATOR; optind = 2; } else if (!strcmp(arg1, "site")) { cl.type = ACT_SITE; optind = 2; } else if (!strcmp(arg1, "client")) { cl.type = ACT_CLIENT; if (argc < 3) { print_usage(); exit(EXIT_FAILURE); } op = argv[2]; optind = 3; } else { cl.type = ACT_CLIENT; op = argv[1]; optind = 2; } switch (cl.type) { case ACT_ARBITRATOR: break; case ACT_SITE: break; case ACT_CLIENT: if (!strcmp(op, "list")) cl.op = OP_LIST; else if (!strcmp(op, "grant")) cl.op = OP_GRANT; else if (!strcmp(op, "revoke")) cl.op = OP_REVOKE; else { fprintf(stderr, "client operation \"%s\" is unknown\n", op); exit(EXIT_FAILURE); } break; } while (optind < argc) { optchar = getopt(argc, argv, OPTION_STRING); switch (optchar) { case 'D': cl.debug = 1; log_logfile_priority = LOG_DEBUG; log_syslog_priority = LOG_DEBUG; break; case 't': if (cl.op == OP_GRANT || cl.op == OP_REVOKE) strcpy(cl.ticket, optarg); else { print_usage(); exit(EXIT_FAILURE); } break; case 's': if (cl.op == OP_GRANT || cl.op == OP_REVOKE) strcpy(cl.site, optarg); else { print_usage(); exit(EXIT_FAILURE); } break; case 'f': if (cl.op == OP_GRANT) cl.force = 1; else { print_usage(); exit(EXIT_FAILURE); } break; case 'e': if (cl.op == OP_GRANT) cl.expiry = atoi(optarg); else { print_usage(); exit(EXIT_FAILURE); } break; case 'h': print_usage(); exit(EXIT_SUCCESS); break; case ':': case '?': fprintf(stderr, "Please use '-h' for usage.\n"); exit(EXIT_FAILURE); break; default: fprintf(stderr, "unknown option: %c\n", optchar); exit(EXIT_FAILURE); break; }; } return 0; } static void set_scheduler(void) { struct sched_param sched_param; struct rlimit rlimit; int rv; rlimit.rlim_cur = RLIM_INFINITY; rlimit.rlim_max = RLIM_INFINITY; setrlimit(RLIMIT_MEMLOCK, &rlimit); rv = mlockall(MCL_CURRENT | MCL_FUTURE); if (rv < 0) { log_error("mlockall failed"); } rv = sched_get_priority_max(SCHED_RR); if (rv != -1) { sched_param.sched_priority = rv; rv = sched_setscheduler(0, SCHED_RR, &sched_param); if (rv == -1) log_error("could not set SCHED_RR priority %d err %d", sched_param.sched_priority, errno); } else { log_error("could not get maximum scheduler priority err %d", errno); } } static void set_oom_adj(int val) { FILE *fp; fp = fopen("/proc/self/oom_adj", "w"); if (!fp) return; fprintf(fp, "%i", val); fclose(fp); } static int do_arbitrator(void) { int fd; int rv = -1; if (!cl.debug) { if (daemon(0, 0) < 0) { perror("daemon error"); exit(EXIT_FAILURE); } } setup_logging(); fd = lockfile(); if (fd < 0) return fd; log_info("BOOTH arbitrator daemon started"); set_scheduler(); set_oom_adj(-16); rv = loop(ARBITRATOR); if (rv < 0) goto fail; unlink_lockfile(fd); close_logging(); return 0; fail: return -1; } static int do_site(void) { int fd; int rv = -1; if (!cl.debug) { if (daemon(0, 0) < 0) { perror("daemon error"); exit(EXIT_FAILURE); } } setup_logging(); fd = lockfile(); if (fd < 0) return fd; log_info("BOOTH cluster site daemon started"); set_scheduler(); set_oom_adj(-16); rv = loop(SITE); if (rv < 0) goto fail; unlink_lockfile(fd); close_logging(); return 0; fail: return -1; } static int do_client(void) { int rv = -1; setup_logging(); switch (cl.op) { case OP_LIST: rv = do_list(); break; case OP_GRANT: rv = do_grant(); break; case OP_REVOKE: rv = do_revoke(); break; } close_logging(); return rv; } int main(int argc, char *argv[]) { int rv; memset(&cl, 0, sizeof(cl)); rv = read_arguments(argc, argv); if (rv < 0) goto out; switch (cl.type) { case ACT_ARBITRATOR: rv = do_arbitrator(); break; case ACT_SITE: rv = do_site(); break; case ACT_CLIENT: rv = do_client(); break; } out: return rv ? EXIT_FAILURE : EXIT_SUCCESS; } diff --git a/src/paxos_lease.c b/src/paxos_lease.c index b1f309f..204040a 100644 --- a/src/paxos_lease.c +++ b/src/paxos_lease.c @@ -1,443 +1,454 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include "paxos.h" #include "paxos_lease.h" #include "transport.h" #include "config.h" #include "timer.h" #include "list.h" #define PAXOS_LEASE_SPACE "paxoslease" #define PLEASE_VALUE_LEN 1024 struct paxos_lease_msghdr { int leased; }; struct paxos_lease_value { char name[PAXOS_NAME_LEN+1]; int owner; int expiry; }; struct lease_state { int round; struct paxos_lease_value *plv; unsigned long long expires; struct timerlist *timer; }; struct paxos_lease { char name[PAXOS_NAME_LEN+1]; pi_handle_t pih; struct lease_state proposer; struct lease_state acceptor; int owner; int expiry; int relet; int failover; + int release; unsigned long long expires; void (*end_lease) (pi_handle_t, int); struct timerlist *timer; struct list_head list; }; static LIST_HEAD(lease_head); static int myid = -1; static struct paxos_operations *px_op = NULL; const struct paxos_lease_operations *p_l_op; ps_handle_t ps_handle = 0; static void end_paxos_request(pi_handle_t handle, int round, int result) { struct paxos_lease *pl; int found = 0; list_for_each_entry(pl, &lease_head, list) { if (pl->pih == handle) { found = 1; break; } } if (!found) return; if (round != pl->proposer.round) return; if (pl->end_lease) pl->end_lease((pl_handle_t)pl, result); return; } static void lease_expires(unsigned long data) { struct paxos_lease *pl = (struct paxos_lease *)data; pl_handle_t plh = (pl_handle_t)pl; struct paxos_lease_result plr; if (pl->owner != myid) { pl->owner = -1; strcpy(plr.name, pl->name); plr.owner = -1; plr.expires = 0; p_l_op->notify(plh, &plr); if (pl->proposer.timer) del_timer(pl->proposer.timer); if (pl->acceptor.timer) del_timer(pl->acceptor.timer); if (pl->failover) paxos_lease_acquire(plh, 1, NULL); - } else if (pl->owner == myid && pl->relet) { + } else if (pl->owner == myid && pl->relet && !pl->release) { struct paxos_lease_value value; strncpy(value.name, pl->name, PAXOS_NAME_LEN + 1); value.owner = myid; value.expiry = pl->expiry; paxos_propose(pl->pih, &value, pl->proposer.round); } else { pl->owner = -1; strcpy(plr.name, pl->name); plr.owner = -1; plr.expires = 0; p_l_op->notify(plh, &plr); if (pl->proposer.timer) del_timer(pl->proposer.timer); if (pl->acceptor.timer) del_timer(pl->acceptor.timer); } } int paxos_lease_acquire(pl_handle_t handle, int relet, void (*end_acquire) (pl_handle_t handle, int result)) { struct paxos_lease *pl = (struct paxos_lease *)handle; struct paxos_lease_value value; int round; strncpy(value.name, pl->name, PAXOS_NAME_LEN + 1); value.owner = myid; value.expiry = pl->expiry; pl->relet = relet; pl->end_lease = end_acquire; + pl->release = 0; round = paxos_round_request(pl->pih, &value, end_paxos_request); if (round <= 0) return -1; pl->proposer.round = round; return 0; } +int paxos_lease_release(pl_handle_t handle) +{ + struct paxos_lease *pl = (struct paxos_lease *)handle; + + pl->release = 1; + + return 0; +} + static int lease_catchup(const void *name) { struct paxos_lease *pl; int found = 0; list_for_each_entry(pl, &lease_head, list) { if (!strcmp(pl->name, name)) { found = 1; break; } } if (!found) return -1; p_l_op->catchup(name, &pl->owner, &pl->expires); return 0; } static int lease_prepared(pi_handle_t handle __attribute__((unused)), void *header) { struct paxos_lease_msghdr *hdr = header; if (hdr->leased) return 0; else return 1; } static int handle_lease_request(pi_handle_t handle, void *header) { struct paxos_lease_msghdr *hdr; struct paxos_lease *pl; int found = 0; hdr = header; list_for_each_entry(pl, &lease_head, list) { if (pl->pih == handle) { found = 1; break; } } if (!found) return -1; if (pl->owner == -1) hdr->leased = 0; else hdr->leased = 1; return 0; } static int lease_propose(pi_handle_t handle, void *extra __attribute__((unused)), int round, void *value) { struct paxos_lease *pl; int found = 0; list_for_each_entry(pl, &lease_head, list) { if (pl->pih == handle) { found = 1; break; } } if (!found) return -1; if (round != pl->proposer.round) return -1; if (!pl->proposer.plv) { pl->proposer.plv = malloc(sizeof(struct paxos_lease_value)); if (!pl->proposer.plv) return -ENOMEM; } memcpy(pl->proposer.plv, value, sizeof(struct paxos_lease_value)); if (pl->relet) { pl->proposer.timer = add_timer(4 * pl->expiry / 5, (unsigned long)pl, lease_expires); pl->proposer.expires = current_time() + 4 * pl->expiry / 5; } else { pl->proposer.timer = add_timer(pl->expiry, (unsigned long)pl, lease_expires); pl->proposer.expires = current_time() + pl->expiry; } return 0; } static int lease_accepted(pi_handle_t handle, void *extra __attribute__((unused)), int round, void *value) { struct paxos_lease *pl; int found = 0; list_for_each_entry(pl, &lease_head, list) { if (pl->pih == handle) { found = 1; break; } } if (!found) return -1; pl->acceptor.round = round; if (!pl->acceptor.plv) { pl->acceptor.plv = malloc(sizeof(struct paxos_lease_value)); if (!pl->acceptor.plv) return -ENOMEM; } memcpy(pl->acceptor.plv, value, sizeof(struct paxos_lease_value)); if (pl->acceptor.timer) mod_timer(pl->acceptor.timer, pl->expiry); else pl->acceptor.timer = add_timer(pl->expiry, (unsigned long)pl, lease_expires); pl->acceptor.expires = current_time() + pl->expiry; return 0; } static int lease_commit(pi_handle_t handle, void *extra __attribute__((unused)), int round) { struct paxos_lease *pl; struct paxos_lease_result plr; int found = 0; list_for_each_entry(pl, &lease_head, list) { if (pl->pih == handle) { found = 1; break; } } if (!found) return -1; if (round != pl->proposer.round) return -1; pl->owner = pl->proposer.plv->owner; pl->expiry = pl->proposer.plv->expiry; strcpy(plr.name, pl->proposer.plv->name); plr.owner = pl->proposer.plv->owner; plr.expires = current_time() + pl->proposer.plv->expiry; p_l_op->notify((pl_handle_t)pl, &plr); return 0; } static int lease_learned(pi_handle_t handle, void *extra __attribute__((unused)), int round) { struct paxos_lease *pl; struct paxos_lease_result plr; int found = 0; list_for_each_entry(pl, &lease_head, list) { if (pl->pih == handle) { found = 1; break; } } if (!found) return -1; if (round != pl->acceptor.round) return -1; pl->owner = pl->acceptor.plv->owner; pl->expiry = pl->acceptor.plv->expiry; strcpy(plr.name, pl->acceptor.plv->name); plr.owner = pl->acceptor.plv->owner; plr.expires = current_time() + pl->acceptor.plv->expiry; p_l_op->notify((pl_handle_t)pl, &plr); return 0; } pl_handle_t paxos_lease_init(const void *name, unsigned int namelen, int expiry, int number, int failover, unsigned char *role, int *prio, const struct paxos_lease_operations *pl_op) { ps_handle_t psh; pi_handle_t pih; struct paxos_lease *lease; if (namelen > PAXOS_NAME_LEN) return -EINVAL; if (myid == -1) myid = pl_op->get_myid(); if (!ps_handle) { px_op = malloc(sizeof(struct paxos_operations)); if (!px_op) return -ENOMEM; memset(px_op, 0, sizeof(struct paxos_operations)); px_op->get_myid = pl_op->get_myid; px_op->send = pl_op->send; px_op->broadcast = pl_op->broadcast; px_op->catchup = lease_catchup; px_op->prepare = lease_prepared; px_op->promise = handle_lease_request; px_op->propose = lease_propose; px_op->accepted = lease_accepted; px_op->commit = lease_commit; px_op->learned = lease_learned; p_l_op = pl_op; psh = paxos_space_init(PAXOS_LEASE_SPACE, number, sizeof(struct paxos_lease_msghdr), PLEASE_VALUE_LEN, role, px_op); if (psh <= 0) { free(px_op); px_op = NULL; return psh; } ps_handle = psh; } lease = malloc(sizeof(struct paxos_lease)); if (!lease) return -ENOMEM; memset(lease, 0, sizeof(struct paxos_lease)); strncpy(lease->name, name, PAXOS_NAME_LEN + 1); lease->owner = -1; lease->expiry = expiry; lease->failover = failover; list_add_tail(&lease->list, &lease_head); pih = paxos_instance_init(ps_handle, name, prio); if (pih <= 0) { free(lease); return pih; } lease->pih = pih; return (pl_handle_t)lease; } int paxos_lease_on_receive(void *msg, int msglen) { return paxos_recvmsg(msg, msglen); } int paxos_lease_exit(pl_handle_t handle) { struct paxos_lease *pl = (struct paxos_lease *)handle; if (px_op) free(px_op); if (pl->proposer.plv) free(pl->proposer.plv); if (pl->proposer.timer) del_timer(pl->proposer.timer); if (pl->acceptor.plv) free(pl->acceptor.plv); if (pl->acceptor.timer) del_timer(pl->acceptor.timer); return 0; } diff --git a/src/ticket.c b/src/ticket.c index fdb7ad6..1195f01 100644 --- a/src/ticket.c +++ b/src/ticket.c @@ -1,346 +1,375 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include "ticket.h" #include "config.h" #include "pacemaker.h" #include "list.h" #include "log.h" #include "paxos_lease.h" #include "paxos.h" #define PAXOS_MAGIC 0xDB12 struct booth_msghdr { uint16_t magic; uint16_t checksum; uint32_t len; } __attribute__((packed)); struct ticket { char id[BOOTH_NAME_LEN+1]; pl_handle_t handle; int owner; int expiry; unsigned long long expires; struct list_head list; }; static LIST_HEAD(ticket_list); static unsigned char *role; int check_ticket(char *ticket) { int i; if (!booth_conf) return 0; for (i = 0; i < booth_conf->ticket_count; i++) { if (!strcmp(booth_conf->ticket[i].name, ticket)) return 1; } return 0; } int check_site(char *site, int *local) { int i; if (!booth_conf) return 0; for (i = 0; i < booth_conf->node_count; i++) { if (booth_conf->node[i].type == SITE && !strcmp(booth_conf->node[i].addr, site)) { *local = booth_conf->node[i].local; return 1; } } return 0; } static int * ticket_priority(int i) { int j; /* TODO: need more precise check */ for (j = 0; j < booth_conf->node_count; j++) { if (booth_conf->ticket[i].weight[j] == 0) return NULL; } return booth_conf->ticket[i].weight; } static int ticket_get_myid(void) { return booth_transport[booth_conf->proto].get_myid(); } static void end_acquire(pl_handle_t handle, int result) { struct ticket *tk; int found = 0; if (result == 0) { list_for_each_entry(tk, &ticket_list, list) { if (tk->handle == handle) { tk->owner = ticket_get_myid(); found = 1; break; } } if (!found) log_error("BUG: ticket handle %d does not exist", handle); log_info("ticket %s acquired", tk->id); log_info("ticket %s granted to local (id %d)", tk->id, ticket_get_myid()); } } static int ticket_send(unsigned long id, void *value, int len) { int i, rv = -1; struct booth_node *to = NULL; struct booth_msghdr *hdr; void *buf; for (i = 0; i < booth_conf->node_count; i++) { if (booth_conf->node[i].nodeid == id) to = &booth_conf->node[i]; } if (!to) return rv; buf = malloc(sizeof(struct booth_msghdr) + len); if (!buf) return -ENOMEM; memset(buf, 0, sizeof(struct booth_msghdr) + len); hdr = buf; hdr->magic = htons(PAXOS_MAGIC); hdr->len = htonl(sizeof(struct booth_msghdr) + len); memcpy((char *)buf + sizeof(struct booth_msghdr), value, len); rv = booth_transport[booth_conf->proto].send( (unsigned long)to, buf, sizeof(struct booth_msghdr) + len); free(buf); return rv; } static int ticket_broadcast(void *value, int len) { void *buf; struct booth_msghdr *hdr; int rv; buf = malloc(sizeof(struct booth_msghdr) + len); if (!buf) return -ENOMEM; memset(buf, 0, sizeof(struct booth_msghdr) + len); hdr = buf; hdr->magic = htons(PAXOS_MAGIC); hdr->len = htonl(sizeof(struct booth_msghdr) + len); memcpy((char *)buf + sizeof(struct booth_msghdr), value, len); rv = booth_transport[booth_conf->proto].broadcast( buf, sizeof(struct booth_msghdr) + len); free(buf); return rv; } static int ticket_read(const void *name, int *owner, unsigned long long *expires) { struct ticket *tk; int found = 0; list_for_each_entry(tk, &ticket_list, list) { if (!strcmp(tk->id, name)) { found = 1; break; } } if (!found) { log_error("BUG: ticket_read failed (ticket %s does not exist)", (char *)name); return -1; } pcmk_handler.load_ticket(tk->id, &tk->owner, &tk->expires); *owner = tk->owner; *expires = tk->expires; return 0; } static int ticket_write(pl_handle_t handle, struct paxos_lease_result *result) { struct ticket *tk; int found = 0; list_for_each_entry(tk, &ticket_list, list) { if (tk->handle == handle) { found = 1; break; } } if (!found) { log_error("BUG: ticket_write failed " "(ticket handle %d does not exist)", handle); return -1; } tk->owner = result->owner; tk->expires = result->expires; if (tk->owner == ticket_get_myid()) { pcmk_handler.store_ticket(tk->id, tk->owner, tk->expires); pcmk_handler.grant_ticket(tk->id); } else if (tk->owner == -1) { pcmk_handler.store_ticket(tk->id, tk->owner, tk->expires); pcmk_handler.revoke_ticket(tk->id); } else pcmk_handler.store_ticket(tk->id, tk->owner, tk->expires); return 0; } int ticket_recv(void *msg, int msglen) { struct booth_msghdr *hdr; char *data; hdr = msg; if (ntohs(hdr->magic) != PAXOS_MAGIC || ntohl(hdr->len) != msglen) { log_error("message received error"); return -1; } data = (char *)msg + sizeof(struct booth_msghdr); return paxos_lease_on_receive(data, msglen - sizeof(struct booth_msghdr)); } int grant_ticket(char *ticket, int force, int expiry) { struct ticket *tk; int found = 0; if (force) { pcmk_handler.store_ticket(ticket, ticket_get_myid(), -1); pcmk_handler.grant_ticket(ticket); return BOOTHC_RLT_SYNC_SUCC; } if (!expiry) expiry = DEFAULT_TICKET_EXPIRY; list_for_each_entry(tk, &ticket_list, list) { if (!strcmp(tk->id, ticket)) { found = 1; break; } } if (!found) { log_error("ticket %s does not exist", ticket); return BOOTHC_RLT_SYNC_FAIL; } if (tk->owner == ticket_get_myid()) return BOOTHC_RLT_SYNC_SUCC; else { paxos_lease_acquire(tk->handle, 1, end_acquire); return BOOTHC_RLT_ASYNC; } } +int revoke_ticket(char *ticket, int force) +{ + struct ticket *tk; + int found = 0; + + list_for_each_entry(tk, &ticket_list, list) { + if (!strcmp(tk->id, ticket)) { + found = 1; + break; + } + } + if (!found) { + log_error("ticket %s does not exist", ticket); + return BOOTHC_RLT_SYNC_FAIL; + } + + if (force) { + pcmk_handler.store_ticket(tk->id, -1, 0); + pcmk_handler.revoke_ticket(tk->id); + } + + if (tk->owner == -1) + return BOOTHC_RLT_SYNC_SUCC; + else { + paxos_lease_release(tk->handle); + return BOOTHC_RLT_ASYNC; + } +} + const struct paxos_lease_operations ticket_operations = { .get_myid = ticket_get_myid, .send = ticket_send, .broadcast = ticket_broadcast, .catchup = ticket_read, .notify = ticket_write, }; int setup_ticket(void) { struct ticket *tk, *tmp; int i, rv; pl_handle_t plh; role = malloc(booth_conf->node_count * sizeof(unsigned char)); if (!role) return -ENOMEM; memset(role, 0, booth_conf->node_count * sizeof(unsigned char)); for (i = 0; i < booth_conf->node_count; i++) { if (booth_conf->node[i].type == SITE) role[i] = PROPOSER | ACCEPTOR | LEARNER; else if (booth_conf->node[i].type == ARBITRATOR) role[i] = ACCEPTOR; } for (i = 0; i < booth_conf->ticket_count; i++) { tk = malloc(sizeof(struct ticket)); if (!tk) { rv = -ENOMEM; goto out; } memset(tk, 0, sizeof(struct ticket)); strcpy(tk->id, booth_conf->ticket[i].name); tk->owner = -1; tk->expiry = booth_conf->ticket[i].expiry; if (!tk->expiry) tk->expiry = DEFAULT_TICKET_EXPIRY; list_add_tail(&tk->list, &ticket_list); plh = paxos_lease_init(tk->id, BOOTH_NAME_LEN, tk->expiry, booth_conf->node_count, 1, role, ticket_priority(i), &ticket_operations); if (plh <= 0) { log_error("paxos lease initialization failed"); rv = plh; goto out; } tk->handle = plh; } return 0; out: list_for_each_entry_safe(tk, tmp, &ticket_list, list) { list_del(&tk->list); } free(role); return rv; } diff --git a/src/ticket.h b/src/ticket.h index 12d1042..c8e05f5 100644 --- a/src/ticket.h +++ b/src/ticket.h @@ -1,30 +1,31 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _TICKET_H #define _TICKET_H #define DEFAULT_TICKET_EXPIRY 600 int check_ticket(char *ticket); int check_site(char *site, int *local); int grant_ticket(char *ticket, int force, int expiry); +int revoke_ticket(char *ticket, int force); int ticket_recv(void *msg, int msglen); int setup_ticket(void); #endif /* _TICKET_H */