diff --git a/src/booth.h b/src/booth.h index 66bd5e8..0a5ca98 100644 --- a/src/booth.h +++ b/src/booth.h @@ -1,84 +1,85 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _BOOTH_H #define _BOOTH_H #include #include #include #define BOOTH_LOG_DUMP_SIZE (1024*1024) #define BOOTH_RUN_DIR "/var/run" #define BOOTH_LOG_DIR "/var/log" #define BOOTH_LOGFILE_NAME "booth.log" #define BOOTH_DEFAULT_LOCKFILE BOOTH_RUN_DIR "/booth.pid" #define BOOTH_DEFAULT_CONF "/etc/booth/booth.conf" #define DAEMON_NAME "booth" #define BOOTH_NAME_LEN 63 #define BOOTH_PATH_LEN 127 #define BOOTHC_SOCK_PATH "boothc_lock" #define BOOTH_PROTO_FAMILY AF_INET #define BOOTH_CMD_PORT 22075 #define BOOTHC_MAGIC 0x5F1BA08C #define BOOTHC_VERSION 0x00010000 #define BOOTHC_OPT_FORCE 0x00000001 struct boothc_header { uint32_t magic; uint32_t version; uint32_t cmd; uint32_t option; uint32_t expiry; uint32_t len; uint32_t result; }; typedef enum { BOOTHC_CMD_LIST = 1, BOOTHC_CMD_GRANT, BOOTHC_CMD_REVOKE, BOOTHC_CMD_CATCHUP, } cmd_request_t; typedef enum { BOOTHC_RLT_ASYNC = 1, BOOTHC_RLT_SYNC_SUCC, BOOTHC_RLT_SYNC_FAIL, BOOTHC_RLT_INVALID_ARG, BOOTHC_RLT_REMOTE_OP, + BOOTHC_RLT_OVERGRANT, } cmd_result_t; struct client { int fd; void *workfn; void *deadfn; }; int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci)); int do_read(int fd, void *buf, size_t count); int do_write(int fd, void *buf, size_t count); void process_connection(int ci); void safe_copy(char *dest, char *value, size_t buflen, const char *description); #endif /* _BOOTH_H */ diff --git a/src/main.c b/src/main.c index 07c7446..794bce5 100644 --- a/src/main.c +++ b/src/main.c @@ -1,1039 +1,1145 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include +#include +#include +#include #include "log.h" #include "booth.h" #include "config.h" #include "transport.h" #include "timer.h" #include "pacemaker.h" #include "ticket.h" -#include #define RELEASE_VERSION "1.0" #define CLIENT_NALLOC 32 int log_logfile_priority = LOG_INFO; int log_syslog_priority = LOG_ERR; int log_stderr_priority = LOG_ERR; int daemonize = 0; static int client_maxi; static int client_size = 0; struct client *client = NULL; struct pollfd *pollfd = NULL; int poll_timeout = -1; typedef enum { ACT_ARBITRATOR = 1, ACT_SITE, ACT_CLIENT, } booth_role_t; typedef enum { OP_LIST = 1, OP_GRANT, OP_REVOKE, } operation_t; struct command_line { int type; /* ACT_ */ int op; /* OP_ */ int force; char configfile[BOOTH_PATH_LEN]; char lockfile[BOOTH_PATH_LEN]; char site[BOOTH_NAME_LEN]; char ticket[BOOTH_NAME_LEN]; }; static struct command_line cl; int do_read(int fd, void *buf, size_t count) { int rv, off = 0; while (off < count) { rv = read(fd, (char *)buf + off, count - off); if (rv == 0) return -1; if (rv == -1 && errno == EINTR) continue; if (rv == -1) return -1; off += rv; } return 0; } int do_write(int fd, void *buf, size_t count) { int rv, off = 0; retry: rv = write(fd, (char *)buf + off, count); if (rv == -1 && errno == EINTR) goto retry; if (rv < 0) { log_error("write failed: %s (%d)", strerror(errno), errno); return rv; } if (rv != count) { count -= rv; off += rv; goto retry; } return 0; } static int do_connect(const char *sock_path) { struct sockaddr_un sun; socklen_t addrlen; int rv, fd; fd = socket(PF_UNIX, SOCK_STREAM, 0); if (fd < 0) { log_error("failed to create socket: %s (%d)", strerror(errno), errno); goto out; } memset(&sun, 0, sizeof(sun)); sun.sun_family = AF_UNIX; strcpy(&sun.sun_path[1], sock_path); addrlen = sizeof(sa_family_t) + strlen(sun.sun_path+1) + 1; rv = connect(fd, (struct sockaddr *) &sun, addrlen); if (rv < 0) { if (errno == ECONNREFUSED) log_error("Connection to boothd was refused; " "please ensure that you are on a " "machine which has boothd running."); else log_error("failed to connect: %s (%d)", strerror(errno), errno); close(fd); fd = rv; } out: return fd; } static void init_header(struct boothc_header *h, int cmd, int option, int result, int data_len) { memset(h, 0, sizeof(struct boothc_header)); h->magic = BOOTHC_MAGIC; h->version = BOOTHC_VERSION; h->len = data_len; h->cmd = cmd; h->option = option; h->result = result; } static void client_alloc(void) { int i; if (!client) { client = malloc(CLIENT_NALLOC * sizeof(struct client)); pollfd = malloc(CLIENT_NALLOC * sizeof(struct pollfd)); } else { client = realloc(client, (client_size + CLIENT_NALLOC) * sizeof(struct client)); pollfd = realloc(pollfd, (client_size + CLIENT_NALLOC) * sizeof(struct pollfd)); if (!pollfd) log_error("can't alloc for pollfd"); } if (!client || !pollfd) log_error("can't alloc for client array"); for (i = client_size; i < client_size + CLIENT_NALLOC; i++) { client[i].workfn = NULL; client[i].deadfn = NULL; client[i].fd = -1; pollfd[i].fd = -1; pollfd[i].revents = 0; } client_size += CLIENT_NALLOC; } static void client_dead(int ci) { close(client[ci].fd); client[ci].workfn = NULL; client[ci].fd = -1; pollfd[ci].fd = -1; } int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci)) { int i; if (!client) client_alloc(); again: for (i = 0; i < client_size; i++) { if (client[i].fd == -1) { client[i].workfn = workfn; if (deadfn) client[i].deadfn = deadfn; else client[i].deadfn = client_dead; client[i].fd = fd; pollfd[i].fd = fd; pollfd[i].events = POLLIN; if (i > client_maxi) client_maxi = i; return i; } } client_alloc(); goto again; } static int setup_listener(const char *sock_path) { struct sockaddr_un addr; socklen_t addrlen; int rv, s; s = socket(AF_LOCAL, SOCK_STREAM, 0); if (s < 0) { log_error("socket error %d: %s (%d)", s, strerror(errno), errno); return s; } memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_LOCAL; strcpy(&addr.sun_path[1], sock_path); addrlen = sizeof(sa_family_t) + strlen(addr.sun_path+1) + 1; rv = bind(s, (struct sockaddr *) &addr, addrlen); if (rv < 0) { log_error("bind error %d: %s (%d)", rv, strerror(errno), errno); close(s); return rv; } rv = listen(s, 5); if (rv < 0) { log_error("listen error %d: %s (%d)", rv, strerror(errno), errno); close(s); return rv; } return s; } void process_connection(int ci) { struct boothc_header h; char *data = NULL; char *site, *ticket; + int ticket_owner; int local, rv; void (*deadfn) (int ci); rv = do_read(client[ci].fd, &h, sizeof(h)); if (rv < 0) { if (errno == ECONNRESET) - log_debug("client %d connection reset for fd %d", ci, client[ci].fd); - else - log_debug("client %d read failed for fd %d: %s (%d)", - ci, client[ci].fd, - strerror(errno), errno); + log_debug("client %d connection reset for fd %d", + ci, client[ci].fd); deadfn = client[ci].deadfn; if(deadfn) { - log_debug("run deadfn"); deadfn(ci); } return; } if (h.magic != BOOTHC_MAGIC) { log_error("connection %d magic error %x", ci, h.magic); return; } if (h.version != BOOTHC_VERSION) { log_error("connection %d version error %x", ci, h.version); return; } if (h.len) { data = malloc(h.len); if (!data) { log_error("process_connection no mem %u", h.len); return; } memset(data, 0, h.len); rv = do_read(client[ci].fd, data, h.len); if (rv < 0) { log_error("connection %d read data error %d", ci, rv); goto out; } } switch (h.cmd) { case BOOTHC_CMD_LIST: assert(!data); h.result = list_ticket(&data, &h.len); break; case BOOTHC_CMD_GRANT: h.len = 0; site = data; ticket = data + BOOTH_NAME_LEN; + if (!check_ticket(ticket)) { h.result = BOOTHC_RLT_INVALID_ARG; goto reply; } + + if (get_ticket_info(ticket, &ticket_owner, NULL) == 0) { + if (ticket_owner > -1) { + log_error("client want to get an granted " + "ticket %s", ticket); + h.result = BOOTHC_RLT_OVERGRANT; + goto reply; + } + + } else { + log_error("can not get ticket %s's info", ticket); + h.result = BOOTHC_RLT_INVALID_ARG; + goto reply; + } + if (!check_site(site, &local)) { h.result = BOOTHC_RLT_INVALID_ARG; goto reply; } if (local) h.result = grant_ticket(ticket, h.option); else h.result = BOOTHC_RLT_REMOTE_OP; break; case BOOTHC_CMD_REVOKE: h.len = 0; site = data; ticket = data + BOOTH_NAME_LEN; if (!check_ticket(ticket)) { h.result = BOOTHC_RLT_INVALID_ARG; goto reply; } if (!check_site(site, &local)) { h.result = BOOTHC_RLT_INVALID_ARG; goto reply; } if (local) h.result = revoke_ticket(ticket, h.option); else h.result = BOOTHC_RLT_REMOTE_OP; break; case BOOTHC_CMD_CATCHUP: h.result = catchup_ticket(&data, h.len); break; default: log_error("connection %d cmd %x unknown", ci, h.cmd); break; } reply: rv = do_write(client[ci].fd, &h, sizeof(h)); if (rv < 0) log_error("connection %d write error %d", ci, rv); if (h.len) { rv = do_write(client[ci].fd, data, h.len); if (rv < 0) log_error("connection %d write error %d", ci, rv); } out: free(data); } static void process_listener(int ci) { int fd, i; fd = accept(client[ci].fd, NULL, NULL); if (fd < 0) { log_error("process_listener: accept error for fd %d: %s (%d)", fd, strerror(errno), errno); return; } i = client_add(fd, process_connection, NULL); log_debug("add client connection %d fd %d", i, fd); } static int setup_config(int type) { int rv; rv = read_config(cl.configfile); if (rv < 0) goto out; rv = check_config(type); if (rv < 0) goto out; out: return rv; } static int setup_transport(void) { int rv; transport_layer_t proto = booth_conf->proto; rv = booth_transport[proto].init(ticket_recv); if (rv < 0) { log_error("failed to init booth_transport[%d]", proto); goto out; } rv = booth_transport[TCP].init(NULL); if (rv < 0) { log_error("failed to init booth_transport[TCP]"); goto out; } out: return rv; } static int setup_timer(void) { return timerlist_init(); } static int setup(int type) { int rv; rv = setup_config(type); if (rv < 0) goto fail; rv = setup_timer(); if (rv < 0) goto fail; rv = setup_transport(); if (rv < 0) goto fail; rv = setup_ticket(); if (rv < 0) goto fail; rv = setup_listener(BOOTHC_SOCK_PATH); if (rv < 0) goto fail; client_add(rv, process_listener, NULL); return 0; fail: return -1; } static int loop(void) { void (*workfn) (int ci); void (*deadfn) (int ci); int rv, i; while (1) { rv = poll(pollfd, client_maxi + 1, poll_timeout); if (rv == -1 && errno == EINTR) continue; if (rv < 0) { log_error("poll failed: %s (%d)", strerror(errno), errno); goto fail; } for (i = 0; i <= client_maxi; i++) { if (client[i].fd < 0) continue; if (pollfd[i].revents & POLLIN) { workfn = client[i].workfn; if (workfn) workfn(i); } if (pollfd[i].revents & (POLLERR | POLLHUP | POLLNVAL)) { deadfn = client[i].deadfn; if (deadfn) deadfn(i); } } process_timerlist(); } return 0; fail: return -1; } static int do_list(void) { struct boothc_header h, *rh; char *reply = NULL, *data; int data_len; int fd, rv; init_header(&h, BOOTHC_CMD_LIST, 0, 0, 0); fd = do_connect(BOOTHC_SOCK_PATH); if (fd < 0) { rv = fd; goto out; } rv = do_write(fd, &h, sizeof(h)); if (rv < 0) goto out_close; reply = malloc(sizeof(struct boothc_header)); if (!reply) { rv = -ENOMEM; goto out_close; } rv = do_read(fd, reply, sizeof(struct boothc_header)); if (rv < 0) goto out_free; rh = (struct boothc_header *)reply; data_len = rh->len; reply = realloc(reply, sizeof(struct boothc_header) + data_len); if (!reply) { rv = -ENOMEM; goto out_free; } data = reply + sizeof(struct boothc_header); rv = do_read(fd, data, data_len); if (rv < 0) goto out_free; do_write(STDOUT_FILENO, data, data_len); rv = 0; out_free: free(reply); out_close: close(fd); out: return rv; } +static inline void load_bar(int x, int n, int r, int w) +{ + int i; + float ratio; + int c; + + /* Only update r times.*/ + if ( x % (n / r) != 0 ) return; + + /* Calculuate the ratio of complete-to-incomplete.*/ + ratio = x / (float)n; + c = ratio * w; + + /* Show the percentage complete.*/ + printf("%3d%% [", (int)(ratio * 100) ); + + /* Show the load bar.*/ + for (i = 0; i < c; i++) + printf("="); + for (i = c; i < w; i++) + printf(" "); + + printf("]"); + printf("\r"); + fflush(stdout); +} + +static void counting_down(int total_time) +{ + struct winsize size; + int screen_width; + int i; + + ioctl(STDIN_FILENO, TIOCGWINSZ, (char*)&size); + screen_width = size.ws_col/(float)2; + + /* ignore signals */ + signal(SIGTERM, SIG_IGN); + signal(SIGINT, SIG_IGN); + signal(SIGHUP, SIG_IGN); + + i = 0; + while (i <= total_time) { + load_bar(i, total_time, total_time, screen_width); + sleep(1); + i++; + } + + log_info("\nCounting Down Over...\n"); +} static int do_command(cmd_request_t cmd) { char *buf; struct boothc_header *h, reply; int buflen; uint32_t force = 0; int fd, rv; + int expire_time; + int i; + + buflen = sizeof(struct boothc_header) + sizeof(cl.site) + sizeof(cl.ticket); buf = malloc(buflen); if (!buf) { rv = -ENOMEM; goto out; } h = (struct boothc_header *)buf; if (cl.force) force = BOOTHC_OPT_FORCE; init_header(h, cmd, force, 0, sizeof(cl.site) + sizeof(cl.ticket)); strcpy(buf + sizeof(struct boothc_header), cl.site); strcpy(buf + sizeof(struct boothc_header) + sizeof(cl.site), cl.ticket); fd = do_connect(BOOTHC_SOCK_PATH); if (fd < 0) { rv = fd; goto out_free; } rv = do_write(fd, buf, buflen); if (rv < 0) goto out_close; rv = do_read(fd, &reply, sizeof(struct boothc_header)); if (rv < 0) goto out_close; if (reply.result == BOOTHC_RLT_INVALID_ARG) { log_info("invalid argument!"); rv = -1; goto out_close; } + if (reply.result == BOOTHC_RLT_OVERGRANT) { + log_info("You're granting a granted ticket" + "If you wanted to migrate a ticket," + "use revoke first, then use grant"); + rv = -1; + goto out_close; + } + if (reply.result == BOOTHC_RLT_REMOTE_OP) { struct booth_node to; int s; memset(&to, 0, sizeof(struct booth_node)); to.family = BOOTH_PROTO_FAMILY; strcpy(to.addr, cl.site); s = booth_transport[TCP].open(&to); if (s < 0) goto out_close; rv = booth_transport[TCP].send(s, buf, buflen); if (rv < 0) { booth_transport[TCP].close(s); goto out_close; } rv = booth_transport[TCP].recv(s, &reply, sizeof(struct boothc_header)); if (rv < 0) { booth_transport[TCP].close(s); goto out_close; } booth_transport[TCP].close(s); } if (reply.result == BOOTHC_RLT_ASYNC) { if (cmd == BOOTHC_CMD_GRANT) log_info("grant command sent, result will be returned " "asynchronously, you can get the result from " "the log files"); - else if (cmd == BOOTHC_CMD_REVOKE) + else if (cmd == BOOTHC_CMD_REVOKE) { log_info("revoke command sent, result will be returned " "asynchronously, you can get the result from " "the log files after the ticket expiry time."); + i = 0; + /* FIXME: if we access the server then get the actual + * remaining time the waiting will be shorter, for now, + * client is just waiting the expiry time. + */ + read_config(cl.configfile); + while (i < booth_conf->ticket_count) { + if (!strncmp(booth_conf->ticket[i].name, cl.ticket, + BOOTH_NAME_LEN)) { + expire_time = booth_conf->ticket[i].expiry; + log_info("You have to wait %d seconds to " + "ensure all timer has expired!", + expire_time); + counting_down(expire_time); + rv = 0; + break; + } + i++; + /* no ticket found in conf file */ + if( i == booth_conf->ticket_count ) { + log_error("check your config file, " + "ticket %s not found", cl.ticket); + log_error("your booth's config file may " + "not be the same!"); + break; + rv = -1; + } + } + } else - log_error("internal error reading reply result!"); + log_error("internal error when reading reply result!"); rv = 0; } else if (reply.result == BOOTHC_RLT_SYNC_SUCC) { if (cmd == BOOTHC_CMD_GRANT) log_info("grant succeeded!"); else if (cmd == BOOTHC_CMD_REVOKE) log_info("revoke succeeded!"); rv = 0; } else if (reply.result == BOOTHC_RLT_SYNC_FAIL) { if (cmd == BOOTHC_CMD_GRANT) log_info("grant failed!"); else if (cmd == BOOTHC_CMD_REVOKE) log_info("revoke failed!"); rv = 0; } else { log_error("internal error!"); rv = -1; } out_close: close(fd); out_free: free(buf); out: return rv; } static int do_grant(void) { return do_command(BOOTHC_CMD_GRANT); } static int do_revoke(void) { return do_command(BOOTHC_CMD_REVOKE); } static int lockfile(void) { char buf[16]; struct flock lock; int fd, rv; fd = open(cl.lockfile, O_CREAT|O_WRONLY, 0666); if (fd < 0) { log_error("lockfile open error %s: %s", cl.lockfile, strerror(errno)); return -1; } lock.l_type = F_WRLCK; lock.l_start = 0; lock.l_whence = SEEK_SET; lock.l_len = 0; rv = fcntl(fd, F_SETLK, &lock); if (rv < 0) { log_error("lockfile setlk error %s: %s", cl.lockfile, strerror(errno)); goto fail; } rv = ftruncate(fd, 0); if (rv < 0) { log_error("lockfile truncate error %s: %s", cl.lockfile, strerror(errno)); goto fail; } memset(buf, 0, sizeof(buf)); snprintf(buf, sizeof(buf), "%d\n", getpid()); rv = write(fd, buf, strlen(buf)); if (rv <= 0) { log_error("lockfile write error %s: %s", cl.lockfile, strerror(errno)); goto fail; } return fd; fail: close(fd); return -1; } static void unlink_lockfile(int fd) { unlink(cl.lockfile); close(fd); } static void print_usage(void) { printf("Usage:\n"); printf("booth [options]\n"); printf("\n"); printf("Types:\n"); printf(" arbitrator: daemon running on arbitrator\n"); printf(" site: daemon running on cluster site\n"); printf(" client: command running from client\n"); printf("\n"); printf("Operations:\n"); printf("Please note that operations are valid iff type is client!\n"); printf(" list: List all the tickets\n"); printf(" grant: Grant ticket T(-t T) to site S(-s S)\n"); printf(" revoke: Revoke ticket T(-t T) from site S(-s S)\n"); printf("\n"); printf("Options:\n"); printf(" -c FILE Specify config file [default " BOOTH_DEFAULT_CONF "]\n"); printf(" -l LOCKFILE Specify lock file [default " BOOTH_DEFAULT_LOCKFILE "]\n"); printf(" -D Enable debugging to stderr and don't fork\n"); printf(" -t ticket name\n"); printf(" -s site name\n"); printf(" -f ticket attribute: force, only valid when " "granting\n"); printf(" -h Print this help, then exit\n"); } #define OPTION_STRING "c:Dl:t:s:fh" static char *logging_entity = NULL; void safe_copy(char *dest, char *value, size_t buflen, const char *description) { if (strlen(value) >= buflen) { fprintf(stderr, "'%s' exceeds maximum %s length of %ld\n", value, description, (long)(buflen - 1)); exit(EXIT_FAILURE); } strncpy(dest, value, buflen - 1); } static int read_arguments(int argc, char **argv) { int optchar; char *arg1 = argv[1]; char *op = NULL; if (argc < 2 || !strcmp(arg1, "help") || !strcmp(arg1, "--help") || !strcmp(arg1, "-h")) { print_usage(); exit(EXIT_SUCCESS); } if (!strcmp(arg1, "version") || !strcmp(arg1, "--version") || !strcmp(arg1, "-V")) { printf("%s %s (built %s %s)\n", argv[0], RELEASE_VERSION, __DATE__, __TIME__); exit(EXIT_SUCCESS); } if (!strcmp(arg1, "arbitrator")) { cl.type = ACT_ARBITRATOR; logging_entity = (char *) DAEMON_NAME "-arbitrator"; optind = 2; } else if (!strcmp(arg1, "site")) { cl.type = ACT_SITE; logging_entity = (char *) DAEMON_NAME "-site"; optind = 2; } else if (!strcmp(arg1, "client")) { cl.type = ACT_CLIENT; if (argc < 3) { print_usage(); exit(EXIT_FAILURE); } op = argv[2]; optind = 3; } else { cl.type = ACT_CLIENT; op = argv[1]; optind = 2; } switch (cl.type) { case ACT_ARBITRATOR: break; case ACT_SITE: break; case ACT_CLIENT: if (!strcmp(op, "list")) cl.op = OP_LIST; else if (!strcmp(op, "grant")) cl.op = OP_GRANT; else if (!strcmp(op, "revoke")) cl.op = OP_REVOKE; else { fprintf(stderr, "client operation \"%s\" is unknown\n", op); exit(EXIT_FAILURE); } break; } while (optind < argc) { optchar = getopt(argc, argv, OPTION_STRING); switch (optchar) { case 'c': safe_copy(cl.configfile, optarg, sizeof(cl.configfile), "config file"); break; case 'D': daemonize = 1; debug_level = 1; log_logfile_priority = LOG_DEBUG; log_syslog_priority = LOG_DEBUG; break; case 'l': safe_copy(cl.lockfile, optarg, sizeof(cl.lockfile), "lock file"); break; case 't': if (cl.op == OP_GRANT || cl.op == OP_REVOKE) { safe_copy(cl.ticket, optarg, sizeof(cl.ticket), "ticket name"); } else { print_usage(); exit(EXIT_FAILURE); } break; case 's': if (cl.op == OP_GRANT || cl.op == OP_REVOKE) { safe_copy(cl.site, optarg, sizeof(cl.ticket), "site name"); } else { print_usage(); exit(EXIT_FAILURE); } break; case 'f': if (cl.op == OP_GRANT) cl.force = 1; else { print_usage(); exit(EXIT_FAILURE); } break; case 'h': print_usage(); exit(EXIT_SUCCESS); break; case ':': case '?': fprintf(stderr, "Please use '-h' for usage.\n"); exit(EXIT_FAILURE); break; default: fprintf(stderr, "unknown option: %s\n", argv[optind]); exit(EXIT_FAILURE); break; }; } return 0; } static void set_scheduler(void) { struct sched_param sched_param; struct rlimit rlimit; int rv; rlimit.rlim_cur = RLIM_INFINITY; rlimit.rlim_max = RLIM_INFINITY; setrlimit(RLIMIT_MEMLOCK, &rlimit); rv = mlockall(MCL_CURRENT | MCL_FUTURE); if (rv < 0) { log_error("mlockall failed"); } rv = sched_get_priority_max(SCHED_RR); if (rv != -1) { sched_param.sched_priority = rv; rv = sched_setscheduler(0, SCHED_RR, &sched_param); if (rv == -1) log_error("could not set SCHED_RR priority %d: %s (%d)", sched_param.sched_priority, strerror(errno), errno); } else { log_error("could not get maximum scheduler priority err %d", errno); } } static void set_oom_adj(int val) { FILE *fp; fp = fopen("/proc/self/oom_adj", "w"); if (!fp) return; fprintf(fp, "%i", val); fclose(fp); } static int do_server(int type) { int fd = -1; int rv = -1; rv = setup(type); if (rv < 0) goto out; if (!daemonize) { if (daemon(0, 0) < 0) { perror("daemon error"); exit(EXIT_FAILURE); } } /* The lock cannot be obtained before the call to daemon(), otherwise the lockfile would contain the pid of the parent, not the daemon. */ fd = lockfile(); if (fd < 0) return fd; if (type == ARBITRATOR) log_info("BOOTH arbitrator daemon started"); else if (type == SITE) log_info("BOOTH cluster site daemon started"); set_scheduler(); set_oom_adj(-16); rv = loop(); out: if (fd >= 0) unlink_lockfile(fd); return rv; } static int do_client(void) { int rv = -1; switch (cl.op) { case OP_LIST: rv = do_list(); break; case OP_GRANT: rv = do_grant(); break; case OP_REVOKE: rv = do_revoke(); break; } return rv; } int main(int argc, char *argv[]) { int rv; memset(&cl, 0, sizeof(cl)); strncpy(cl.configfile, BOOTH_DEFAULT_CONF, BOOTH_PATH_LEN - 1); strncpy(cl.lockfile, BOOTH_DEFAULT_LOCKFILE, BOOTH_PATH_LEN - 1); rv = read_arguments(argc, argv); if (rv < 0) goto out; if (cl.type == ACT_CLIENT) { cl_log_enable_stderr(TRUE); cl_log_set_facility(0); } else { cl_log_set_entity(logging_entity); cl_log_enable_stderr(debug_level ? TRUE : FALSE); cl_log_set_facility(HA_LOG_FACILITY); } cl_inherit_logging_environment(0); switch (cl.type) { case ACT_ARBITRATOR: rv = do_server(ARBITRATOR); break; case ACT_SITE: rv = do_server(SITE); break; case ACT_CLIENT: rv = do_client(); break; } out: return rv ? EXIT_FAILURE : EXIT_SUCCESS; } diff --git a/src/ticket.c b/src/ticket.c index d0f6856..61451e3 100644 --- a/src/ticket.c +++ b/src/ticket.c @@ -1,574 +1,591 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include "ticket.h" #include "config.h" #include "pacemaker.h" #include "list.h" #include "log.h" #include "booth.h" #include "timer.h" #include "paxos_lease.h" #include "paxos.h" #define PAXOS_MAGIC 0xDB12 #define TK_LINE 256 struct booth_msghdr { uint16_t magic; uint16_t checksum; uint32_t len; } __attribute__((packed)); struct ticket_msg { char id[BOOTH_NAME_LEN+1]; uint32_t owner; uint32_t expiry; uint32_t ballot; uint32_t result; } __attribute__((packed)); struct ticket { char id[BOOTH_NAME_LEN+1]; pl_handle_t handle; int owner; int expiry; int ballot; unsigned long long expires; struct list_head list; }; static LIST_HEAD(ticket_list); static unsigned char *role; int check_ticket(char *ticket) { int i; if (!booth_conf) return 0; for (i = 0; i < booth_conf->ticket_count; i++) { if (!strcmp(booth_conf->ticket[i].name, ticket)) return 1; } return 0; } int check_site(char *site, int *local) { int i; if (!booth_conf) return 0; for (i = 0; i < booth_conf->node_count; i++) { if (booth_conf->node[i].type == SITE && !strcmp(booth_conf->node[i].addr, site)) { *local = booth_conf->node[i].local; return 1; } } return 0; } static int * ticket_priority(int i) { int j; /* TODO: need more precise check */ for (j = 0; j < booth_conf->node_count; j++) { if (booth_conf->ticket[i].weight[j] == 0) return NULL; } return booth_conf->ticket[i].weight; } static int ticket_get_myid(void) { return booth_transport[booth_conf->proto].get_myid(); } static void end_acquire(pl_handle_t handle, int result) { struct ticket *tk; int found = 0; if (result == 0) { list_for_each_entry(tk, &ticket_list, list) { if (tk->handle == handle) { found = 1; break; } } if (!found) log_error("BUG: ticket handle %ld does not exist", handle); log_info("ticket %s was granted/reovked successfully (site %d)", tk->id, ticket_get_myid()); } } static int ticket_send(unsigned long id, void *value, int len) { int i, rv = -1; struct booth_node *to = NULL; struct booth_msghdr *hdr; void *buf; for (i = 0; i < booth_conf->node_count; i++) { if (booth_conf->node[i].nodeid == id) to = &booth_conf->node[i]; } if (!to) return rv; buf = malloc(sizeof(struct booth_msghdr) + len); if (!buf) return -ENOMEM; memset(buf, 0, sizeof(struct booth_msghdr) + len); hdr = buf; hdr->magic = htons(PAXOS_MAGIC); hdr->len = htonl(sizeof(struct booth_msghdr) + len); memcpy((char *)buf + sizeof(struct booth_msghdr), value, len); rv = booth_transport[booth_conf->proto].send( (unsigned long)to, buf, sizeof(struct booth_msghdr) + len); free(buf); return rv; } static int ticket_broadcast(void *value, int len) { void *buf; struct booth_msghdr *hdr; int rv; buf = malloc(sizeof(struct booth_msghdr) + len); if (!buf) return -ENOMEM; memset(buf, 0, sizeof(struct booth_msghdr) + len); hdr = buf; hdr->magic = htons(PAXOS_MAGIC); hdr->len = htonl(sizeof(struct booth_msghdr) + len); memcpy((char *)buf + sizeof(struct booth_msghdr), value, len); rv = booth_transport[booth_conf->proto].broadcast( buf, sizeof(struct booth_msghdr) + len); free(buf); return rv; } #if 0 static int ticket_read(const void *name, int *owner, int *ballot, unsigned long long *expires) { struct ticket *tk; int found = 0; list_for_each_entry(tk, &ticket_list, list) { if (!strcmp(tk->id, name)) { found = 1; break; } } if (!found) { log_error("BUG: ticket_read failed (ticket %s does not exist)", (char *)name); return -1; } pcmk_handler.load_ticket(tk->id, &tk->owner, &tk->ballot, &tk->expires); *owner = tk->owner; *expires = tk->expires; *ballot = tk->ballot; return 0; } #endif static int ticket_parse(struct ticket_msg *tmsg) { struct ticket *tk; int found = 0; if (!tmsg->result) return -1; list_for_each_entry(tk, &ticket_list, list) { if (!strcmp(tk->id, tmsg->id)) { tk->owner = tmsg->owner; tk->expires = current_time() + tmsg->expiry; tk->ballot = tmsg->ballot; found = 1; break; } } if (!found) return -1; else return 0; } static int ticket_catchup(const void *name, int *owner, int *ballot, unsigned long long *expires) { struct ticket *tk; int i, s, buflen, rv = 0; char *buf = NULL; struct boothc_header *h; struct ticket_msg *tmsg; int myid = ticket_get_myid(); if (booth_conf->node[myid].type != ARBITRATOR) { list_for_each_entry(tk, &ticket_list, list) { if (!strcmp(tk->id, name)) { pcmk_handler.load_ticket(tk->id, &tk->owner, &tk->ballot, &tk->expires); if (current_time() >= tk->expires) { tk->owner = -1; tk->expires = 0; } } } } buflen = sizeof(struct boothc_header) + sizeof(struct ticket_msg); buf = malloc(buflen); if (!buf) return -ENOMEM; memset(buf, 0, buflen); h = (struct boothc_header *)buf; h->magic = BOOTHC_MAGIC; h->version = BOOTHC_VERSION; h->cmd = BOOTHC_CMD_CATCHUP; h->len = sizeof(struct ticket_msg); tmsg = (struct ticket_msg *)(buf + sizeof(struct boothc_header)); for (i = 0; i < booth_conf->node_count; i++) { if (booth_conf->node[i].type == SITE && !(booth_conf->node[i].local)) { strncpy(tmsg->id, name, BOOTH_NAME_LEN + 1); log_debug("attempting catchup from %s", booth_conf->node[i].addr); s = booth_transport[TCP].open(&booth_conf->node[i]); if (s < 0) continue; log_debug("connected to %s", booth_conf->node[i].addr); rv = booth_transport[TCP].send(s, buf, buflen); if (rv < 0) { booth_transport[TCP].close(s); continue; } log_debug("sent catchup command to %s", booth_conf->node[i].addr); memset(tmsg, 0, sizeof(struct ticket_msg)); rv = booth_transport[TCP].recv(s, buf, buflen); if (rv < 0) { booth_transport[TCP].close(s); continue; } booth_transport[TCP].close(s); ticket_parse(tmsg); memset(tmsg, 0, sizeof(struct ticket_msg)); } } list_for_each_entry(tk, &ticket_list, list) { if (!strcmp(tk->id, name)) { if (booth_conf->node[myid].type != ARBITRATOR) { if (current_time() >= tk->expires) { tk->owner = -1; tk->expires = 0; } pcmk_handler.store_ticket(tk->id, tk->owner, tk->ballot, tk->expires); if (tk->owner == myid) pcmk_handler.grant_ticket(tk->id); else pcmk_handler.revoke_ticket(tk->id); } *owner = tk->owner; *expires = tk->expires; *ballot = tk->ballot; } } free(buf); return rv; } static int ticket_write(pl_handle_t handle, struct paxos_lease_result *result) { struct ticket *tk; int found = 0; list_for_each_entry(tk, &ticket_list, list) { if (tk->handle == handle) { found = 1; break; } } if (!found) { log_error("BUG: ticket_write failed " "(ticket handle %ld does not exist)", handle); return -1; } tk->owner = result->owner; tk->expires = result->expires; tk->ballot = result->ballot; if (tk->owner == ticket_get_myid()) { pcmk_handler.store_ticket(tk->id, tk->owner, tk->ballot, tk->expires); pcmk_handler.grant_ticket(tk->id); } else if (tk->owner == -1) { pcmk_handler.store_ticket(tk->id, tk->owner, tk->ballot, tk->expires); pcmk_handler.revoke_ticket(tk->id); } else pcmk_handler.store_ticket(tk->id, tk->owner, tk->ballot, tk->expires); return 0; } static void ticket_status_recovery(pl_handle_t handle) { paxos_lease_status_recovery(handle); } int ticket_recv(void *msg, int msglen) { struct booth_msghdr *hdr; char *data; hdr = msg; if (ntohs(hdr->magic) != PAXOS_MAGIC || ntohl(hdr->len) != msglen) { log_error("message received error"); return -1; } data = (char *)msg + sizeof(struct booth_msghdr); return paxos_lease_on_receive(data, msglen - sizeof(struct booth_msghdr)); } int grant_ticket(char *ticket, int force) { struct ticket *tk; int found = 0; if (force) { pcmk_handler.store_ticket(ticket, ticket_get_myid(), 0, -1); pcmk_handler.grant_ticket(ticket); return BOOTHC_RLT_SYNC_SUCC; } list_for_each_entry(tk, &ticket_list, list) { if (!strcmp(tk->id, ticket)) { found = 1; break; } } if (!found) { log_error("ticket %s does not exist", ticket); return BOOTHC_RLT_SYNC_FAIL; } if (tk->owner == ticket_get_myid()) return BOOTHC_RLT_SYNC_SUCC; else { paxos_lease_acquire(tk->handle, 1, end_acquire); return BOOTHC_RLT_ASYNC; } } int revoke_ticket(char *ticket, int force) { struct ticket *tk; int found = 0; list_for_each_entry(tk, &ticket_list, list) { if (!strcmp(tk->id, ticket)) { found = 1; break; } } if (!found) { log_error("ticket %s does not exist", ticket); return BOOTHC_RLT_SYNC_FAIL; } if (force) { pcmk_handler.store_ticket(tk->id, -1, 0, 0); pcmk_handler.revoke_ticket(tk->id); } if (tk->owner == -1) return BOOTHC_RLT_SYNC_SUCC; else { paxos_lease_release(tk->handle); return BOOTHC_RLT_ASYNC; } } +int get_ticket_info(char *name, int *owner, int *expires) +{ + struct ticket *tk; + + list_for_each_entry(tk, &ticket_list, list) { + if (!strncmp(tk->id, name, BOOTH_NAME_LEN + 1)) { + if(owner) + *owner = tk->owner; + if(expires) + *expires = tk->expires; + return 0; + } + } + + return -1; +} + int list_ticket(char **pdata, unsigned int *len) { struct ticket *tk; char timeout_str[100]; char node_name[BOOTH_NAME_LEN]; char tmp[TK_LINE]; *pdata = NULL; *len = 0; list_for_each_entry(tk, &ticket_list, list) { memset(tmp, 0, TK_LINE); strncpy(timeout_str, "INF", sizeof(timeout_str)); strncpy(node_name, "None", sizeof(node_name)); if (tk->owner < MAX_NODES && tk->owner > -1) strncpy(node_name, booth_conf->node[tk->owner].addr, sizeof(node_name)); if (tk->expires != 0) strftime(timeout_str, sizeof(timeout_str), "%Y/%m/%d %H:%M:%S", localtime((time_t *)&tk->expires)); snprintf(tmp, TK_LINE, "ticket: %s, owner: %s, expires: %s\n", tk->id, node_name, timeout_str); *pdata = realloc(*pdata, *len + TK_LINE); if (*pdata == NULL) return -ENOMEM; memset(*pdata + *len, 0, TK_LINE); memcpy(*pdata + *len, tmp, TK_LINE); *len += TK_LINE; } return 0; } int catchup_ticket(char **pdata, unsigned int len) { struct ticket_msg *tmsg; struct ticket *tk; assert(len == sizeof(struct ticket_msg)); tmsg = (struct ticket_msg *)(*pdata); list_for_each_entry(tk, &ticket_list, list) { if (!strcmp(tk->id, tmsg->id) && tk->owner == ticket_get_myid() && current_time() < tk->expires) { tmsg->owner = tk->owner; tmsg->expiry = tk->expires - current_time(); tmsg->ballot = tk->ballot; tmsg->result = 1; break; } } if (!tmsg->result) memset(*pdata, 0, len); return 0; } const struct paxos_lease_operations ticket_operations = { .get_myid = ticket_get_myid, .send = ticket_send, .broadcast = ticket_broadcast, .catchup = ticket_catchup, .notify = ticket_write, }; int setup_ticket(void) { struct ticket *tk, *tmp; int i, rv; pl_handle_t plh; int myid; role = malloc(booth_conf->node_count * sizeof(unsigned char)); if (!role) return -ENOMEM; memset(role, 0, booth_conf->node_count * sizeof(unsigned char)); for (i = 0; i < booth_conf->node_count; i++) { if (booth_conf->node[i].type == SITE) role[i] = PROPOSER | ACCEPTOR | LEARNER; else if (booth_conf->node[i].type == ARBITRATOR) role[i] = ACCEPTOR | LEARNER; } for (i = 0; i < booth_conf->ticket_count; i++) { tk = malloc(sizeof(struct ticket)); if (!tk) { rv = -ENOMEM; goto out; } memset(tk, 0, sizeof(struct ticket)); strcpy(tk->id, booth_conf->ticket[i].name); tk->owner = -1; tk->expiry = booth_conf->ticket[i].expiry; if (!tk->expiry) tk->expiry = DEFAULT_TICKET_EXPIRY; list_add_tail(&tk->list, &ticket_list); plh = paxos_lease_init(tk->id, BOOTH_NAME_LEN, tk->expiry, booth_conf->node_count, 1, role, ticket_priority(i), &ticket_operations); if (plh <= 0) { log_error("paxos lease initialization failed"); rv = plh; goto out; } tk->handle = plh; } myid = ticket_get_myid(); assert(myid < booth_conf->node_count); if (role[myid] & ACCEPTOR) { list_for_each_entry(tk, &ticket_list, list) { ticket_status_recovery(tk->handle); } } return 0; out: list_for_each_entry_safe(tk, tmp, &ticket_list, list) { list_del(&tk->list); } free(role); return rv; } diff --git a/src/ticket.h b/src/ticket.h index 25e029a..92928ab 100644 --- a/src/ticket.h +++ b/src/ticket.h @@ -1,33 +1,34 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _TICKET_H #define _TICKET_H #define DEFAULT_TICKET_EXPIRY 600 int check_ticket(char *ticket); int check_site(char *site, int *local); int grant_ticket(char *ticket, int force); int revoke_ticket(char *ticket, int force); int list_ticket(char **pdata, unsigned int *len); int catchup_ticket(char **pdata, unsigned int len); int ticket_recv(void *msg, int msglen); int setup_ticket(void); +int get_ticket_info(char *name, int *owner, int *expires); #endif /* _TICKET_H */ diff --git a/src/transport.c b/src/transport.c index ff7173e..b60bde8 100644 --- a/src/transport.c +++ b/src/transport.c @@ -1,651 +1,655 @@ /* * Copyright (C) 2011 Jiaju Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include "list.h" #include "booth.h" #include "log.h" #include "config.h" #include "paxos_lease.h" #include "transport.h" #define BOOTH_IPADDR_LEN (sizeof(struct in6_addr)) #define NETLINK_BUFSIZE 16384 #define SOCKET_BUFFER_SIZE 160000 #define FRAME_SIZE_MAX 10000 extern struct client *client; extern struct pollfd *pollfd; static struct booth_node local; struct tcp_conn { int s; struct sockaddr to; struct list_head list; }; static LIST_HEAD(tcp); struct udp_context { int s; struct iovec iov_recv; char iov_buffer[FRAME_SIZE_MAX]; } udp; static int (*deliver_fn) (void *msg, int msglen); static int ipaddr_to_sockaddr(struct booth_node *ipaddr, uint16_t port, struct sockaddr_storage *saddr, int *addrlen) { int rv = -1; if (ipaddr->family == AF_INET) { struct in_addr addr; struct sockaddr_in *sin = (struct sockaddr_in *)saddr; memset(sin, 0, sizeof(struct sockaddr_in)); sin->sin_family = ipaddr->family; sin->sin_port = htons(port); inet_pton(AF_INET, ipaddr->addr, &addr); memcpy(&sin->sin_addr, &addr, sizeof(struct in_addr)); *addrlen = sizeof(struct sockaddr_in); rv = 0; } if (ipaddr->family == AF_INET6) { struct in6_addr addr; struct sockaddr_in6 *sin = (struct sockaddr_in6 *)saddr; memset(sin, 0, sizeof(struct sockaddr_in6)); sin->sin6_family = ipaddr->family; sin->sin6_port = htons(port); sin->sin6_scope_id = 2; inet_pton(AF_INET6, ipaddr->addr, &addr); memcpy(&sin->sin6_addr, &addr, sizeof(struct in6_addr)); *addrlen = sizeof(struct sockaddr_in6); rv = 0; } return rv; } static void parse_rtattr(struct rtattr *tb[], int max, struct rtattr *rta, int len) { while (RTA_OK(rta, len)) { if (rta->rta_type <= max) tb[rta->rta_type] = rta; rta = RTA_NEXT(rta,len); } } static int find_myself(struct booth_node *node) { int fd, addrlen, found = 0; struct sockaddr_nl nladdr; unsigned char ndaddr[BOOTH_IPADDR_LEN]; unsigned char ipaddr[BOOTH_IPADDR_LEN]; static char rcvbuf[NETLINK_BUFSIZE]; struct { struct nlmsghdr nlh; struct rtgenmsg g; } req; memset(ipaddr, 0, BOOTH_IPADDR_LEN); memset(ndaddr, 0, BOOTH_IPADDR_LEN); if (node->family == AF_INET) { inet_pton(AF_INET, node->addr, ndaddr); addrlen = sizeof(struct in_addr); } else if (node->family == AF_INET6) { inet_pton(AF_INET6, node->addr, ndaddr); addrlen = sizeof(struct in6_addr); } else { log_error("invalid INET family"); return 0; } fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE); if (fd < 0) { log_error("failed to create netlink socket"); return 0; } setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); memset(&nladdr, 0, sizeof(nladdr)); nladdr.nl_family = AF_NETLINK; memset(&req, 0, sizeof(req)); req.nlh.nlmsg_len = sizeof(req); req.nlh.nlmsg_type = RTM_GETADDR; req.nlh.nlmsg_flags = NLM_F_ROOT|NLM_F_MATCH|NLM_F_REQUEST; req.nlh.nlmsg_pid = 0; req.nlh.nlmsg_seq = 1; req.g.rtgen_family = AF_INET; if (sendto(fd, (void *)&req, sizeof(req), 0, (struct sockaddr*)&nladdr, sizeof(nladdr)) < 0) { close(fd); log_error("failed to send data to netlink socket"); return 0; } while (1) { int status; struct nlmsghdr *h; struct iovec iov = { rcvbuf, sizeof(rcvbuf) }; struct msghdr msg = { (void *)&nladdr, sizeof(nladdr), &iov, 1, NULL, 0, 0 }; status = recvmsg(fd, &msg, 0); if (!status) { close(fd); log_error("failed to recvmsg from netlink socket"); return 0; } h = (struct nlmsghdr *)rcvbuf; if (h->nlmsg_type == NLMSG_DONE) break; if (h->nlmsg_type == NLMSG_ERROR) { close(fd); log_error("netlink socket recvmsg error"); return 0; } while (NLMSG_OK(h, status)) { if (h->nlmsg_type == RTM_NEWADDR) { struct ifaddrmsg *ifa = NLMSG_DATA(h); struct rtattr *tb[IFA_MAX+1]; int len = h->nlmsg_len - NLMSG_LENGTH(sizeof(*ifa)); memset(tb, 0, sizeof(tb)); parse_rtattr(tb, IFA_MAX, IFA_RTA(ifa), len); memcpy(ipaddr, RTA_DATA(tb[IFA_ADDRESS]), BOOTH_IPADDR_LEN); if (!memcmp(ipaddr, ndaddr, addrlen)) { found = 1; goto out; } } h = NLMSG_NEXT(h, status); } } out: close(fd); return found; } static int load_myid(void) { int i; for (i = 0; i < booth_conf->node_count; i++) { if (find_myself(&booth_conf->node[i])) { booth_conf->node[i].local = 1; if (!local.family) memcpy(&local, &booth_conf->node[i], sizeof(struct booth_node)); return booth_conf->node[i].nodeid; } } return -1; } static int booth_get_myid(void) { if (local.local) return local.nodeid; else return -1; } static void process_dead(int ci) { struct tcp_conn *conn, *safe; list_for_each_entry_safe(conn, safe, &tcp, list) { if (conn->s == client[ci].fd) { list_del(&conn->list); free(conn); break; } } close(client[ci].fd); client[ci].workfn = NULL; client[ci].fd = -1; pollfd[ci].fd = -1; } static void process_tcp_listener(int ci) { int fd, i, one = 1; socklen_t addrlen; struct sockaddr addr; struct tcp_conn *conn; fd = accept(client[ci].fd, &addr, &addrlen); if (fd < 0) { log_error("process_tcp_listener: accept error %d %d", fd, errno); return; } setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof(one)); conn = malloc(sizeof(struct tcp_conn)); if (!conn) { log_error("failed to alloc mem"); return; } memset(conn, 0, sizeof(struct tcp_conn)); conn->s = fd; memcpy(&conn->to, &addr, sizeof(struct sockaddr)); list_add_tail(&conn->list, &tcp); i = client_add(fd, process_connection, process_dead); log_debug("client connection %d fd %d", i, fd); } static int setup_tcp_listener(void) { struct sockaddr_storage sockaddr; int s, addrlen, rv; s = socket(local.family, SOCK_STREAM, 0); if (s == -1) { log_error("failed to create tcp socket %s", strerror(errno)); return s; } ipaddr_to_sockaddr(&local, BOOTH_CMD_PORT, &sockaddr, &addrlen); rv = bind(s, (struct sockaddr *)&sockaddr, addrlen); if (rv == -1) { log_error("failed to bind socket %s", strerror(errno)); return rv; } rv = listen(s, 5); if (rv == -1) { log_error("failed to listen on socket %s", strerror(errno)); return rv; } return s; } static int booth_tcp_init(void * unused __attribute__((unused))) { int rv; if (!local.local) return -1; rv = setup_tcp_listener(); if (rv < 0) return rv; client_add(rv, process_tcp_listener, NULL); return 0; } static int connect_nonb(int sockfd, const struct sockaddr *saptr, socklen_t salen, int sec) { int flags, n, error; socklen_t len; fd_set rset, wset; struct timeval tval; flags = fcntl(sockfd, F_GETFL, 0); fcntl(sockfd, F_SETFL, flags | O_NONBLOCK); error = 0; if ( (n = connect(sockfd, saptr, salen)) < 0) if (errno != EINPROGRESS) return -1; if (n == 0) goto done; /* connect completed immediately */ FD_ZERO(&rset); FD_SET(sockfd, &rset); wset = rset; tval.tv_sec = sec; tval.tv_usec = 0; if ((n = select(sockfd + 1, &rset, &wset, NULL, sec ? &tval : NULL)) == 0) { /* leave outside function to close */ /* timeout */ /* close(sockfd); */ errno = ETIMEDOUT; return -1; } if (FD_ISSET(sockfd, &rset) || FD_ISSET(sockfd, &wset)) { len = sizeof(error); if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) return -1; /* Solaris pending error */ } else { log_error("select error: sockfd not set"); return -1; } done: fcntl(sockfd, F_SETFL, flags); /* restore file status flags */ if (error) { /* leave outside function to close */ /* close(sockfd); */ errno = error; return -1; } return 0; } static int booth_tcp_open(struct booth_node *to) { struct sockaddr_storage sockaddr; struct tcp_conn *conn; int addrlen, rv, s, found = 0; ipaddr_to_sockaddr(to, BOOTH_CMD_PORT, &sockaddr, &addrlen); list_for_each_entry(conn, &tcp, list) { if (!memcmp(&conn->to, &sockaddr, sizeof(sockaddr))) { found = 1; break; } } if (!found) { s = socket(BOOTH_PROTO_FAMILY, SOCK_STREAM, 0); if (s == -1) return -1; rv = connect_nonb(s, (struct sockaddr *)&sockaddr, addrlen, 10); if (rv == -1) { - log_error("connection to %s timeout", to->addr); + if( errno == ETIMEDOUT) + log_error("connection to %s timeout", to->addr); + else + log_error("connection to %s error %s", to->addr, + strerror(errno)); close(s); return rv; } conn = malloc(sizeof(struct tcp_conn)); if (!conn) { log_error("failed to alloc mem"); close(s); return -ENOMEM; } memset(conn, 0, sizeof(struct tcp_conn)); conn->s = s; memcpy(&conn->to, &sockaddr, sizeof(struct sockaddr)); list_add_tail(&conn->list, &tcp); } return conn->s; } static int booth_tcp_send(unsigned long to, void *buf, int len) { return do_write(to, buf, len); } static int booth_tcp_recv(unsigned long from, void *buf, int len) { return do_read(from, buf, len); } static int booth_tcp_close(unsigned long s) { struct tcp_conn *conn; list_for_each_entry(conn, &tcp, list) { if (conn->s == s) { list_del(&conn->list); close(s); free(conn); goto out; } } out: return 0; } static int booth_tcp_exit(void) { return 0; } static int setup_udp_server(void) { struct sockaddr_storage sockaddr; int addrlen, rv; unsigned int recvbuf_size; udp.s = socket(local.family, SOCK_DGRAM, 0); if (udp.s == -1) { log_error("failed to create udp socket %s", strerror(errno)); return -1; } rv = fcntl(udp.s, F_SETFL, O_NONBLOCK); if (rv == -1) { log_error("failed to set non-blocking operation " "on udp socket: %s", strerror(errno)); close(udp.s); return -1; } ipaddr_to_sockaddr(&local, booth_conf->port, &sockaddr, &addrlen); rv = bind(udp.s, (struct sockaddr *)&sockaddr, addrlen); if (rv == -1) { log_error("failed to bind socket %s", strerror(errno)); close(udp.s); return -1; } recvbuf_size = SOCKET_BUFFER_SIZE; rv = setsockopt(udp.s, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, sizeof(recvbuf_size)); if (rv == -1) { log_error("failed to set recvbuf size"); close(udp.s); return -1; } return udp.s; } static void process_recv(int ci) { struct msghdr msg_recv; struct sockaddr_storage system_from; int received; unsigned char *msg_offset; msg_recv.msg_name = &system_from; msg_recv.msg_namelen = sizeof (struct sockaddr_storage); msg_recv.msg_iov = &udp.iov_recv; msg_recv.msg_iovlen = 1; msg_recv.msg_control = 0; msg_recv.msg_controllen = 0; msg_recv.msg_flags = 0; received = recvmsg(client[ci].fd, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT); if (received == -1) return; msg_offset = udp.iov_recv.iov_base; deliver_fn(msg_offset, received); } static int booth_udp_init(void *f) { int myid = -1; memset(&local, 0, sizeof(struct booth_node)); myid = load_myid(); if (myid < 0) { log_error("can't find myself in config file"); return -1; } memset(&udp, 0, sizeof(struct udp_context)); udp.iov_recv.iov_base = udp.iov_buffer; udp.iov_recv.iov_len = FRAME_SIZE_MAX; udp.s = setup_udp_server(); if (udp.s == -1) return -1; deliver_fn = f; client_add(udp.s, process_recv, NULL); return 0; } static int booth_udp_send(unsigned long to, void *buf, int len) { struct msghdr msg; struct sockaddr_storage sockaddr; struct iovec iovec; unsigned int iov_len; int addrlen = 0, rv; iovec.iov_base = (void *)buf; iovec.iov_len = len; iov_len = 1; ipaddr_to_sockaddr((struct booth_node *)to, booth_conf->port, &sockaddr, &addrlen); msg.msg_name = &sockaddr; msg.msg_namelen = addrlen; msg.msg_iov = (void *)&iovec; msg.msg_iovlen = iov_len; msg.msg_control = 0; msg.msg_controllen = 0; msg.msg_flags = 0; rv = sendmsg(udp.s, &msg, MSG_NOSIGNAL); if (rv < 0) return rv; return 0; } static int booth_udp_broadcast(void *buf, int len) { int i; if (!booth_conf || !booth_conf->node_count) return -1; for (i = 0; i < booth_conf->node_count; i++) booth_udp_send((unsigned long)&booth_conf->node[i], buf, len); return 0; } static int booth_udp_exit(void) { return 0; } /* SCTP transport layer has not been developed yet */ static int booth_sctp_init(void *f __attribute__((unused))) { return 0; } static int booth_sctp_send(unsigned long to __attribute__((unused)), void *buf __attribute__((unused)), int len __attribute__((unused))) { return 0; } static int booth_sctp_broadcast(void *buf __attribute__((unused)), int len __attribute__((unused))) { return 0; } static int booth_sctp_exit(void) { return 0; } struct booth_transport booth_transport[] = { { .name = "TCP", .init = booth_tcp_init, .get_myid = booth_get_myid, .open = booth_tcp_open, .send = booth_tcp_send, .recv = booth_tcp_recv, .close = booth_tcp_close, .exit = booth_tcp_exit }, { .name = "UDP", .init = booth_udp_init, .get_myid = booth_get_myid, .send = booth_udp_send, .broadcast = booth_udp_broadcast, .exit = booth_udp_exit }, { .name = "SCTP", .init = booth_sctp_init, .get_myid = booth_get_myid, .send = booth_sctp_send, .broadcast = booth_sctp_broadcast, .exit = booth_sctp_exit } };