diff --git a/src/main.c b/src/main.c index 4a8ac29..a4ee0cd 100644 --- a/src/main.c +++ b/src/main.c @@ -1,1126 +1,1045 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "log.h" #include "booth.h" #include "config.h" #include "transport.h" #include "timer.h" #include "pacemaker.h" #include "ticket.h" #define RELEASE_VERSION "1.0" #define CLIENT_NALLOC 32 int log_logfile_priority = LOG_INFO; int log_syslog_priority = LOG_ERR; int log_stderr_priority = LOG_ERR; int daemonize = 0; static int client_maxi; static int client_size = 0; struct client *client = NULL; struct pollfd *pollfd = NULL; int poll_timeout = -1; typedef enum { ACT_ARBITRATOR = 1, ACT_SITE, ACT_CLIENT, } booth_role_t; typedef enum { OP_LIST = 1, OP_GRANT, OP_REVOKE, } operation_t; struct command_line { int type; /* ACT_ */ int op; /* OP_ */ char configfile[BOOTH_PATH_LEN]; char lockfile[BOOTH_PATH_LEN]; char site[BOOTH_NAME_LEN]; char ticket[BOOTH_NAME_LEN]; }; static struct command_line cl; int do_read(int fd, void *buf, size_t count) { int rv, off = 0; while (off < count) { rv = read(fd, (char *)buf + off, count - off); if (rv == 0) return -1; if (rv == -1 && errno == EINTR) continue; if (rv == -1) return -1; off += rv; } return 0; } int do_write(int fd, void *buf, size_t count) { int rv, off = 0; retry: rv = write(fd, (char *)buf + off, count); if (rv == -1 && errno == EINTR) goto retry; if (rv < 0) { log_error("write failed: %s (%d)", strerror(errno), errno); return rv; } if (rv != count) { count -= rv; off += rv; goto retry; } return 0; } static int do_connect(const char *sock_path) { struct sockaddr_un sun; socklen_t addrlen; int rv, fd; fd = socket(PF_UNIX, SOCK_STREAM, 0); if (fd < 0) { log_error("failed to create socket: %s (%d)", strerror(errno), errno); goto out; } memset(&sun, 0, sizeof(sun)); sun.sun_family = AF_UNIX; strcpy(&sun.sun_path[1], sock_path); addrlen = sizeof(sa_family_t) + strlen(sun.sun_path+1) + 1; rv = connect(fd, (struct sockaddr *) &sun, addrlen); if (rv < 0) { if (errno == ECONNREFUSED) log_error("Connection to boothd was refused; " "please ensure that you are on a " "machine which has boothd running."); else log_error("failed to connect: %s (%d)", strerror(errno), errno); close(fd); fd = rv; } out: return fd; } static void init_header(struct boothc_header *h, int cmd, int result, int data_len) { memset(h, 0, sizeof(struct boothc_header)); h->magic = BOOTHC_MAGIC; h->version = BOOTHC_VERSION; h->len = data_len; h->cmd = cmd; h->result = result; } static void client_alloc(void) { int i; if (!client) { client = malloc(CLIENT_NALLOC * sizeof(struct client)); pollfd = malloc(CLIENT_NALLOC * sizeof(struct pollfd)); } else { client = realloc(client, (client_size + CLIENT_NALLOC) * sizeof(struct client)); pollfd = realloc(pollfd, (client_size + CLIENT_NALLOC) * sizeof(struct pollfd)); if (!pollfd) log_error("can't alloc for pollfd"); } if (!client || !pollfd) log_error("can't alloc for client array"); for (i = client_size; i < client_size + CLIENT_NALLOC; i++) { client[i].workfn = NULL; client[i].deadfn = NULL; client[i].fd = -1; pollfd[i].fd = -1; pollfd[i].revents = 0; } client_size += CLIENT_NALLOC; } static void client_dead(int ci) { close(client[ci].fd); client[ci].workfn = NULL; client[ci].fd = -1; pollfd[ci].fd = -1; } int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci)) { int i; if (!client) client_alloc(); again: for (i = 0; i < client_size; i++) { if (client[i].fd == -1) { client[i].workfn = workfn; if (deadfn) client[i].deadfn = deadfn; else client[i].deadfn = client_dead; client[i].fd = fd; pollfd[i].fd = fd; pollfd[i].events = POLLIN; if (i > client_maxi) client_maxi = i; return i; } } client_alloc(); goto again; } static int setup_listener(const char *sock_path) { struct sockaddr_un addr; socklen_t addrlen; int rv, s; s = socket(AF_LOCAL, SOCK_STREAM, 0); if (s < 0) { log_error("socket error %d: %s (%d)", s, strerror(errno), errno); return s; } memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_LOCAL; strcpy(&addr.sun_path[1], sock_path); addrlen = sizeof(sa_family_t) + strlen(addr.sun_path+1) + 1; rv = bind(s, (struct sockaddr *) &addr, addrlen); if (rv < 0) { log_error("bind error %d: %s (%d)", rv, strerror(errno), errno); close(s); return rv; } rv = listen(s, 5); if (rv < 0) { log_error("listen error %d: %s (%d)", rv, strerror(errno), errno); close(s); return rv; } return s; } void process_connection(int ci) { struct boothc_header h; char *data = NULL; char *site, *ticket; int ticket_owner; int local, rv; void (*deadfn) (int ci); rv = do_read(client[ci].fd, &h, sizeof(h)); if (rv < 0) { if (errno == ECONNRESET) log_debug("client %d connection reset for fd %d", ci, client[ci].fd); deadfn = client[ci].deadfn; if(deadfn) { deadfn(ci); } return; } if (h.magic != BOOTHC_MAGIC) { log_error("connection %d magic error %x", ci, h.magic); return; } if (h.version != BOOTHC_VERSION) { log_error("connection %d version error %x", ci, h.version); return; } if (h.len) { data = malloc(h.len); if (!data) { log_error("process_connection no mem %u", h.len); return; } memset(data, 0, h.len); rv = do_read(client[ci].fd, data, h.len); if (rv < 0) { log_error("connection %d read data error %d", ci, rv); goto out; } } switch (h.cmd) { case BOOTHC_CMD_LIST: assert(!data); h.result = list_ticket(&data, &h.len); break; case BOOTHC_CMD_GRANT: h.len = 0; site = data; ticket = data + BOOTH_NAME_LEN; if (!check_ticket(ticket)) { h.result = BOOTHC_RLT_INVALID_ARG; goto reply; } if (get_ticket_info(ticket, &ticket_owner, NULL) == 0) { if (ticket_owner > -1) { log_error("client want to get an granted " "ticket %s", ticket); h.result = BOOTHC_RLT_OVERGRANT; goto reply; } } else { log_error("can not get ticket %s's info", ticket); h.result = BOOTHC_RLT_INVALID_ARG; goto reply; } if (!check_site(site, &local)) { h.result = BOOTHC_RLT_INVALID_ARG; goto reply; } if (local) h.result = grant_ticket(ticket); else h.result = BOOTHC_RLT_REMOTE_OP; break; case BOOTHC_CMD_REVOKE: h.len = 0; site = data; ticket = data + BOOTH_NAME_LEN; if (!check_ticket(ticket)) { h.result = BOOTHC_RLT_INVALID_ARG; goto reply; } if (!check_site(site, &local)) { h.result = BOOTHC_RLT_INVALID_ARG; goto reply; } if (local) h.result = revoke_ticket(ticket); else h.result = BOOTHC_RLT_REMOTE_OP; break; case BOOTHC_CMD_CATCHUP: h.result = catchup_ticket(&data, h.len); break; default: log_error("connection %d cmd %x unknown", ci, h.cmd); break; } reply: rv = do_write(client[ci].fd, &h, sizeof(h)); if (rv < 0) log_error("connection %d write error %d", ci, rv); if (h.len) { rv = do_write(client[ci].fd, data, h.len); if (rv < 0) log_error("connection %d write error %d", ci, rv); } out: free(data); } static void process_listener(int ci) { int fd, i; fd = accept(client[ci].fd, NULL, NULL); if (fd < 0) { log_error("process_listener: accept error for fd %d: %s (%d)", fd, strerror(errno), errno); return; } i = client_add(fd, process_connection, NULL); log_debug("add client connection %d fd %d", i, fd); } static int setup_config(int type) { int rv; rv = read_config(cl.configfile); if (rv < 0) goto out; rv = check_config(type); if (rv < 0) goto out; out: return rv; } static int setup_transport(void) { int rv; transport_layer_t proto = booth_conf->proto; rv = booth_transport[proto].init(ticket_recv); if (rv < 0) { log_error("failed to init booth_transport[%d]", proto); goto out; } rv = booth_transport[TCP].init(NULL); if (rv < 0) { log_error("failed to init booth_transport[TCP]"); goto out; } out: return rv; } static int setup_timer(void) { return timerlist_init(); } static int setup(int type) { int rv; rv = setup_config(type); if (rv < 0) goto fail; rv = setup_timer(); if (rv < 0) goto fail; rv = setup_transport(); if (rv < 0) goto fail; rv = setup_ticket(); if (rv < 0) goto fail; rv = setup_listener(BOOTHC_SOCK_PATH); if (rv < 0) goto fail; client_add(rv, process_listener, NULL); return 0; fail: return -1; } static int loop(void) { void (*workfn) (int ci); void (*deadfn) (int ci); int rv, i; while (1) { rv = poll(pollfd, client_maxi + 1, poll_timeout); if (rv == -1 && errno == EINTR) continue; if (rv < 0) { log_error("poll failed: %s (%d)", strerror(errno), errno); goto fail; } for (i = 0; i <= client_maxi; i++) { if (client[i].fd < 0) continue; if (pollfd[i].revents & POLLIN) { workfn = client[i].workfn; if (workfn) workfn(i); } if (pollfd[i].revents & (POLLERR | POLLHUP | POLLNVAL)) { deadfn = client[i].deadfn; if (deadfn) deadfn(i); } } process_timerlist(); } return 0; fail: return -1; } static int do_list(void) { struct boothc_header h, *rh; char *reply = NULL, *data; int data_len; int fd, rv; init_header(&h, BOOTHC_CMD_LIST, 0, 0); fd = do_connect(BOOTHC_SOCK_PATH); if (fd < 0) { rv = fd; goto out; } rv = do_write(fd, &h, sizeof(h)); if (rv < 0) goto out_close; reply = malloc(sizeof(struct boothc_header)); if (!reply) { rv = -ENOMEM; goto out_close; } rv = do_read(fd, reply, sizeof(struct boothc_header)); if (rv < 0) goto out_free; rh = (struct boothc_header *)reply; data_len = rh->len; reply = realloc(reply, sizeof(struct boothc_header) + data_len); if (!reply) { rv = -ENOMEM; goto out_free; } data = reply + sizeof(struct boothc_header); rv = do_read(fd, data, data_len); if (rv < 0) goto out_free; do_write(STDOUT_FILENO, data, data_len); rv = 0; out_free: free(reply); out_close: close(fd); out: return rv; } -static inline void load_bar(int x, int n, int r, int w) -{ - int i; - float ratio; - int c; - - /* Only update r times.*/ - if ( x % (n / r) != 0 ) return; - - /* Calculuate the ratio of complete-to-incomplete.*/ - ratio = x / (float)n; - c = ratio * w; - - /* Show the percentage complete.*/ - printf("%3d%% [", (int)(ratio * 100)); - - /* Show the load bar.*/ - for (i = 0; i < c; i++) - printf("="); - for (i = c; i < w; i++) - printf(" "); - - printf("]"); - printf("\r"); - fflush(stdout); -} - -static void counting_down(int total_time) -{ - struct winsize size; - int screen_width; - int i; - - ioctl(STDIN_FILENO, TIOCGWINSZ, (char*)&size); - screen_width = size.ws_col / (float)2; - - /* ignore signals */ - signal(SIGTERM, SIG_IGN); - signal(SIGINT, SIG_IGN); - signal(SIGHUP, SIG_IGN); - - i = 0; - while (i <= total_time) { - load_bar(i, total_time, total_time, screen_width); - sleep(1); - i++; - } - - log_info("\nCounting Down Over...\n"); -} static int do_command(cmd_request_t cmd) { char *buf; struct boothc_header *h, reply; int buflen; int fd, rv; - int expire_time; - int i; buflen = sizeof(struct boothc_header) + sizeof(cl.site) + sizeof(cl.ticket); buf = malloc(buflen); if (!buf) { rv = -ENOMEM; goto out; } h = (struct boothc_header *)buf; init_header(h, cmd, 0, sizeof(cl.site) + sizeof(cl.ticket)); strcpy(buf + sizeof(struct boothc_header), cl.site); strcpy(buf + sizeof(struct boothc_header) + sizeof(cl.site), cl.ticket); fd = do_connect(BOOTHC_SOCK_PATH); if (fd < 0) { rv = fd; goto out_free; } rv = do_write(fd, buf, buflen); if (rv < 0) goto out_close; rv = do_read(fd, &reply, sizeof(struct boothc_header)); if (rv < 0) goto out_close; if (reply.result == BOOTHC_RLT_INVALID_ARG) { log_info("invalid argument!"); rv = -1; goto out_close; } if (reply.result == BOOTHC_RLT_OVERGRANT) { log_info("You're granting a granted ticket" "If you wanted to migrate a ticket," "use revoke first, then use grant"); rv = -1; goto out_close; } if (reply.result == BOOTHC_RLT_REMOTE_OP) { struct booth_node to; int s; memset(&to, 0, sizeof(struct booth_node)); to.family = BOOTH_PROTO_FAMILY; strcpy(to.addr, cl.site); s = booth_transport[TCP].open(&to); if (s < 0) goto out_close; rv = booth_transport[TCP].send(s, buf, buflen); if (rv < 0) { booth_transport[TCP].close(s); goto out_close; } rv = booth_transport[TCP].recv(s, &reply, sizeof(struct boothc_header)); if (rv < 0) { booth_transport[TCP].close(s); goto out_close; } booth_transport[TCP].close(s); } if (reply.result == BOOTHC_RLT_ASYNC) { if (cmd == BOOTHC_CMD_GRANT) log_info("grant command sent, result will be returned " "asynchronously, you can get the result from " "the log files"); - else if (cmd == BOOTHC_CMD_REVOKE) { + else if (cmd == BOOTHC_CMD_REVOKE) log_info("revoke command sent, result will be returned " "asynchronously, you can get the result from " - "the log files after the ticket expiry time."); - i = 0; - /* FIXME: if we access the server then get the actual - * remaining time the waiting will be shorter, for now, - * client is just waiting the expiry time. - */ - read_config(cl.configfile); - while (i < booth_conf->ticket_count) { - if (!strncmp(booth_conf->ticket[i].name, cl.ticket, - BOOTH_NAME_LEN)) { - expire_time = booth_conf->ticket[i].expiry; - log_info("You have to wait %d seconds to " - "ensure all timer has expired!", - expire_time); - counting_down(expire_time); - rv = 0; - break; - } - i++; - /* no ticket found in conf file */ - if( i == booth_conf->ticket_count ) { - log_error("check your config file, " - "ticket %s not found", cl.ticket); - log_error("your booth's config file may " - "not be the same!"); - break; - rv = -1; - } - } - } + "the log files."); else - log_error("internal error when reading reply result!"); + log_error("internal error reading reply result!"); rv = 0; } else if (reply.result == BOOTHC_RLT_SYNC_SUCC) { if (cmd == BOOTHC_CMD_GRANT) log_info("grant succeeded!"); else if (cmd == BOOTHC_CMD_REVOKE) log_info("revoke succeeded!"); rv = 0; } else if (reply.result == BOOTHC_RLT_SYNC_FAIL) { if (cmd == BOOTHC_CMD_GRANT) log_info("grant failed!"); else if (cmd == BOOTHC_CMD_REVOKE) log_info("revoke failed!"); rv = 0; } else { log_error("internal error!"); rv = -1; } out_close: close(fd); out_free: free(buf); out: return rv; } static int do_grant(void) { return do_command(BOOTHC_CMD_GRANT); } static int do_revoke(void) { return do_command(BOOTHC_CMD_REVOKE); } static int lockfile(void) { char buf[16]; struct flock lock; int fd, rv; fd = open(cl.lockfile, O_CREAT|O_WRONLY, 0666); if (fd < 0) { log_error("lockfile open error %s: %s", cl.lockfile, strerror(errno)); return -1; } lock.l_type = F_WRLCK; lock.l_start = 0; lock.l_whence = SEEK_SET; lock.l_len = 0; rv = fcntl(fd, F_SETLK, &lock); if (rv < 0) { log_error("lockfile setlk error %s: %s", cl.lockfile, strerror(errno)); goto fail; } rv = ftruncate(fd, 0); if (rv < 0) { log_error("lockfile truncate error %s: %s", cl.lockfile, strerror(errno)); goto fail; } memset(buf, 0, sizeof(buf)); snprintf(buf, sizeof(buf), "%d\n", getpid()); rv = write(fd, buf, strlen(buf)); if (rv <= 0) { log_error("lockfile write error %s: %s", cl.lockfile, strerror(errno)); goto fail; } return fd; fail: close(fd); return -1; } static void unlink_lockfile(int fd) { unlink(cl.lockfile); close(fd); } static void print_usage(void) { printf("Usage:\n"); printf("booth [options]\n"); printf("\n"); printf("Types:\n"); printf(" arbitrator: daemon running on arbitrator\n"); printf(" site: daemon running on cluster site\n"); printf(" client: command running from client\n"); printf("\n"); printf("Operations:\n"); printf("Please note that operations are valid iff type is client!\n"); printf(" list: List all the tickets\n"); printf(" grant: Grant ticket T(-t T) to site S(-s S)\n"); printf(" revoke: Revoke ticket T(-t T) from site S(-s S)\n"); printf("\n"); printf("Options:\n"); printf(" -c FILE Specify config file [default " BOOTH_DEFAULT_CONF "]\n"); printf(" -l LOCKFILE Specify lock file [default " BOOTH_DEFAULT_LOCKFILE "]\n"); printf(" -D Enable debugging to stderr and don't fork\n"); printf(" -t ticket name\n"); printf(" -s site name\n"); printf(" -h Print this help, then exit\n"); } #define OPTION_STRING "c:Dl:t:s:h" static char *logging_entity = NULL; void safe_copy(char *dest, char *value, size_t buflen, const char *description) { if (strlen(value) >= buflen) { fprintf(stderr, "'%s' exceeds maximum %s length of %ld\n", value, description, (long)(buflen - 1)); exit(EXIT_FAILURE); } strncpy(dest, value, buflen - 1); } static int read_arguments(int argc, char **argv) { int optchar; char *arg1 = argv[1]; char *op = NULL; if (argc < 2 || !strcmp(arg1, "help") || !strcmp(arg1, "--help") || !strcmp(arg1, "-h")) { print_usage(); exit(EXIT_SUCCESS); } if (!strcmp(arg1, "version") || !strcmp(arg1, "--version") || !strcmp(arg1, "-V")) { printf("%s %s (built %s %s)\n", argv[0], RELEASE_VERSION, __DATE__, __TIME__); exit(EXIT_SUCCESS); } if (!strcmp(arg1, "arbitrator")) { cl.type = ACT_ARBITRATOR; logging_entity = (char *) DAEMON_NAME "-arbitrator"; optind = 2; } else if (!strcmp(arg1, "site")) { cl.type = ACT_SITE; logging_entity = (char *) DAEMON_NAME "-site"; optind = 2; } else if (!strcmp(arg1, "client")) { cl.type = ACT_CLIENT; if (argc < 3) { print_usage(); exit(EXIT_FAILURE); } op = argv[2]; optind = 3; } else { cl.type = ACT_CLIENT; op = argv[1]; optind = 2; } switch (cl.type) { case ACT_ARBITRATOR: break; case ACT_SITE: break; case ACT_CLIENT: if (!strcmp(op, "list")) cl.op = OP_LIST; else if (!strcmp(op, "grant")) cl.op = OP_GRANT; else if (!strcmp(op, "revoke")) cl.op = OP_REVOKE; else { fprintf(stderr, "client operation \"%s\" is unknown\n", op); exit(EXIT_FAILURE); } break; } while (optind < argc) { optchar = getopt(argc, argv, OPTION_STRING); switch (optchar) { case 'c': safe_copy(cl.configfile, optarg, sizeof(cl.configfile), "config file"); break; case 'D': daemonize = 1; debug_level = 1; log_logfile_priority = LOG_DEBUG; log_syslog_priority = LOG_DEBUG; break; case 'l': safe_copy(cl.lockfile, optarg, sizeof(cl.lockfile), "lock file"); break; case 't': if (cl.op == OP_GRANT || cl.op == OP_REVOKE) { safe_copy(cl.ticket, optarg, sizeof(cl.ticket), "ticket name"); } else { print_usage(); exit(EXIT_FAILURE); } break; case 's': if (cl.op == OP_GRANT || cl.op == OP_REVOKE) { safe_copy(cl.site, optarg, sizeof(cl.ticket), "site name"); } else { print_usage(); exit(EXIT_FAILURE); } break; case 'h': print_usage(); exit(EXIT_SUCCESS); break; case ':': case '?': fprintf(stderr, "Please use '-h' for usage.\n"); exit(EXIT_FAILURE); break; default: fprintf(stderr, "unknown option: %s\n", argv[optind]); exit(EXIT_FAILURE); break; }; } return 0; } static void set_scheduler(void) { struct sched_param sched_param; struct rlimit rlimit; int rv; rlimit.rlim_cur = RLIM_INFINITY; rlimit.rlim_max = RLIM_INFINITY; setrlimit(RLIMIT_MEMLOCK, &rlimit); rv = mlockall(MCL_CURRENT | MCL_FUTURE); if (rv < 0) { log_error("mlockall failed"); } rv = sched_get_priority_max(SCHED_RR); if (rv != -1) { sched_param.sched_priority = rv; rv = sched_setscheduler(0, SCHED_RR, &sched_param); if (rv == -1) log_error("could not set SCHED_RR priority %d: %s (%d)", sched_param.sched_priority, strerror(errno), errno); } else { log_error("could not get maximum scheduler priority err %d", errno); } } static void set_oom_adj(int val) { FILE *fp; fp = fopen("/proc/self/oom_adj", "w"); if (!fp) return; fprintf(fp, "%i", val); fclose(fp); } static int do_server(int type) { int fd = -1; int rv = -1; rv = setup(type); if (rv < 0) goto out; if (!daemonize) { if (daemon(0, 0) < 0) { perror("daemon error"); exit(EXIT_FAILURE); } } /* The lock cannot be obtained before the call to daemon(), otherwise the lockfile would contain the pid of the parent, not the daemon. */ fd = lockfile(); if (fd < 0) return fd; if (type == ARBITRATOR) log_info("BOOTH arbitrator daemon started"); else if (type == SITE) log_info("BOOTH cluster site daemon started"); set_scheduler(); set_oom_adj(-16); rv = loop(); out: if (fd >= 0) unlink_lockfile(fd); return rv; } static int do_client(void) { int rv = -1; switch (cl.op) { case OP_LIST: rv = do_list(); break; case OP_GRANT: rv = do_grant(); break; case OP_REVOKE: rv = do_revoke(); break; } return rv; } int main(int argc, char *argv[]) { int rv; memset(&cl, 0, sizeof(cl)); strncpy(cl.configfile, BOOTH_DEFAULT_CONF, BOOTH_PATH_LEN - 1); strncpy(cl.lockfile, BOOTH_DEFAULT_LOCKFILE, BOOTH_PATH_LEN - 1); rv = read_arguments(argc, argv); if (rv < 0) goto out; if (cl.type == ACT_CLIENT) { cl_log_enable_stderr(TRUE); cl_log_set_facility(0); } else { cl_log_set_entity(logging_entity); cl_log_enable_stderr(debug_level ? TRUE : FALSE); cl_log_set_facility(HA_LOG_FACILITY); } cl_inherit_logging_environment(0); switch (cl.type) { case ACT_ARBITRATOR: rv = do_server(ARBITRATOR); break; case ACT_SITE: rv = do_server(SITE); break; case ACT_CLIENT: rv = do_client(); break; } out: return rv ? EXIT_FAILURE : EXIT_SUCCESS; } diff --git a/src/paxos.c b/src/paxos.c index 105c790..ec45296 100644 --- a/src/paxos.c +++ b/src/paxos.c @@ -1,870 +1,876 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include "list.h" #include "paxos.h" #include "log.h" typedef enum { INIT = 1, PREPARING, PROMISING, PROPOSING, ACCEPTING, RECOVERY, COMMITTED, } paxos_state_t; struct proposal { int ballot_number; char value[0]; }; struct learned { int ballot; int number; }; struct paxos_msghdr { paxos_state_t state; int from; char psname[PAXOS_NAME_LEN+1]; char piname[PAXOS_NAME_LEN+1]; int ballot_number; int reject; int proposer_id; unsigned int extralen; unsigned int valuelen; }; struct proposer { int state; int ballot; int open_number; int accepted_number; int proposed; struct proposal *proposal; }; struct acceptor { int state; int highest_promised; struct proposal *accepted_proposal; }; struct learner { int state; int learned_max; int learned_ballot; struct learned learned[0]; }; struct paxos_space; struct paxos_instance; struct proposer_operations { void (*prepare) (struct paxos_instance *, int *); void (*propose) (struct paxos_space *, struct paxos_instance *, void *, int); void (*commit) (struct paxos_space *, struct paxos_instance *, void *, int); }; struct acceptor_operations { void (*promise) (struct paxos_space *, struct paxos_instance *, void *, int); void (*accepted) (struct paxos_space *, struct paxos_instance *, void *, int); }; struct learner_operations { void (*response) (struct paxos_space *, struct paxos_instance *, void *, int); }; struct paxos_space { char name[PAXOS_NAME_LEN+1]; unsigned int number; unsigned int extralen; unsigned int valuelen; const unsigned char *role; const struct paxos_operations *p_op; const struct proposer_operations *r_op; const struct acceptor_operations *a_op; const struct learner_operations *l_op; struct list_head list; struct list_head pi_head; }; struct paxos_instance { char name[PAXOS_NAME_LEN+1]; int round; int *prio; struct proposer *proposer; struct acceptor *acceptor; struct learner *learner; void (*end) (pi_handle_t pih, int round, int result); struct list_head list; struct paxos_space *ps; }; static LIST_HEAD(ps_head); static int have_quorum(struct paxos_space *ps, int member) { int i, sum = 0; for (i = 0; i < ps->number; i++) { if (ps->role[i] & ACCEPTOR) sum++; } if (member * 2 > sum) return 1; else return 0; } static int next_ballot_number(struct paxos_instance *pi) { int ballot; int myid = pi->ps->p_op->get_myid(); if (pi->prio) ballot = pi->prio[myid]; else ballot = myid; while (ballot <= pi->round) ballot += pi->ps->number; return ballot; } static void proposer_prepare(struct paxos_instance *pi, int *round) { struct paxos_msghdr *hdr; - void *msg; + void *msg, *extra; int msglen = sizeof(struct paxos_msghdr) + pi->ps->extralen; int ballot; log_debug("preposer prepare ..."); msg = malloc(msglen); if (!msg) { log_error("no mem for msg"); *round = -ENOMEM; return; } memset(msg, 0, msglen); hdr = msg; + extra = (char *)msg + sizeof(struct paxos_msghdr); if (*round > pi->round) pi->round = *round; ballot = next_ballot_number(pi); pi->proposer->ballot = ballot; hdr->state = htonl(PREPARING); hdr->from = htonl(pi->ps->p_op->get_myid()); hdr->proposer_id = hdr->from; strcpy(hdr->psname, pi->ps->name); strcpy(hdr->piname, pi->name); hdr->ballot_number = htonl(ballot); hdr->extralen = htonl(pi->ps->extralen); + if (pi->ps->p_op->prepare && + pi->ps->p_op->prepare((pi_handle_t)pi, extra) < 0) + return; + if (pi->ps->p_op->broadcast) pi->ps->p_op->broadcast(msg, msglen); else { int i; for (i = 0; i < pi->ps->number; i++) { if (pi->ps->role[i] & ACCEPTOR) pi->ps->p_op->send(i, msg, msglen); } } free(msg); *round = ballot; } static void proposer_propose(struct paxos_space *ps, struct paxos_instance *pi, void *msg, int msglen) { struct paxos_msghdr *hdr; pi_handle_t pih = (pi_handle_t)pi; void *extra, *value, *message; int ballot; log_debug("proposer propose ..."); if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) { log_error("message length incorrect, " "msglen: %d, msghdr len: %lu, extralen: %u", msglen, (long)sizeof(struct paxos_msghdr), ps->extralen); return; } hdr = msg; ballot = ntohl(hdr->ballot_number); if (pi->proposer->ballot != ballot) { log_debug("not the same ballot, proposer ballot: %d, " "received ballot: %d", pi->proposer->ballot, ballot); return; } if (ntohl(hdr->reject)) { log_debug("proposal was rejected"); pi->round = ballot; pi->proposer->state = INIT; pi->end(pih, pi->round, -EAGAIN); return; } extra = (char *)msg + sizeof(struct paxos_msghdr); - if (ps->p_op->prepare) { - if (ps->p_op->prepare(pih, extra)) + if (ps->p_op->is_prepared) { + if (ps->p_op->is_prepared(pih, extra)) pi->proposer->open_number++; } else pi->proposer->open_number++; if (!have_quorum(ps, pi->proposer->open_number)) return; if (pi->proposer->proposed) return; pi->proposer->proposed = 1; value = pi->proposer->proposal->value; if (ps->p_op->propose) ps->p_op->propose(pih, extra, ballot, value); hdr->valuelen = htonl(ps->valuelen); message = malloc(msglen + ps->valuelen); if (!message) { log_error("no mem for value"); return; } memset(message, 0, msglen + ps->valuelen); memcpy(message, msg, msglen); memcpy((char *)message + msglen, value, ps->valuelen); pi->proposer->state = PROPOSING; hdr = message; hdr->from = htonl(ps->p_op->get_myid()); hdr->state = htonl(PROPOSING); if (ps->p_op->broadcast) ps->p_op->broadcast(message, msglen + ps->valuelen); else { int i; for (i = 0; i < ps->number; i++) { if (ps->role[i] & ACCEPTOR) ps->p_op->send(i, message, msglen + ps->valuelen); } } free(message); } static void proposer_commit(struct paxos_space *ps, struct paxos_instance *pi, void *msg, int msglen) { struct paxos_msghdr *hdr; pi_handle_t pih = (pi_handle_t)pi; void *extra; int ballot; log_debug("proposer commit ..."); if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) { log_error("message length incorrect, " "msglen: %d, msghdr len: %lu, extralen: %u", msglen, (long)sizeof(struct paxos_msghdr), ps->extralen); return; } extra = (char *)msg + sizeof(struct paxos_msghdr); hdr = msg; ballot = ntohl(hdr->ballot_number); if (pi->proposer->ballot != ballot) { log_debug("not the same ballot, proposer ballot: %d, " "received ballot: %d", pi->proposer->ballot, ballot); return; } pi->proposer->accepted_number++; if (!have_quorum(ps, pi->proposer->accepted_number)) return; if (pi->proposer->state == COMMITTED) return; pi->round = ballot; if (ps->p_op->commit) ps->p_op->commit(pih, extra, pi->round); pi->proposer->state = COMMITTED; if (pi->end) pi->end(pih, pi->round, 0); } static void acceptor_promise(struct paxos_space *ps, struct paxos_instance *pi, void *msg, int msglen) { struct paxos_msghdr *hdr; unsigned long to; pi_handle_t pih = (pi_handle_t)pi; void *extra; log_debug("acceptor promise ..."); if (pi->acceptor->state == RECOVERY) { log_debug("still in recovery"); return; } if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) { log_error("message length incorrect, " "msglen: %d, msghdr len: %lu, extralen: %u", msglen, (long)sizeof(struct paxos_msghdr), ps->extralen); return; } hdr = msg; extra = (char *)msg + sizeof(struct paxos_msghdr); if (ntohl(hdr->ballot_number) < pi->acceptor->highest_promised) { log_debug("ballot number: %d, highest promised: %d", ntohl(hdr->ballot_number), pi->acceptor->highest_promised); to = ntohl(hdr->from); hdr->from = htonl(ps->p_op->get_myid()); hdr->state = htonl(PROMISING); hdr->reject = htonl(1); memset(extra, 0, ps->extralen); ps->p_op->send(to, msg, msglen); return; } pi->acceptor->highest_promised = ntohl(hdr->ballot_number); if (ps->p_op->promise) ps->p_op->promise(pih, extra); pi->acceptor->state = PROMISING; to = ntohl(hdr->from); hdr->from = htonl(ps->p_op->get_myid()); hdr->state = htonl(PROMISING); ps->p_op->send(to, msg, msglen); } static void acceptor_accepted(struct paxos_space *ps, struct paxos_instance *pi, void *msg, int msglen) { struct paxos_msghdr *hdr; unsigned long to; pi_handle_t pih = (pi_handle_t)pi; void *extra, *value; int myid = ps->p_op->get_myid(); int ballot; log_debug("acceptor accepted ..."); if (pi->acceptor->state == RECOVERY) { log_debug("still in recovery"); return; } if (msglen != sizeof(struct paxos_msghdr) + ps->extralen + ps->valuelen) { log_error("message length incorrect, msglen: " "%d, msghdr len: %lu, extralen: %u, valuelen: %u", msglen, (long)sizeof(struct paxos_msghdr), ps->extralen, ps->valuelen); return; } hdr = msg; extra = (char *)msg + sizeof(struct paxos_msghdr); ballot = ntohl(hdr->ballot_number); if (ballot < pi->acceptor->highest_promised) { log_debug("ballot: %d, highest promised: %d", ballot, pi->acceptor->highest_promised); to = ntohl(hdr->from); hdr->from = htonl(myid); hdr->state = htonl(ACCEPTING); hdr->reject = htonl(1); ps->p_op->send(to, hdr, sizeof(struct paxos_msghdr)); return; } value = pi->acceptor->accepted_proposal->value; memcpy(value, (char *)msg + sizeof(struct paxos_msghdr) + ps->extralen, ps->valuelen); - if (ps->p_op->accepted) - ps->p_op->accepted(pih, extra, ballot, value); + if (ps->p_op->accepted + && ps->p_op->accepted(pih, extra, ballot, value) < 0) + return; pi->acceptor->state = ACCEPTING; to = ntohl(hdr->from); hdr->from = htonl(myid); hdr->state = htonl(ACCEPTING); if (ps->p_op->broadcast) ps->p_op->broadcast(msg, sizeof(struct paxos_msghdr) + ps->extralen); else { int i; for (i = 0; i < ps->number; i++) { if (ps->role[i] & LEARNER) ps->p_op->send(i, msg, sizeof(struct paxos_msghdr) + ps->extralen); } if (!(ps->role[to] & LEARNER)) ps->p_op->send(to, msg, sizeof(struct paxos_msghdr) + ps->extralen); } } static void learner_response(struct paxos_space *ps, struct paxos_instance *pi, void *msg, int msglen) { struct paxos_msghdr *hdr; pi_handle_t pih = (pi_handle_t)pi; void *extra; int i, unused = 0, found = 0; int ballot; log_debug("learner response ..."); if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) { log_error("message length incorrect, " "msglen: %d, msghdr len: %lu, extralen: %u", msglen, (long)sizeof(struct paxos_msghdr), ps->extralen); return; } hdr = msg; extra = (char *)msg + sizeof(struct paxos_msghdr); ballot = ntohl(hdr->ballot_number); for (i = 0; i < ps->number; i++) { if (!pi->learner->learned[i].ballot) { unused = i; break; } if (pi->learner->learned[i].ballot == ballot) { pi->learner->learned[i].number++; if (pi->learner->learned[i].number > pi->learner->learned_max) pi->learner->learned_max = pi->learner->learned[i].number; found = 1; break; } } if (!found) { pi->learner->learned[unused].ballot = ntohl(hdr->ballot_number); pi->learner->learned[unused].number = 1; } if (!have_quorum(ps, pi->learner->learned_max)) return; if (ps->p_op->learned) ps->p_op->learned(pih, extra, ballot); } const struct proposer_operations generic_proposer_operations = { .prepare = proposer_prepare, .propose = proposer_propose, .commit = proposer_commit, }; const struct acceptor_operations generic_acceptor_operations = { .promise = acceptor_promise, .accepted = acceptor_accepted, }; const struct learner_operations generic_learner_operations = { .response = learner_response, }; ps_handle_t paxos_space_init(const void *name, unsigned int number, unsigned int extralen, unsigned int valuelen, const unsigned char *role, const struct paxos_operations *p_op) { struct paxos_space *ps; list_for_each_entry(ps, &ps_head, list) { if (!strcmp(ps->name, name)) { log_info("paxos space (%s) has already been " "initialized", (char *)name); return -EEXIST; } } if (!number || !valuelen || !p_op || !p_op->get_myid || !p_op->send) { log_error("invalid agruments"); return -EINVAL; } ps = malloc(sizeof(struct paxos_space)); if (!ps) { log_error("no mem for paxos space"); return -ENOMEM; } memset(ps, 0, sizeof(struct paxos_space)); strncpy(ps->name, name, PAXOS_NAME_LEN + 1); ps->number = number; ps->extralen = extralen; ps->valuelen = valuelen; ps->role = role; ps->p_op = p_op; ps->r_op = &generic_proposer_operations; ps->a_op = &generic_acceptor_operations; ps->l_op = &generic_learner_operations; list_add_tail(&ps->list, &ps_head); INIT_LIST_HEAD(&ps->pi_head); return (ps_handle_t)ps; } pi_handle_t paxos_instance_init(ps_handle_t handle, const void *name, int *prio) { struct paxos_space *ps = (struct paxos_space *)handle; struct paxos_instance *pi; struct proposer *proposer = NULL; struct acceptor *acceptor = NULL; struct learner *learner = NULL; int myid, valuelen, rv; list_for_each_entry(pi, &ps->pi_head, list) { if (!strcmp(pi->name, name)) return (pi_handle_t)pi; } if (handle <= 0 || !ps->p_op || !ps->p_op->get_myid) { log_error("invalid agruments"); rv = -EINVAL; goto out; } myid = ps->p_op->get_myid(); valuelen = ps->valuelen; pi = malloc(sizeof(struct paxos_instance)); if (!pi) { log_error("no mem for paxos instance"); rv = -ENOMEM; goto out; } memset(pi, 0, sizeof(struct paxos_instance)); strncpy(pi->name, name, PAXOS_NAME_LEN + 1); if (prio) { pi->prio = malloc(ps->number * sizeof(int)); if (!pi->prio) { log_error("no mem for prio"); rv = -ENOMEM; goto out_pi; } memcpy(pi->prio, prio, ps->number * sizeof(int)); } if (ps->role[myid] & PROPOSER) { proposer = malloc(sizeof(struct proposer)); if (!proposer) { log_error("no mem for proposer"); rv = -ENOMEM; goto out_prio; } memset(proposer, 0, sizeof(struct proposer)); proposer->state = INIT; proposer->proposal = malloc(sizeof(struct proposal) + valuelen); if (!proposer->proposal) { log_error("no mem for proposal"); rv = -ENOMEM; goto out_proposer; } memset(proposer->proposal, 0, sizeof(struct proposal) + valuelen); pi->proposer = proposer; } if (ps->role[myid] & ACCEPTOR) { acceptor = malloc(sizeof(struct acceptor)); if (!acceptor) { log_error("no mem for acceptor"); rv = -ENOMEM; goto out_proposal; } memset(acceptor, 0, sizeof(struct acceptor)); acceptor->state = INIT; acceptor->accepted_proposal = malloc(sizeof(struct proposal) + valuelen); if (!acceptor->accepted_proposal) { log_error("no mem for accepted proposal"); rv = -ENOMEM; goto out_acceptor; } memset(acceptor->accepted_proposal, 0, sizeof(struct proposal) + valuelen); pi->acceptor = acceptor; if (ps->p_op->catchup) pi->acceptor->state = RECOVERY; else pi->acceptor->state = INIT; } if (ps->role[myid] & LEARNER) { learner = malloc(sizeof(struct learner) + ps->number * sizeof(struct learned)); if (!learner) { log_error("no mem for learner"); rv = -ENOMEM; goto out_accepted_proposal; } memset(learner, 0, sizeof(struct learner) + ps->number * sizeof(struct learned)); learner->state = INIT; pi->learner = learner; } pi->ps = ps; list_add_tail(&pi->list, &ps->pi_head); return (pi_handle_t)pi; out_accepted_proposal: if (ps->role[myid] & ACCEPTOR) free(acceptor->accepted_proposal); out_acceptor: if (ps->role[myid] & ACCEPTOR) free(acceptor); out_proposal: if (ps->role[myid] & PROPOSER) free(proposer->proposal); out_proposer: if (ps->role[myid] & PROPOSER) free(proposer); out_prio: if (pi->prio) free(pi->prio); out_pi: free(pi); out: return rv; } int paxos_round_request(pi_handle_t handle, void *value, int *round, void (*end_request) (pi_handle_t handle, int round, int result)) { struct paxos_instance *pi = (struct paxos_instance *)handle; int myid = pi->ps->p_op->get_myid(); int rv = *round; if (!(pi->ps->role[myid] & PROPOSER)) { log_debug("only proposer can do this"); return -EOPNOTSUPP; } pi->proposer->state = PREPARING; pi->proposer->open_number = 0; pi->proposer->accepted_number = 0; pi->proposer->proposed = 0; memcpy(pi->proposer->proposal->value, value, pi->ps->valuelen); pi->end = end_request; pi->ps->r_op->prepare(pi, &rv); return rv; } int paxos_recovery_status_get(pi_handle_t handle) { struct paxos_instance *pi = (struct paxos_instance *)handle; int myid = pi->ps->p_op->get_myid(); if (!(pi->ps->role[myid] & ACCEPTOR)) return -EOPNOTSUPP; if (pi->acceptor->state == RECOVERY) return 1; else return 0; } int paxos_recovery_status_set(pi_handle_t handle, int recovery) { struct paxos_instance *pi = (struct paxos_instance *)handle; int myid = pi->ps->p_op->get_myid(); if (!(pi->ps->role[myid] & ACCEPTOR)) return -EOPNOTSUPP; if (recovery) pi->acceptor->state = RECOVERY; else pi->acceptor->state = INIT; return 0; } int paxos_propose(pi_handle_t handle, void *value, int round) { struct paxos_instance *pi = (struct paxos_instance *)handle; struct paxos_msghdr *hdr; void *extra, *msg; int len = sizeof(struct paxos_msghdr) + pi->ps->extralen + pi->ps->valuelen; if (!pi->proposer->ballot) pi->proposer->ballot = round; if (round != pi->proposer->ballot) { log_debug("round: %d, proposer ballot: %d", round, pi->proposer->ballot); return -EINVAL; } msg = malloc(len); if (!msg) { log_error("no mem for msg"); return -ENOMEM; } pi->proposer->state = PROPOSING; strcpy(pi->proposer->proposal->value, value); pi->proposer->accepted_number = 0; pi->round = round; memset(msg, 0, len); hdr = msg; hdr->state = htonl(PROPOSING); hdr->from = htonl(pi->ps->p_op->get_myid()); hdr->proposer_id = hdr->from; strcpy(hdr->psname, pi->ps->name); strcpy(hdr->piname, pi->name); hdr->ballot_number = htonl(pi->round); hdr->extralen = htonl(pi->ps->extralen); extra = (char *)msg + sizeof(struct paxos_msghdr); memcpy((char *)msg + sizeof(struct paxos_msghdr) + pi->ps->extralen, value, pi->ps->valuelen); if (pi->ps->p_op->propose) pi->ps->p_op->propose(handle, extra, round, value); if (pi->ps->p_op->broadcast) pi->ps->p_op->broadcast(msg, len); else { int i; for (i = 0; i < pi->ps->number; i++) { if (pi->ps->role[i] & ACCEPTOR) pi->ps->p_op->send(i, msg, len); } } free(msg); return 0; } int paxos_catchup(pi_handle_t handle) { struct paxos_instance *pi = (struct paxos_instance *)handle; return pi->ps->p_op->catchup(handle); } int paxos_recvmsg(void *msg, int msglen) { struct paxos_msghdr *hdr = msg; struct paxos_space *ps; struct paxos_instance *pi; int found = 0; int myid; list_for_each_entry(ps, &ps_head, list) { if (!strcmp(ps->name, hdr->psname)) { found = 1; break; } } if (!found) { log_error("could not find the received ps name (%s) " "in registered list", hdr->psname); return -EINVAL; } myid = ps->p_op->get_myid(); found = 0; list_for_each_entry(pi, &ps->pi_head, list) { if (!strcmp(pi->name, hdr->piname)) { found = 1; break; } } if (!found) paxos_instance_init((ps_handle_t)ps, hdr->piname, NULL); switch (ntohl(hdr->state)) { case PREPARING: if (ps->role[myid] & ACCEPTOR) ps->a_op->promise(ps, pi, msg, msglen); break; case PROMISING: ps->r_op->propose(ps, pi, msg, msglen); break; case PROPOSING: if (ps->role[myid] & ACCEPTOR) ps->a_op->accepted(ps, pi, msg, msglen); break; case ACCEPTING: if (ntohl(hdr->proposer_id) == myid) ps->r_op->commit(ps, pi, msg, msglen); else if (ps->role[myid] & LEARNER) ps->l_op->response(ps, pi, msg, msglen); break; default: log_debug("invalid message type: %d", ntohl(hdr->state)); break; }; return 0; } diff --git a/src/paxos.h b/src/paxos.h index 4a8ba79..cff8c66 100644 --- a/src/paxos.h +++ b/src/paxos.h @@ -1,82 +1,83 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _PAXOS_H #define _PAXOS_H #define PAXOS_NAME_LEN 63 #define PROPOSER 0x4 #define ACCEPTOR 0x2 #define LEARNER 0x1 typedef long ps_handle_t; typedef long pi_handle_t; struct paxos_operations { int (*get_myid) (void); int (*send) (unsigned long id, void *value, int len); int (*broadcast) (void *value, int len); int (*catchup) (pi_handle_t handle); int (*prepare) (pi_handle_t handle, void *extra); int (*promise) (pi_handle_t handle, void *extra); + int (*is_prepared) (pi_handle_t handle, void *extra); int (*propose) (pi_handle_t handle, void *extra, int round, void *value); int (*accepted) (pi_handle_t handle, void *extra, int round, void *value); int (*commit) (pi_handle_t handle, void *extra, int round); int (*learned) (pi_handle_t handle, void *extra, int round); }; int paxos_recvmsg(void *msg, int msglen); ps_handle_t paxos_space_init(const void *name, unsigned int number, unsigned int extralen, unsigned int valuelen, const unsigned char *role, const struct paxos_operations *p_op); pi_handle_t paxos_instance_init(ps_handle_t handle, const void *name, int *prio); int paxos_round_request(pi_handle_t handle, void *value, int *round, void (*end_request) (pi_handle_t handle, int round, int result)); int paxos_round_discard(pi_handle_t handle, int round); int paxos_leader_get(pi_handle_t handle, int *round); int paxos_recovery_status_get(pi_handle_t handle); int paxos_recovery_status_set(pi_handle_t handle, int recovery); int paxos_catchup(pi_handle_t handle); int paxos_propose(pi_handle_t handle, void *value, int round); int paxos_instance_exit(pi_handle_t handle); int paxos_space_exit(ps_handle_t handle); #endif /* _PAXOS_H */ diff --git a/src/paxos_lease.c b/src/paxos_lease.c index e89aefc..409129b 100644 --- a/src/paxos_lease.c +++ b/src/paxos_lease.c @@ -1,628 +1,916 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include +#include +#include #include "paxos.h" #include "paxos_lease.h" #include "transport.h" #include "config.h" #include "timer.h" #include "list.h" #include "log.h" #define PAXOS_LEASE_SPACE "paxoslease" #define PLEASE_VALUE_LEN 1024 +#define OP_START_LEASE 0 +#define OP_STOP_LEASE 1 + +#define LEASE_STARTED 0 +#define LEASE_STOPPED 1 + struct paxos_lease_msghdr { + int op; + int clear; int leased; }; struct paxos_lease_value { char name[PAXOS_NAME_LEN+1]; int owner; int expiry; - int release; +}; + +struct lease_action { + int op; + int clear; }; struct lease_state { int round; struct paxos_lease_value *plv; unsigned long long expires; struct timerlist *timer1; struct timerlist *timer2; }; struct paxos_lease { char name[PAXOS_NAME_LEN+1]; pi_handle_t pih; + struct lease_action action; struct lease_state proposer; struct lease_state acceptor; int owner; int expiry; int renew; int failover; int release; unsigned long long expires; void (*end_lease) (pi_handle_t, int); struct timerlist *timer; struct list_head list; }; static LIST_HEAD(lease_head); static int myid = -1; static struct paxos_operations *px_op = NULL; const struct paxos_lease_operations *p_l_op; ps_handle_t ps_handle = 0; -static void end_paxos_request(pi_handle_t handle, int round, int result) +static int find_paxos_lease(pi_handle_t handle, struct paxos_lease **pl) { - struct paxos_lease *pl; + struct paxos_lease *lpl; int found = 0; - list_for_each_entry(pl, &lease_head, list) { - if (pl->pih == handle) { + list_for_each_entry(lpl, &lease_head, list) { + if (lpl->pih == handle) { found = 1; break; } } - if (!found) { + + if (!found) log_error("cound not found the handle for paxos lease: %ld", handle); + else + *pl = lpl; + + return found; +} + +static void end_paxos_request(pi_handle_t handle, int round, int result) +{ + struct paxos_lease *pl; + + if (!find_paxos_lease(handle, &pl)) return; - } if (round != pl->proposer.round) { log_error("current paxos round is not the proposer round, " "current round: %d, proposer round: %d", round, pl->proposer.round); return; } if (pl->end_lease) pl->end_lease((pl_handle_t)pl, result); return; } static void renew_expires(unsigned long data) { struct paxos_lease *pl = (struct paxos_lease *)data; struct paxos_lease_value value; log_debug("renew expires ..."); memset(&value, 0, sizeof(struct paxos_lease_value)); strncpy(value.name, pl->name, PAXOS_NAME_LEN + 1); value.owner = myid; value.expiry = pl->expiry; - if (pl->release) - value.release = 1; paxos_propose(pl->pih, &value, pl->proposer.round); } static void lease_expires(unsigned long data) { struct paxos_lease *pl = (struct paxos_lease *)data; pl_handle_t plh = (pl_handle_t)pl; struct paxos_lease_result plr; log_debug("lease expires ..."); pl->owner = -1; strcpy(plr.name, pl->name); plr.owner = -1; plr.expires = 0; plr.ballot = pl->acceptor.round; p_l_op->notify(plh, &plr); if (pl->proposer.timer1) del_timer(&pl->proposer.timer1); if (pl->proposer.timer2) del_timer(&pl->proposer.timer2); if (pl->acceptor.timer1) del_timer(&pl->acceptor.timer1); if (pl->acceptor.timer2) del_timer(&pl->acceptor.timer2); if (pl->failover) - paxos_lease_acquire(plh, 1, NULL); + paxos_lease_acquire(plh, NOT_CLEAR_RELEASE, 1, NULL); } static void lease_retry(unsigned long data) { struct paxos_lease *pl = (struct paxos_lease *)data; struct paxos_lease_value value; int round; log_debug("lease_retry ..."); if (pl->proposer.timer2) del_timer(&pl->proposer.timer2); if (pl->owner == myid) { log_debug("already got the lease, no need to retry"); return; } memset(&value, 0, sizeof(struct paxos_lease_value)); strncpy(value.name, pl->name, PAXOS_NAME_LEN + 1); value.owner = myid; value.expiry = pl->expiry; + pl->action.op = OP_START_LEASE; + /** + * We don't know whether the lease_retry after ticket grant + * is manual or not, so set clear as NOT_CLEAR_RELEASE is + * the only safe choice. + **/ + pl->action.clear = NOT_CLEAR_RELEASE; round = paxos_round_request(pl->pih, &value, &pl->acceptor.round, end_paxos_request); if (round > 0) pl->proposer.round = round; } int paxos_lease_acquire(pl_handle_t handle, + int clear, int renew, void (*end_acquire) (pl_handle_t handle, int result)) { struct paxos_lease *pl = (struct paxos_lease *)handle; struct paxos_lease_value value; int round; memset(&value, 0, sizeof(struct paxos_lease_value)); strncpy(value.name, pl->name, PAXOS_NAME_LEN + 1); value.owner = myid; value.expiry = pl->expiry; pl->renew = renew; pl->end_lease = end_acquire; - pl->release = 0; + pl->action.op = OP_START_LEASE; + pl->action.clear = clear; round = paxos_round_request(pl->pih, &value, &pl->acceptor.round, end_paxos_request); pl->proposer.timer2 = add_timer(1 * pl->expiry / 10, (unsigned long)pl, lease_retry); if (round <= 0) return -1; else { pl->proposer.round = round; return 0; } } -int paxos_lease_release(pl_handle_t handle) +int paxos_lease_release(pl_handle_t handle, + void (*end_release) (pl_handle_t handle, int result)) { struct paxos_lease *pl = (struct paxos_lease *)handle; + struct paxos_lease_value value; + int round; + + log_debug("enter paxos_lease_release"); + memset(&value, 0, sizeof(struct paxos_lease_value)); + strncpy(value.name, pl->name, PAXOS_NAME_LEN + 1); + pl->end_lease = end_release; - pl->release = 1; + pl->action.op = OP_STOP_LEASE; + round = paxos_round_request(pl->pih, &value, + &pl->acceptor.round, + end_paxos_request); + if (round > 0) + pl->proposer.round = round; - return 0; + log_debug("exit paxos_lease_release"); + return (round > 0)? 0: -1; } static int lease_catchup(pi_handle_t handle) { struct paxos_lease *pl; struct paxos_lease_result plr; - int found = 0; - list_for_each_entry(pl, &lease_head, list) { - if (pl->pih == handle) { - found = 1; - break; - } - } - if (!found) { - log_error("could not find the lease handle: %ld", handle); + if (!find_paxos_lease(handle, &pl)) return -1; - } p_l_op->catchup(pl->name, &pl->owner, &pl->proposer.round, &pl->expires); log_debug("catchup result: name: %s, owner: %d, ballot: %d, expires: %llu", (char *)pl->name, pl->owner, pl->proposer.round, pl->expires); - if (pl->owner == -1) + /** + * 1. If no site hold the ticket, the relet will be set LEASE_STOPPED. + * Grant commond will set the relet to LEASE_STARTED first, so we don't + * need worry about it. + * 2. If someone hold the ticket, the relet will be set LEASE_STARTED. + * Because we must make sure that the site can renew, and relet also + * must be set to LEASE_STARTED. + **/ + if (-1 == pl->owner) { + pl->release = LEASE_STOPPED; return 0; + } else + pl->release = LEASE_STARTED; if (current_time() > pl->expires) { plr.owner = pl->owner = -1; plr.expires = pl->expires = 0; strcpy(plr.name, pl->name); p_l_op->notify((pl_handle_t)pl, &plr); return 0; } if (pl->owner == myid) { pl->acceptor.timer1 = add_timer(pl->expires - current_time(), (unsigned long)pl, lease_expires); if (current_time() < pl->expires - 1 * pl->expiry / 5) pl->proposer.timer1 = add_timer(pl->expires - 1 * pl->expiry / 5 - current_time(), (unsigned long)pl, renew_expires); } else pl->acceptor.timer1 = add_timer(pl->expires - current_time(), (unsigned long)pl, lease_expires); plr.owner = pl->owner; plr.expires = pl->expires; plr.ballot = pl->proposer.round; strcpy(plr.name, pl->name); p_l_op->notify((pl_handle_t)pl, &plr); return 0; } -static int lease_prepared(pi_handle_t handle __attribute__((unused)), - void *header) +static int lease_prepare(pi_handle_t handle, void *header) +{ + struct paxos_lease_msghdr *msghdr = header; + struct paxos_lease *pl; + + log_debug("enter lease_prepare"); + if (!find_paxos_lease(handle, &pl)) + return -1; + + msghdr->op = htonl(pl->action.op); + msghdr->clear = htonl(pl->action.clear); + + /** + * Action of paxos_lease is only used to pass args, + * so clear it now + **/ + memset(&pl->action, 0, sizeof(struct lease_action)); + log_debug("exit lease_prepare"); + return 0; +} + +static inline int start_lease_is_prepared(pi_handle_t handle __attribute__((unused)), + void *header) { struct paxos_lease_msghdr *hdr = header; + log_debug("enter start_lease_is_prepared"); if (hdr->leased) { log_debug("already leased"); return 0; } else { log_debug("not leased"); return 1; } } -static int handle_lease_request(pi_handle_t handle, void *header) +static inline int stop_lease_is_prepared(pi_handle_t handle __attribute__((unused)), + void *header __attribute__((unused))) { - struct paxos_lease_msghdr *hdr; - struct paxos_lease *pl; - int found = 0; + log_debug("enter stop_lease_is_prepared"); + return 1; +} - hdr = header; +static int lease_is_prepared(pi_handle_t handle, void *header) +{ + struct paxos_lease_msghdr *hdr = header; + int ret = 0; + int op = ntohl(hdr->op); + + log_debug("enter lease_is_prepared"); + assert(OP_START_LEASE == op || OP_STOP_LEASE == op); + switch (op) { + case OP_START_LEASE: + ret = start_lease_is_prepared(handle, header); + break; + case OP_STOP_LEASE: + ret = stop_lease_is_prepared(handle, header); + break; + } + + log_debug("exit lease_is_prepared"); + return ret; +} - list_for_each_entry(pl, &lease_head, list) { - if (pl->pih == handle) { - found = 1; - break; - } - } - if (!found) { - log_error("could not find the lease handle: %ld", handle); +static int start_lease_promise(pi_handle_t handle, void *header) +{ + struct paxos_lease_msghdr *hdr = header; + struct paxos_lease *pl; + int clear = ntohl(hdr->clear); + + log_debug("enter start_lease_promise"); + if (!find_paxos_lease(handle, &pl)) return -1; - } - if (pl->owner == -1) { + if (NOT_CLEAR_RELEASE == clear && LEASE_STOPPED == pl->release) { + log_debug("could not be leased"); + hdr->leased = 1; + } else if (-1 == pl->owner) { log_debug("has not been leased"); hdr->leased = 0; } else { log_debug("has been leased"); hdr->leased = 1; } + log_debug("exit start_lease_promise"); return 0; } -static int lease_propose(pi_handle_t handle, - void *extra __attribute__((unused)), - int round, void *value) +static int stop_lease_promise(pi_handle_t handle, + void *header __attribute__((unused))) { struct paxos_lease *pl; - int found = 0; - list_for_each_entry(pl, &lease_head, list) { - if (pl->pih == handle) { - found = 1; - break; - } - } - if (!found) { - log_error("could not find the lease handle: %ld", handle); + log_debug("enter stop_lease_promise"); + if (!find_paxos_lease(handle, &pl)) + return -1; + + log_debug("exit stop_lease_promise"); + return 0; +} + +static int lease_promise(pi_handle_t handle, void *header) +{ + struct paxos_lease_msghdr *hdr = header; + int ret = 0; + int op = ntohl(hdr->op); + + log_debug("enter lease_promise"); + assert(OP_START_LEASE == op || OP_STOP_LEASE == op); + switch (op) { + case OP_START_LEASE: + ret = start_lease_promise(handle, header); + break; + case OP_STOP_LEASE: + ret = stop_lease_promise(handle, header); + break; + } + + log_debug("exit lease_promise"); + return ret; +} + +static int start_lease_propose(pi_handle_t handle, void *extra, + int round, void *value) +{ + struct paxos_lease *pl; + + log_debug("enter start_lease_propose"); + if (!find_paxos_lease(handle, &pl)) return -1; - } if (round != pl->proposer.round) { log_error("current round is not the proposer round, " "current round: %d, proposer round: %d", round, pl->proposer.round); return -1; } if (!pl->proposer.plv) { pl->proposer.plv = malloc(sizeof(struct paxos_lease_value)); if (!pl->proposer.plv) { log_error("could not alloc mem for propsoer plv"); return -ENOMEM; } } memcpy(pl->proposer.plv, value, sizeof(struct paxos_lease_value)); if (pl->proposer.timer1) del_timer(&pl->proposer.timer1); if (pl->renew) { pl->proposer.timer1 = add_timer(4 * pl->expiry / 5, (unsigned long)pl, renew_expires); pl->proposer.expires = current_time() + 4 * pl->expiry / 5; } else { pl->proposer.timer1 = add_timer(pl->expiry, (unsigned long)pl, lease_expires); pl->proposer.expires = current_time() + pl->expiry; } + log_debug("exit start_lease_propose"); return 0; } -static int lease_accepted(pi_handle_t handle, - void *extra __attribute__((unused)), - int round, void *value) +static int stop_lease_propose(pi_handle_t handle, + void *extra __attribute__((unused)), + int round, + void *value) { struct paxos_lease *pl; - int found = 0; - list_for_each_entry(pl, &lease_head, list) { - if (pl->pih == handle) { - found = 1; - break; + log_debug("enter stop_lease_propose"); + if (!find_paxos_lease(handle, &pl)) + return -1; + + if (round != pl->proposer.round) { + log_error("current round is not the proposer round, " + "current round: %d, proposer round: %d", + round, pl->proposer.round); + return -1; + } + + if (!pl->proposer.plv) { + pl->proposer.plv = malloc(sizeof(struct paxos_lease_value)); + if (!pl->proposer.plv) { + log_error("could not alloc mem for propsoer plv"); + return -ENOMEM; } } - if (!found) { - log_error("could not find the lease handle: %ld", handle); + memcpy(pl->proposer.plv, value, sizeof(struct paxos_lease_value)); + + log_debug("exit stop_lease_propose"); + return 0; +} + +static int lease_propose(pi_handle_t handle, void *extra, + int round, void *value) +{ + struct paxos_lease_msghdr *hdr = extra; + int ret = 0; + int op = ntohl(hdr->op); + + log_debug("enter lease_propose"); + assert(OP_START_LEASE == op || OP_STOP_LEASE == op); + switch (op) { + case OP_START_LEASE: + ret = start_lease_propose(handle, extra, round, value); + break; + case OP_STOP_LEASE: + ret = stop_lease_propose(handle, extra, round, value); + break; + } + + log_debug("exit lease_propose"); + return ret; +} + +static int start_lease_accepted(pi_handle_t handle, void *extra, + int round, void *value) +{ + struct paxos_lease_msghdr *hdr = extra; + struct paxos_lease *pl; + + log_debug("enter start_lease_accepted"); + if (!find_paxos_lease(handle, &pl)) return -1; - } pl->acceptor.round = round; + + if (NOT_CLEAR_RELEASE == hdr->clear && LEASE_STOPPED == pl->release) { + log_debug("could not be leased"); + return -1; + } + if (!pl->acceptor.plv) { pl->acceptor.plv = malloc(sizeof(struct paxos_lease_value)); if (!pl->acceptor.plv) { log_error("could not alloc mem for acceptor plv"); return -ENOMEM; } } memcpy(pl->acceptor.plv, value, sizeof(struct paxos_lease_value)); if (pl->acceptor.timer1 && pl->acceptor.timer2 != pl->acceptor.timer1) del_timer(&pl->acceptor.timer1); pl->acceptor.timer1 = add_timer(pl->expiry, (unsigned long)pl, lease_expires); pl->acceptor.expires = current_time() + pl->expiry; + log_debug("exit start_lease_accepted"); return 0; } -static int lease_commit(pi_handle_t handle, - void *extra __attribute__((unused)), - int round) +static int stop_lease_accepted(pi_handle_t handle, + void *extra __attribute__((unused)), + int round, void *value) { struct paxos_lease *pl; - struct paxos_lease_result plr; - int found = 0; - list_for_each_entry(pl, &lease_head, list) { - if (pl->pih == handle) { - found = 1; - break; + log_debug("enter stop_lease_accepted"); + if (!find_paxos_lease(handle, &pl)) + return -1; + + pl->acceptor.round = round; + if (!pl->acceptor.plv) { + pl->acceptor.plv = malloc(sizeof(struct paxos_lease_value)); + if (!pl->acceptor.plv) { + log_error("could not alloc mem for acceptor plv"); + return -ENOMEM; } } - if (!found) { - log_error("could not find the lease handle: %ld", handle); + memcpy(pl->acceptor.plv, value, sizeof(struct paxos_lease_value)); + log_debug("exit stop_lease_accepted"); + return 0; +} + +static int lease_accepted(pi_handle_t handle, void *extra, + int round, void *value) +{ + struct paxos_lease_msghdr *hdr = extra; + int ret = 0; + int op = ntohl(hdr->op); + + log_debug("enter lease_accepted"); + assert(OP_START_LEASE == op || OP_STOP_LEASE == op); + switch (op) { + case OP_START_LEASE: + ret = start_lease_accepted(handle, extra, round, value); + break; + case OP_STOP_LEASE: + ret = stop_lease_accepted(handle, extra, round, value); + break; + } + + log_debug("exit lease_accepted"); + return ret; +} + +static int start_lease_commit(pi_handle_t handle, void *extra, int round) +{ + struct paxos_lease *pl; + struct paxos_lease_result plr; + + log_debug("enter start_lease_commit"); + if (!find_paxos_lease(handle, &pl)) return -1; - } if (round != pl->proposer.round) { log_error("current round is not the proposer round, " "current round: %d, proposer round: %d", round, pl->proposer.round); return -1; } + pl->release = LEASE_STARTED; pl->owner = pl->proposer.plv->owner; pl->expiry = pl->proposer.plv->expiry; - pl->release = pl->proposer.plv->release; if (pl->acceptor.timer2 != pl->acceptor.timer1) { if (pl->acceptor.timer2) del_timer(&pl->acceptor.timer2); pl->acceptor.timer2 = pl->acceptor.timer1; } strcpy(plr.name, pl->proposer.plv->name); plr.owner = pl->proposer.plv->owner; plr.expires = current_time() + pl->proposer.plv->expiry; plr.ballot = round; + p_l_op->notify((pl_handle_t)pl, &plr); - if (pl->release) { - if (pl->acceptor.timer2) - del_timer(&pl->acceptor.timer2); - if (pl->acceptor.timer1) - del_timer(&pl->acceptor.timer1); - if (pl->proposer.timer2) - del_timer(&pl->proposer.timer2); - if (pl->proposer.timer1) - del_timer(&pl->proposer.timer1); - plr.owner = pl->owner = -1; - plr.expires = 0; + log_debug("exit start_lease_commit"); + return 0; +} + +static int stop_lease_commit(pi_handle_t handle, + void *extra __attribute__((unused)), + int round) +{ + struct paxos_lease *pl; + struct paxos_lease_result plr; + + log_debug("enter stop_lease_commit"); + if (!find_paxos_lease(handle, &pl)) + return -1; + + if (round != pl->proposer.round) { + log_error("current round is not the proposer round, " + "current round: %d, proposer round: %d", + round, pl->proposer.round); + return -1; } - p_l_op->notify((pl_handle_t)pl, &plr); + if (pl->acceptor.timer2) + del_timer(&pl->acceptor.timer2); + if (pl->acceptor.timer1) + del_timer(&pl->acceptor.timer1); + if (pl->proposer.timer2) + del_timer(&pl->proposer.timer2); + if (pl->proposer.timer1) + del_timer(&pl->proposer.timer1); + pl->release = LEASE_STOPPED; + + strcpy(plr.name, pl->proposer.plv->name); + plr.owner = pl->owner = -1; + plr.ballot = round; + plr.expires = 0; + p_l_op->notify((pl_handle_t)pl, &plr); + log_debug("exit stop_lease_commit"); return 0; } -static int lease_learned(pi_handle_t handle, - void *extra __attribute__((unused)), - int round) +static int lease_commit(pi_handle_t handle, void *extra, int round) +{ + struct paxos_lease_msghdr *hdr = extra; + int ret = 0; + int op = ntohl(hdr->op); + + log_debug("enter lease_commit"); + assert(OP_START_LEASE == op || OP_STOP_LEASE == op); + switch (op) { + case OP_START_LEASE: + ret = start_lease_commit(handle, extra, round); + break; + case OP_STOP_LEASE: + ret = stop_lease_commit(handle, extra, round); + break; + } + + log_debug("exit lease_commit"); + return ret; +} + +static int start_lease_learned(pi_handle_t handle, void *extra, int round) { struct paxos_lease *pl; struct paxos_lease_result plr; - int found = 0; - list_for_each_entry(pl, &lease_head, list) { - if (pl->pih == handle) { - found = 1; - break; - } - } - if (!found) { - log_error("could not find the lease handle: %ld", handle); + log_debug("enter start_lease_learned"); + if (!find_paxos_lease(handle, &pl)) return -1; - } if (round != pl->acceptor.round) { log_error("current round is not the proposer round, " "current round: %d, proposer round: %d", round, pl->proposer.round); return -1; } + pl->release = LEASE_STARTED; pl->owner = pl->acceptor.plv->owner; pl->expiry = pl->acceptor.plv->expiry; - pl->release = pl->acceptor.plv->release; if (pl->acceptor.timer2 != pl->acceptor.timer1) { if (pl->acceptor.timer2) del_timer(&pl->acceptor.timer2); pl->acceptor.timer2 = pl->acceptor.timer1; } strcpy(plr.name, pl->acceptor.plv->name); plr.owner = pl->acceptor.plv->owner; plr.expires = current_time() + pl->acceptor.plv->expiry; plr.ballot = round; + p_l_op->notify((pl_handle_t)pl, &plr); + log_debug("exit start_lease_learned"); + return 0; +} - if (pl->release) { - if (pl->acceptor.timer2) - del_timer(&pl->acceptor.timer2); - if (pl->acceptor.timer1) - del_timer(&pl->acceptor.timer1); - plr.owner = pl->owner = -1; - plr.expires = 0; +static int stop_lease_learned(pi_handle_t handle, + void *extra __attribute__((unused)), + int round) +{ + struct paxos_lease *pl; + struct paxos_lease_result plr; + + log_debug("enter stop_lease_learned"); + if (!find_paxos_lease(handle, &pl)) + return -1; + + if (round != pl->acceptor.round) { + log_error("current round is not the proposer round, " + "current round: %d, proposer round: %d", + round, pl->proposer.round); + return -1; } - p_l_op->notify((pl_handle_t)pl, &plr); + if (pl->acceptor.timer2) + del_timer(&pl->acceptor.timer2); + if (pl->acceptor.timer1) + del_timer(&pl->acceptor.timer1); + pl->release = LEASE_STOPPED; + strcpy(plr.name, pl->acceptor.plv->name); + plr.owner = pl->owner = -1; + plr.ballot = round; + plr.expires = 0; + p_l_op->notify((pl_handle_t)pl, &plr); + log_debug("exit stop_lease_learned"); return 0; } +static int lease_learned(pi_handle_t handle, void *extra, int round) +{ + struct paxos_lease_msghdr *hdr = extra; + int ret = 0; + int op = ntohl(hdr->op); + + log_debug("enter lease_learned"); + assert(OP_START_LEASE == op || OP_STOP_LEASE == op); + switch (op) { + case OP_START_LEASE: + ret = start_lease_learned(handle, extra, round); + break; + case OP_STOP_LEASE: + ret = stop_lease_learned(handle, extra, round); + break; + } + + log_debug("exit lease_learned"); + return ret; +} + pl_handle_t paxos_lease_init(const void *name, unsigned int namelen, int expiry, int number, int failover, unsigned char *role, int *prio, const struct paxos_lease_operations *pl_op) { ps_handle_t psh; pi_handle_t pih; struct paxos_lease *lease; if (namelen > PAXOS_NAME_LEN) { log_error("length of paxos name is too long (%u)", namelen); return -EINVAL; } if (myid == -1) myid = pl_op->get_myid(); if (!ps_handle) { px_op = malloc(sizeof(struct paxos_operations)); if (!px_op) { log_error("could not alloc for paxos operations"); return -ENOMEM; } memset(px_op, 0, sizeof(struct paxos_operations)); px_op->get_myid = pl_op->get_myid; px_op->send = pl_op->send; px_op->broadcast = pl_op->broadcast; px_op->catchup = lease_catchup; - px_op->prepare = lease_prepared; - px_op->promise = handle_lease_request; + px_op->prepare = lease_prepare; + px_op->is_prepared = lease_is_prepared; + px_op->promise = lease_promise; px_op->propose = lease_propose; px_op->accepted = lease_accepted; px_op->commit = lease_commit; px_op->learned = lease_learned; p_l_op = pl_op; psh = paxos_space_init(PAXOS_LEASE_SPACE, number, sizeof(struct paxos_lease_msghdr), PLEASE_VALUE_LEN, role, px_op); if (psh <= 0) { log_error("failed to initialize paxos space: %ld", psh); free(px_op); px_op = NULL; return psh; } ps_handle = psh; } lease = malloc(sizeof(struct paxos_lease)); if (!lease) { log_error("cound not alloc for paxos lease"); return -ENOMEM; } memset(lease, 0, sizeof(struct paxos_lease)); strncpy(lease->name, name, PAXOS_NAME_LEN + 1); lease->owner = -1; lease->expiry = expiry; lease->failover = failover; list_add_tail(&lease->list, &lease_head); pih = paxos_instance_init(ps_handle, name, prio); if (pih <= 0) { log_error("failed to initialize paxos instance: %ld", pih); free(lease); return pih; } lease->pih = pih; return (pl_handle_t)lease; } int paxos_lease_status_recovery(pl_handle_t handle) { struct paxos_lease *pl = (struct paxos_lease *)handle; if (paxos_recovery_status_get(pl->pih) == 1) { pl->renew = 1; if (paxos_catchup(pl->pih) == 0) paxos_recovery_status_set(pl->pih, 0); } return 0; } int paxos_lease_on_receive(void *msg, int msglen) { return paxos_recvmsg(msg, msglen); } int paxos_lease_exit(pl_handle_t handle) { struct paxos_lease *pl = (struct paxos_lease *)handle; if (px_op) free(px_op); if (pl->proposer.plv) free(pl->proposer.plv); if (pl->proposer.timer1) del_timer(&pl->proposer.timer1); if (pl->proposer.timer2) del_timer(&pl->proposer.timer2); if (pl->acceptor.plv) free(pl->acceptor.plv); if (pl->acceptor.timer1) del_timer(&pl->acceptor.timer1); if (pl->acceptor.timer2) del_timer(&pl->acceptor.timer2); return 0; } diff --git a/src/paxos_lease.h b/src/paxos_lease.h index bb46ec1..e541b0e 100644 --- a/src/paxos_lease.h +++ b/src/paxos_lease.h @@ -1,69 +1,74 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _PAXOS_LEASE_H #define _PAXOS_LEASE_H #define PLEASE_NAME_LEN 63 +#define NOT_CLEAR_RELEASE 0 +#define CLEAR_RELEASE 1 + typedef long pl_handle_t; struct paxos_lease_result { char name[PLEASE_NAME_LEN+1]; int owner; int ballot; unsigned long long expires; }; struct paxos_lease_operations { int (*get_myid) (void); int (*send) (unsigned long id, void *value, int len); int (*broadcast) (void *value, int len); int (*catchup) (const void *name, int *owner, int *ballot, unsigned long long *expires); int (*notify) (pl_handle_t handle, struct paxos_lease_result *result); }; pl_handle_t paxos_lease_init(const void *name, unsigned int namelen, int expiry, int number, int failover, unsigned char *role, int *prio, const struct paxos_lease_operations *pl_op); int paxos_lease_on_receive(void *msg, int msglen); int paxos_lease_acquire(pl_handle_t handle, + int clear, int renew, void (*end_acquire) (pl_handle_t handle, int result)); /* int paxos_lease_owner_get(const void *name); int paxos_lease_epoch_get(const void *name); int paxos_lease_timeout(const void *name); */ int paxos_lease_status_recovery(pl_handle_t handle); -int paxos_lease_release(pl_handle_t handle); +int paxos_lease_release(pl_handle_t handle, + void (*end_release) (pl_handle_t handle, int result)); int paxos_lease_exit(pl_handle_t handle); #endif /* _PAXOS_LEASE_H */ diff --git a/src/ticket.c b/src/ticket.c index 988efa1..79f7404 100644 --- a/src/ticket.c +++ b/src/ticket.c @@ -1,578 +1,615 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include "ticket.h" #include "config.h" #include "pacemaker.h" #include "list.h" #include "log.h" #include "booth.h" #include "timer.h" #include "paxos_lease.h" #include "paxos.h" #define PAXOS_MAGIC 0xDB12 #define TK_LINE 256 struct booth_msghdr { uint16_t magic; uint16_t checksum; uint32_t len; } __attribute__((packed)); struct ticket_msg { char id[BOOTH_NAME_LEN+1]; uint32_t owner; uint32_t expiry; uint32_t ballot; uint32_t result; } __attribute__((packed)); struct ticket { char id[BOOTH_NAME_LEN+1]; pl_handle_t handle; int owner; int expiry; int ballot; unsigned long long expires; struct list_head list; }; static LIST_HEAD(ticket_list); static unsigned char *role; int check_ticket(char *ticket) { int i; if (!booth_conf) return 0; for (i = 0; i < booth_conf->ticket_count; i++) { if (!strcmp(booth_conf->ticket[i].name, ticket)) return 1; } return 0; } int check_site(char *site, int *local) { int i; if (!booth_conf) return 0; for (i = 0; i < booth_conf->node_count; i++) { if (booth_conf->node[i].type == SITE && !strcmp(booth_conf->node[i].addr, site)) { *local = booth_conf->node[i].local; return 1; } } return 0; } static int * ticket_priority(int i) { int j; /* TODO: need more precise check */ for (j = 0; j < booth_conf->node_count; j++) { if (booth_conf->ticket[i].weight[j] == 0) return NULL; } return booth_conf->ticket[i].weight; } static int ticket_get_myid(void) { return booth_transport[booth_conf->proto].get_myid(); } -static void end_acquire(pl_handle_t handle, int result) +static void end_acquire(pl_handle_t handle, int error) { struct ticket *tk; int found = 0; - if (result == 0) { - list_for_each_entry(tk, &ticket_list, list) { - if (tk->handle == handle) { - found = 1; - break; - } + log_debug("enter end_acquire"); + list_for_each_entry(tk, &ticket_list, list) { + if (tk->handle == handle) { + found = 1; + break; } - if (!found) - log_error("BUG: ticket handle %ld does not exist", - handle); - log_info("ticket %s was granted/reovked successfully (site %d)", - tk->id, ticket_get_myid()); } + + if (!found) { + log_error("BUG: ticket handle %ld does not exist", handle); + return; + } + + if (error) + log_info("ticket %s was granted failed (site %d), error:%s", + tk->id, ticket_get_myid(), strerror(error)); + else + log_info("ticket %s was granted successfully (site %d)", + tk->id, ticket_get_myid()); + log_debug("exit end_acquire"); +} + +static void end_release(pl_handle_t handle, int error) +{ + struct ticket *tk; + int found = 0; + + log_debug("enter end_release"); + list_for_each_entry(tk, &ticket_list, list) { + if (tk->handle == handle) { + found = 1; + break; + } + } + + if (!found) { + log_error("BUG: ticket handle %ld does not exist", handle); + return; + } + + if (error) + log_info("ticket %s was reovked failed (site %d), error:%s", + tk->id, ticket_get_myid(), strerror(error)); + else + log_info("ticket %s was reovked successfully (site %d)", + tk->id, ticket_get_myid()); + + log_debug("exit end_release"); } static int ticket_send(unsigned long id, void *value, int len) { int i, rv = -1; struct booth_node *to = NULL; struct booth_msghdr *hdr; void *buf; for (i = 0; i < booth_conf->node_count; i++) { if (booth_conf->node[i].nodeid == id) to = &booth_conf->node[i]; } if (!to) return rv; buf = malloc(sizeof(struct booth_msghdr) + len); if (!buf) return -ENOMEM; memset(buf, 0, sizeof(struct booth_msghdr) + len); hdr = buf; hdr->magic = htons(PAXOS_MAGIC); hdr->len = htonl(sizeof(struct booth_msghdr) + len); memcpy((char *)buf + sizeof(struct booth_msghdr), value, len); rv = booth_transport[booth_conf->proto].send( (unsigned long)to, buf, sizeof(struct booth_msghdr) + len); free(buf); return rv; } static int ticket_broadcast(void *value, int len) { void *buf; struct booth_msghdr *hdr; int rv; buf = malloc(sizeof(struct booth_msghdr) + len); if (!buf) return -ENOMEM; memset(buf, 0, sizeof(struct booth_msghdr) + len); hdr = buf; hdr->magic = htons(PAXOS_MAGIC); hdr->len = htonl(sizeof(struct booth_msghdr) + len); memcpy((char *)buf + sizeof(struct booth_msghdr), value, len); rv = booth_transport[booth_conf->proto].broadcast( buf, sizeof(struct booth_msghdr) + len); free(buf); return rv; } #if 0 static int ticket_read(const void *name, int *owner, int *ballot, unsigned long long *expires) { struct ticket *tk; int found = 0; list_for_each_entry(tk, &ticket_list, list) { if (!strcmp(tk->id, name)) { found = 1; break; } } if (!found) { log_error("BUG: ticket_read failed (ticket %s does not exist)", (char *)name); return -1; } pcmk_handler.load_ticket(tk->id, &tk->owner, &tk->ballot, &tk->expires); *owner = tk->owner; *expires = tk->expires; *ballot = tk->ballot; return 0; } #endif static int ticket_parse(struct ticket_msg *tmsg) { struct ticket *tk; int found = 0; if (!tmsg->result) return -1; list_for_each_entry(tk, &ticket_list, list) { if (!strcmp(tk->id, tmsg->id)) { tk->owner = tmsg->owner; tk->expires = current_time() + tmsg->expiry; tk->ballot = tmsg->ballot; found = 1; break; } } if (!found) return -1; else return 0; } static int ticket_catchup(const void *name, int *owner, int *ballot, unsigned long long *expires) { struct ticket *tk; int i, s, buflen, rv = 0; char *buf = NULL; struct boothc_header *h; struct ticket_msg *tmsg; int myid = ticket_get_myid(); if (booth_conf->node[myid].type != ARBITRATOR) { list_for_each_entry(tk, &ticket_list, list) { if (!strcmp(tk->id, name)) { pcmk_handler.load_ticket(tk->id, &tk->owner, &tk->ballot, &tk->expires); if (current_time() >= tk->expires) { tk->owner = -1; tk->expires = 0; } } } } buflen = sizeof(struct boothc_header) + sizeof(struct ticket_msg); buf = malloc(buflen); if (!buf) return -ENOMEM; memset(buf, 0, buflen); h = (struct boothc_header *)buf; h->magic = BOOTHC_MAGIC; h->version = BOOTHC_VERSION; h->cmd = BOOTHC_CMD_CATCHUP; h->len = sizeof(struct ticket_msg); tmsg = (struct ticket_msg *)(buf + sizeof(struct boothc_header)); for (i = 0; i < booth_conf->node_count; i++) { if (booth_conf->node[i].type == SITE && !(booth_conf->node[i].local)) { strncpy(tmsg->id, name, BOOTH_NAME_LEN + 1); log_debug("attempting catchup from %s", booth_conf->node[i].addr); s = booth_transport[TCP].open(&booth_conf->node[i]); if (s < 0) continue; log_debug("connected to %s", booth_conf->node[i].addr); rv = booth_transport[TCP].send(s, buf, buflen); if (rv < 0) { booth_transport[TCP].close(s); continue; } log_debug("sent catchup command to %s", booth_conf->node[i].addr); memset(tmsg, 0, sizeof(struct ticket_msg)); rv = booth_transport[TCP].recv(s, buf, buflen); if (rv < 0) { booth_transport[TCP].close(s); continue; } booth_transport[TCP].close(s); ticket_parse(tmsg); memset(tmsg, 0, sizeof(struct ticket_msg)); } } list_for_each_entry(tk, &ticket_list, list) { if (!strcmp(tk->id, name)) { if (booth_conf->node[myid].type != ARBITRATOR) { if (current_time() >= tk->expires) { tk->owner = -1; tk->expires = 0; } pcmk_handler.store_ticket(tk->id, tk->owner, tk->ballot, tk->expires); if (tk->owner == myid) pcmk_handler.grant_ticket(tk->id); else pcmk_handler.revoke_ticket(tk->id); } *owner = tk->owner; *expires = tk->expires; *ballot = tk->ballot; } } free(buf); return rv; } static int ticket_write(pl_handle_t handle, struct paxos_lease_result *result) { struct ticket *tk; int found = 0; list_for_each_entry(tk, &ticket_list, list) { if (tk->handle == handle) { found = 1; break; } } if (!found) { log_error("BUG: ticket_write failed " "(ticket handle %ld does not exist)", handle); return -1; } tk->owner = result->owner; tk->expires = result->expires; tk->ballot = result->ballot; if (tk->owner == ticket_get_myid()) { pcmk_handler.store_ticket(tk->id, tk->owner, tk->ballot, tk->expires); pcmk_handler.grant_ticket(tk->id); } else if (tk->owner == -1) { pcmk_handler.store_ticket(tk->id, tk->owner, tk->ballot, tk->expires); pcmk_handler.revoke_ticket(tk->id); } else pcmk_handler.store_ticket(tk->id, tk->owner, tk->ballot, tk->expires); return 0; } static void ticket_status_recovery(pl_handle_t handle) { paxos_lease_status_recovery(handle); } int ticket_recv(void *msg, int msglen) { struct booth_msghdr *hdr; char *data; hdr = msg; if (ntohs(hdr->magic) != PAXOS_MAGIC || ntohl(hdr->len) != msglen) { log_error("message received error"); return -1; } data = (char *)msg + sizeof(struct booth_msghdr); return paxos_lease_on_receive(data, msglen - sizeof(struct booth_msghdr)); } int grant_ticket(char *ticket) { struct ticket *tk; int found = 0; list_for_each_entry(tk, &ticket_list, list) { if (!strcmp(tk->id, ticket)) { found = 1; break; } } + if (!found) { log_error("ticket %s does not exist", ticket); return BOOTHC_RLT_SYNC_FAIL; } if (tk->owner == ticket_get_myid()) return BOOTHC_RLT_SYNC_SUCC; else { - paxos_lease_acquire(tk->handle, 1, end_acquire); + paxos_lease_acquire(tk->handle, CLEAR_RELEASE, 1, end_acquire); return BOOTHC_RLT_ASYNC; } } int revoke_ticket(char *ticket) { struct ticket *tk; int found = 0; list_for_each_entry(tk, &ticket_list, list) { if (!strcmp(tk->id, ticket)) { found = 1; break; } } + if (!found) { log_error("ticket %s does not exist", ticket); return BOOTHC_RLT_SYNC_FAIL; } if (tk->owner == -1) return BOOTHC_RLT_SYNC_SUCC; else { - paxos_lease_release(tk->handle); + paxos_lease_release(tk->handle, end_release); return BOOTHC_RLT_ASYNC; } } int get_ticket_info(char *name, int *owner, int *expires) { struct ticket *tk; list_for_each_entry(tk, &ticket_list, list) { if (!strncmp(tk->id, name, BOOTH_NAME_LEN + 1)) { if(owner) *owner = tk->owner; if(expires) *expires = tk->expires; return 0; } } return -1; } int list_ticket(char **pdata, unsigned int *len) { struct ticket *tk; char timeout_str[100]; char node_name[BOOTH_NAME_LEN]; char tmp[TK_LINE]; *pdata = NULL; *len = 0; list_for_each_entry(tk, &ticket_list, list) { memset(tmp, 0, TK_LINE); strncpy(timeout_str, "INF", sizeof(timeout_str)); strncpy(node_name, "None", sizeof(node_name)); if (tk->owner < MAX_NODES && tk->owner > -1) strncpy(node_name, booth_conf->node[tk->owner].addr, sizeof(node_name)); if (tk->expires != 0) strftime(timeout_str, sizeof(timeout_str), "%Y/%m/%d %H:%M:%S", localtime((time_t *)&tk->expires)); snprintf(tmp, TK_LINE, "ticket: %s, owner: %s, expires: %s\n", tk->id, node_name, timeout_str); *pdata = realloc(*pdata, *len + TK_LINE); if (*pdata == NULL) return -ENOMEM; memset(*pdata + *len, 0, TK_LINE); memcpy(*pdata + *len, tmp, TK_LINE); *len += TK_LINE; } return 0; } int catchup_ticket(char **pdata, unsigned int len) { struct ticket_msg *tmsg; struct ticket *tk; assert(len == sizeof(struct ticket_msg)); tmsg = (struct ticket_msg *)(*pdata); list_for_each_entry(tk, &ticket_list, list) { if (!strcmp(tk->id, tmsg->id) && tk->owner == ticket_get_myid() && current_time() < tk->expires) { tmsg->owner = tk->owner; tmsg->expiry = tk->expires - current_time(); tmsg->ballot = tk->ballot; tmsg->result = 1; break; } } if (!tmsg->result) memset(*pdata, 0, len); return 0; } const struct paxos_lease_operations ticket_operations = { .get_myid = ticket_get_myid, .send = ticket_send, .broadcast = ticket_broadcast, .catchup = ticket_catchup, .notify = ticket_write, }; int setup_ticket(void) { struct ticket *tk, *tmp; int i, rv; pl_handle_t plh; int myid; role = malloc(booth_conf->node_count * sizeof(unsigned char)); if (!role) return -ENOMEM; memset(role, 0, booth_conf->node_count * sizeof(unsigned char)); for (i = 0; i < booth_conf->node_count; i++) { if (booth_conf->node[i].type == SITE) role[i] = PROPOSER | ACCEPTOR | LEARNER; else if (booth_conf->node[i].type == ARBITRATOR) role[i] = ACCEPTOR | LEARNER; } for (i = 0; i < booth_conf->ticket_count; i++) { tk = malloc(sizeof(struct ticket)); if (!tk) { rv = -ENOMEM; goto out; } memset(tk, 0, sizeof(struct ticket)); strcpy(tk->id, booth_conf->ticket[i].name); tk->owner = -1; tk->expiry = booth_conf->ticket[i].expiry; list_add_tail(&tk->list, &ticket_list); plh = paxos_lease_init(tk->id, BOOTH_NAME_LEN, tk->expiry, booth_conf->node_count, 1, role, ticket_priority(i), &ticket_operations); if (plh <= 0) { log_error("paxos lease initialization failed"); rv = plh; goto out; } tk->handle = plh; } myid = ticket_get_myid(); assert(myid < booth_conf->node_count); if (role[myid] & ACCEPTOR) { list_for_each_entry(tk, &ticket_list, list) { ticket_status_recovery(tk->handle); } } return 0; out: list_for_each_entry_safe(tk, tmp, &ticket_list, list) { list_del(&tk->list); } free(role); return rv; }