Page Menu · Home · ClusterLabs Projects

No One · Temporary

diff --git a/src/main.c b/src/main.c
index 9c7165b..1aa2668 100644
--- a/src/main.c
+++ b/src/main.c
@@ -1,1013 +1,1067 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sched.h>
#include <errno.h>
#include <limits.h>
#include <sys/file.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/poll.h>
#include <fcntl.h>
#include <string.h>
#include "log.h"
#include "booth.h"
#include "config.h"
#include "transport.h"
#include "timer.h"
#include "pacemaker.h"
#include "ticket.h"
/* Version string printed by "booth version". */
#define RELEASE_VERSION "1.0"
/* Number of client[]/pollfd[] slots added each time client_alloc() grows the tables. */
#define CLIENT_NALLOC 32
/*
 * Log thresholds (exported; presumably consumed by the logging code).
 * read_arguments() raises the logfile and syslog ones to LOG_DEBUG for -D.
 */
int log_logfile_priority = LOG_INFO;
int log_syslog_priority = LOG_ERR;
int log_stderr_priority = LOG_ERR;
/* Highest client[] index ever handed out by client_add(); loop() polls slots 0..client_maxi. */
static int client_maxi;
/* Number of allocated slots in both client[] and pollfd[]. */
static int client_size = 0;
/* Parallel tables: client[i] holds the callbacks for the descriptor in pollfd[i]. */
struct client *client = NULL;
struct pollfd *pollfd = NULL;
/* Timeout (ms) handed to poll() in loop(); -1 blocks until an event. NOTE(review): not changed in this file — presumably adjusted by the timer code. */
int poll_timeout = -1;
/* Role selected by argv[1] (see read_arguments()). */
typedef enum {
ACT_ARBITRATOR = 1,
ACT_SITE,
ACT_CLIENT,
} booth_role_t;
/* Client sub-command. */
typedef enum {
OP_LIST = 1,
OP_GRANT,
OP_REVOKE,
} operation_t;
/* Parsed command line; zeroed in main() and filled in by read_arguments(). */
struct command_line {
int type; /* ACT_*: role */
int op; /* OP_*: client operation */
int debug; /* -D: don't daemonise, debug logging */
int force; /* -f: request BOOTHC_OPT_FORCE */
int expiry; /* -e: lease expiry; 0 lets grant_ticket() use DEFAULT_TICKET_EXPIRY */
char site[BOOTH_NAME_LEN]; /* -s: target site */
char ticket[BOOTH_NAME_LEN]; /* -t: ticket name */
};
static struct command_line cl;
/*
 * Read exactly count bytes from fd into buf, retrying on EINTR and on
 * short reads.
 *
 * Returns 0 once all bytes are in, or -1 on a read error or if the peer
 * reaches end-of-file first.
 */
int do_read(int fd, void *buf, size_t count)
{
	char *cursor = buf;
	size_t done = 0;
	int n;

	while (done < count) {
		n = read(fd, cursor + done, count - done);
		if (n < 0) {
			if (errno == EINTR)
				continue;
			return -1;
		}
		if (n == 0)
			return -1;	/* EOF before count bytes arrived */
		done += n;
	}
	return 0;
}
/*
 * Write all count bytes of buf to fd, retrying on EINTR and continuing
 * after partial writes.
 *
 * Returns 0 on success; on a write error, logs errno and returns the
 * negative write() result.
 */
int do_write(int fd, void *buf, size_t count)
{
	char *cursor = buf;
	int n;

	for (;;) {
		n = write(fd, cursor, count);
		if (n == -1 && errno == EINTR)
			continue;
		if (n < 0) {
			log_error("write errno %d", errno);
			return n;
		}
		if (n == count)
			return 0;
		/* Partial write: advance past what was accepted. */
		count -= n;
		cursor += n;
	}
}
/*
 * Connect to a daemon's control socket.
 *
 * @sock_path: socket name in the Linux abstract namespace.  sun_path[0]
 *             stays '\0' and the name is stored from sun_path[1].
 *
 * Returns a connected SOCK_STREAM descriptor, or -1 on failure with
 * errno set.  errno is ENAMETOOLONG if sock_path does not fit in sun_path.
 */
static int do_connect(const char *sock_path)
{
	struct sockaddr_un sun;
	socklen_t addrlen;
	size_t namelen = strlen(sock_path);
	int fd;

	/*
	 * sun_path[0] is the abstract-namespace marker, so only
	 * sizeof(sun_path) - 1 bytes remain for the name.  Check before
	 * copying rather than overflowing the stack buffer.
	 */
	if (namelen > sizeof(sun.sun_path) - 1) {
		errno = ENAMETOOLONG;
		return -1;
	}

	fd = socket(PF_UNIX, SOCK_STREAM, 0);
	if (fd < 0)
		return fd;

	memset(&sun, 0, sizeof(sun));
	sun.sun_family = AF_UNIX;
	memcpy(&sun.sun_path[1], sock_path, namelen);
	/* Family + leading '\0' + name; no trailing NUL in abstract names. */
	addrlen = sizeof(sa_family_t) + namelen + 1;

	if (connect(fd, (struct sockaddr *) &sun, addrlen) < 0) {
		int saved_errno = errno;

		close(fd);
		errno = saved_errno;
		return -1;
	}
	return fd;
}
/*
 * Fill in a client-protocol header: zero it, stamp the protocol magic and
 * version, then store the caller's command fields and payload length.
 */
static void init_header(struct boothc_header *h, int cmd, int option,
			int expiry, int result, int data_len)
{
	memset(h, 0, sizeof(*h));

	h->magic   = BOOTHC_MAGIC;
	h->version = BOOTHC_VERSION;

	h->cmd     = cmd;
	h->option  = option;
	h->expiry  = expiry;
	h->result  = result;
	h->len     = data_len;
}
/*
 * Grow client[] and pollfd[] by CLIENT_NALLOC slots and mark the new
 * slots free (fd == -1, no callbacks, no pending events).
 *
 * The daemon cannot run without these tables, so allocation failure is
 * fatal.  Both arrays go through a temporary so that a failed realloc()
 * never clobbers the existing pointer, and nothing is written through a
 * NULL pointer.
 */
static void client_alloc(void)
{
	size_t nslots = (size_t)client_size + CLIENT_NALLOC;
	struct client *new_client;
	struct pollfd *new_pollfd;
	int i;

	new_client = realloc(client, nslots * sizeof(*new_client));
	if (!new_client) {
		log_error("can't alloc for client array");
		exit(EXIT_FAILURE);
	}
	client = new_client;

	new_pollfd = realloc(pollfd, nslots * sizeof(*new_pollfd));
	if (!new_pollfd) {
		log_error("can't alloc for pollfd");
		exit(EXIT_FAILURE);
	}
	pollfd = new_pollfd;

	for (i = client_size; i < client_size + CLIENT_NALLOC; i++) {
		client[i].workfn = NULL;
		client[i].deadfn = NULL;
		client[i].fd = -1;
		pollfd[i].fd = -1;
		pollfd[i].events = 0;
		pollfd[i].revents = 0;
	}
	client_size += CLIENT_NALLOC;
}
static void client_dead(int ci)
{
close(client[ci].fd);
client[ci].workfn = NULL;
client[ci].fd = -1;
pollfd[ci].fd = -1;
}
/*
 * Register fd in the first free client slot, polling it for POLLIN.
 *
 * @workfn: called by loop() when fd is readable.
 * @deadfn: called by loop() on POLLERR/POLLHUP/POLLNVAL; NULL selects
 *          client_dead().
 *
 * The tables are grown with client_alloc() whenever every slot is in
 * use.  Returns the slot index.
 */
int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci))
{
	int slot;

	if (!client)
		client_alloc();

	for (;;) {
		for (slot = 0; slot < client_size; slot++) {
			if (client[slot].fd != -1)
				continue;

			client[slot].workfn = workfn;
			client[slot].deadfn = deadfn ? deadfn : client_dead;
			client[slot].fd = fd;
			pollfd[slot].fd = fd;
			pollfd[slot].events = POLLIN;
			if (slot > client_maxi)
				client_maxi = slot;
			return slot;
		}
		/* All slots busy: grow the tables and scan again. */
		client_alloc();
	}
}
/*
 * Create the daemon's control socket.
 *
 * The socket is SOCK_STREAM, bound to sock_path in the Linux abstract
 * namespace (sun_path[0] stays '\0'), and put into the listening state.
 *
 * Returns the listening descriptor, or a negative value on failure
 * (logged).
 */
static int setup_listener(const char *sock_path)
{
	struct sockaddr_un addr;
	socklen_t addrlen;
	size_t namelen = strlen(sock_path);
	int rv, s;

	/*
	 * One byte of sun_path is the abstract-namespace marker.  Reject
	 * names that do not fit instead of overflowing the stack buffer.
	 */
	if (namelen > sizeof(addr.sun_path) - 1) {
		log_error("socket name too long: %s", sock_path);
		errno = ENAMETOOLONG;
		return -1;
	}

	s = socket(AF_LOCAL, SOCK_STREAM, 0);
	if (s < 0) {
		log_error("socket error %d %d", s, errno);
		return s;
	}

	memset(&addr, 0, sizeof(addr));
	addr.sun_family = AF_LOCAL;
	memcpy(&addr.sun_path[1], sock_path, namelen);
	/* Family + leading '\0' + name; no trailing NUL in abstract names. */
	addrlen = sizeof(sa_family_t) + namelen + 1;

	rv = bind(s, (struct sockaddr *) &addr, addrlen);
	if (rv < 0) {
		log_error("bind error %d %d", rv, errno);
		close(s);
		return rv;
	}

	rv = listen(s, 5);
	if (rv < 0) {
		log_error("listen error %d %d", rv, errno);
		close(s);
		return rv;
	}

	return s;
}
/*
 * Serve one request from a local booth client.  This is the workfn for
 * connections accepted on the control socket.
 *
 * Reads a struct boothc_header plus h.len bytes of payload, dispatches on
 * h.cmd, and writes the header back with h.result set.
 *
 * For GRANT and REVOKE, the payload is the site name followed by the
 * ticket name, each in a BOOTH_NAME_LEN-byte field (the layout do_grant()
 * and do_revoke() send).  When the site is not local,
 * BOOTHC_RLT_REMOTE_OP is returned so the client can forward the request.
 *
 * Nothing is written back on a read error or a bad magic/version.
 */
void process_connection(int ci)
{
	struct boothc_header h;
	char *data = NULL;
	char *site, *ticket;
	int local, rv;

	rv = do_read(client[ci].fd, &h, sizeof(h));
	if (rv < 0) {
		log_error("connection %d read error %d", ci, rv);
		return;
	}
	if (h.magic != BOOTHC_MAGIC) {
		log_error("connection %d magic error %x", ci, h.magic);
		return;
	}
	if (h.version != BOOTHC_VERSION) {
		log_error("connection %d version error %x", ci, h.version);
		return;
	}

	if (h.len) {
		data = malloc(h.len);
		if (!data) {
			log_error("process_connection no mem %u", h.len);
			return;
		}
		memset(data, 0, h.len);
		rv = do_read(client[ci].fd, data, h.len);
		if (rv < 0) {
			log_error("connection %d read data error %d", ci, rv);
			goto out;
		}
	}

	switch (h.cmd) {
	case BOOTHC_CMD_LIST:
		break;
	case BOOTHC_CMD_GRANT:
	case BOOTHC_CMD_REVOKE:
		/*
		 * The payload comes from the peer.  Refuse it unless both
		 * name fields are present (this also covers data == NULL),
		 * and force NUL termination so check_site()/check_ticket()
		 * cannot read past the buffer.
		 */
		if (h.len < 2 * BOOTH_NAME_LEN) {
			h.result = BOOTHC_RLT_INVALID_ARG;
			goto reply;
		}
		site = data;
		ticket = data + BOOTH_NAME_LEN;
		site[BOOTH_NAME_LEN - 1] = '\0';
		ticket[BOOTH_NAME_LEN - 1] = '\0';

		if (!check_ticket(ticket) || !check_site(site, &local)) {
			h.result = BOOTHC_RLT_INVALID_ARG;
			goto reply;
		}

		if (!local)
			h.result = BOOTHC_RLT_REMOTE_OP;
		else if (h.cmd == BOOTHC_CMD_GRANT)
			h.result = grant_ticket(ticket, h.option, h.expiry);
		else
			h.result = revoke_ticket(ticket, h.option);
		break;
	default:
		log_error("connection %d cmd %x unknown", ci, h.cmd);
		break;
	}

reply:
	rv = do_write(client[ci].fd, &h, sizeof(h));
	if (rv < 0)
		log_error("connection %d write error %d", ci, rv);
out:
	free(data);
}
static void process_listener(int ci)
{
int fd, i;
fd = accept(client[ci].fd, NULL, NULL);
if (fd < 0) {
log_error("process_listener: accept error %d %d", fd, errno);
return;
}
i = client_add(fd, process_connection, NULL);
log_debug("client connection %d fd %d", i, fd);
}
/*
 * Load BOOTH_DEFAULT_CONF and validate it for the given role.
 * Returns the first negative result, otherwise check_config()'s result.
 */
static int setup_config(int type)
{
	int rv = read_config(BOOTH_DEFAULT_CONF);

	if (rv >= 0)
		rv = check_config(type);
	return rv;
}
/*
 * Initialise the two transports.  The configured one delivers incoming
 * messages to ticket_recv(); the TCP one is initialised with no receive
 * callback.  Stops at the first negative result.
 */
static int setup_transport(void)
{
	int rv = booth_transport[booth_conf->proto].init(ticket_recv);

	if (rv < 0)
		return rv;
	return booth_transport[TCP].init(NULL);
}
/* Initialise the timer list; returns timerlist_init()'s result. */
static int setup_timer(void)
{
	int rv;

	rv = timerlist_init();
	return rv;
}
/*
 * Daemon main loop, shared by the arbitrator and site roles.
 *
 * Set-up brings up the configuration, timers, transports, tickets and the
 * local control socket.  The loop then polls every registered client slot
 * indefinitely:
 *   - POLLIN runs the slot's workfn;
 *   - POLLERR/POLLHUP/POLLNVAL runs its deadfn;
 *   - process_timerlist() runs after each completed poll() pass.
 *
 * Returns -1 only if set-up or poll() fails; otherwise it never returns.
 */
static int loop(int type)
{
	void (*on_ready)(int ci);
	void (*on_dead)(int ci);
	int listen_fd, nready, slot;

	if (setup_config(type) < 0 ||
	    setup_timer() < 0 ||
	    setup_transport() < 0 ||
	    setup_ticket() < 0)
		return -1;

	listen_fd = setup_listener(BOOTHC_SOCK_PATH);
	if (listen_fd < 0)
		return -1;
	client_add(listen_fd, process_listener, NULL);

	for (;;) {
		nready = poll(pollfd, client_maxi + 1, poll_timeout);
		if (nready < 0) {
			if (errno == EINTR)
				continue;
			log_error("poll errno %d", errno);
			return -1;
		}

		for (slot = 0; slot <= client_maxi; slot++) {
			if (client[slot].fd < 0)
				continue;

			if (pollfd[slot].revents & POLLIN) {
				on_ready = client[slot].workfn;
				if (on_ready)
					on_ready(slot);
			}

			if (pollfd[slot].revents &
			    (POLLERR | POLLHUP | POLLNVAL)) {
				on_dead = client[slot].deadfn;
				if (on_dead)
					on_dead(slot);
			}
		}

		process_timerlist();
	}
}
static int do_list(void)
{
struct boothc_header h, *rh;
char *reply = NULL, *data;
int data_len;
int fd, rv;
init_header(&h, BOOTHC_CMD_LIST, 0, 0, 0, 0);
fd = do_connect(BOOTHC_SOCK_PATH);
if (fd < 0) {
rv = fd;
goto out;
}
rv = do_write(fd, &h, sizeof(h));
if (rv < 0)
goto out_close;
reply = malloc(sizeof(struct boothc_header));
if (!reply) {
rv = -ENOMEM;
goto out_close;
}
rv = do_read(fd, reply, sizeof(struct boothc_header));
if (rv < 0)
goto out_free;
rh = (struct boothc_header *)reply;
data_len = rh->len;
reply = realloc(reply, sizeof(struct boothc_header) + data_len);
if (!reply) {
rv = -ENOMEM;
goto out_free;
}
data = reply + sizeof(struct boothc_header);
rv = do_read(fd, data, data_len);
if (rv < 0)
goto out_free;
do_write(STDOUT_FILENO, data, data_len);
rv = 0;
out_free:
free(reply);
out_close:
close(fd);
out:
return rv;
}
/*
 * Client side of "booth client grant": ask the local daemon to grant
 * cl.ticket to cl.site.
 *
 * The request payload is cl.site followed by cl.ticket, sizeof each.
 * -f adds BOOTHC_OPT_FORCE, and -e is sent as the header expiry.  If the
 * daemon answers BOOTHC_RLT_REMOTE_OP (the site is not local), the same
 * request is forwarded over the TCP transport to cl.site, and that reply
 * is used instead.
 *
 * Returns 0 when a result was obtained (including "grant failed!").
 * Returns a negative value on invalid arguments, connection or I/O errors,
 * or an unexpected result.
 */
static int do_grant(void)
{
	char *buf;
	struct boothc_header *h, reply;
	int buflen;
	uint32_t force = 0;
	int fd, rv;

	buflen = sizeof(struct boothc_header) +
		 sizeof(cl.site) + sizeof(cl.ticket);
	buf = malloc(buflen);
	if (!buf) {
		rv = -ENOMEM;
		goto out;
	}

	h = (struct boothc_header *)buf;
	if (cl.force)
		force = BOOTHC_OPT_FORCE;
	init_header(h, BOOTHC_CMD_GRANT, force, cl.expiry, 0,
		    sizeof(cl.site) + sizeof(cl.ticket));
	/*
	 * Copy the whole fields rather than strcpy().  cl is zeroed in
	 * main(), so the bytes after each name are NUL, and no
	 * uninitialised heap bytes from malloc() go on the wire.
	 */
	memcpy(buf + sizeof(struct boothc_header), cl.site, sizeof(cl.site));
	memcpy(buf + sizeof(struct boothc_header) + sizeof(cl.site),
	       cl.ticket, sizeof(cl.ticket));

	fd = do_connect(BOOTHC_SOCK_PATH);
	if (fd < 0) {
		rv = fd;
		goto out_free;
	}

	rv = do_write(fd, buf, buflen);
	if (rv < 0)
		goto out_close;

	rv = do_read(fd, &reply, sizeof(struct boothc_header));
	if (rv < 0)
		goto out_close;

	if (reply.result == BOOTHC_RLT_REMOTE_OP) {
		struct booth_node to;
		int s;

		memset(&to, 0, sizeof(struct booth_node));
		to.family = BOOTH_PROTO_FAMILY;
		strcpy(to.addr, cl.site);

		s = booth_transport[TCP].open(&to);
		if (s < 0) {
			/* rv is still 0 from do_read(): report failure. */
			log_error("can't connect to site %s", cl.site);
			rv = -1;
			goto out_close;
		}

		rv = booth_transport[TCP].send(s, buf, buflen);
		if (rv >= 0)
			rv = booth_transport[TCP].recv(s, &reply,
					sizeof(struct boothc_header));
		booth_transport[TCP].close(s);
		if (rv < 0)
			goto out_close;
	}

	/* Checked after forwarding so a remote INVALID_ARG is reported too. */
	if (reply.result == BOOTHC_RLT_INVALID_ARG) {
		log_info("invalid argument!");
		rv = -1;
	} else if (reply.result == BOOTHC_RLT_ASYNC) {
		log_info("grant command sent, but result is async.");
		rv = 0;
	} else if (reply.result == BOOTHC_RLT_SYNC_SUCC) {
		log_info("grant succeeded!");
		rv = 0;
	} else if (reply.result == BOOTHC_RLT_SYNC_FAIL) {
		/* NOTE(review): exits 0 although the grant failed — confirm intent. */
		log_info("grant failed!");
		rv = 0;
	} else {
		log_error("internal error!");
		rv = -1;
	}

out_close:
	close(fd);
out_free:
	free(buf);
out:
	return rv;
}
/*
 * Client side of "booth client revoke": ask the local daemon to revoke
 * cl.ticket from cl.site.
 *
 * The request payload is cl.site followed by cl.ticket (sizeof each), the
 * same layout as do_grant().  If the daemon answers BOOTHC_RLT_REMOTE_OP,
 * the same request is forwarded over the TCP transport to cl.site, and
 * that reply is used instead.
 *
 * Returns 0 when a result was obtained (including "revoke failed!").
 * Returns a negative value on invalid arguments, I/O errors or an
 * unexpected result.
 *
 * NOTE(review): if booth_transport[TCP].open() fails, rv still holds the
 * 0 from the preceding do_read(), so the command reports success.  It
 * should probably set rv = -1 there.
 * NOTE(review): BOOTHC_OPT_FORCE is sent when cl.force is set, but
 * read_arguments() accepts -f only for grant, so cl.force is 0 here.
 * NOTE(review): the INVALID_ARG check runs before forwarding, so an
 * INVALID_ARG reply from the remote site ends up as "internal error!".
 * NOTE(review): strcpy() into the malloc()ed buffer leaves the bytes after
 * each name uninitialised, and they are sent on the wire.
 */
static int do_revoke(void)
{
char *buf;
struct boothc_header *h, reply;
int buflen;
+ uint32_t force = 0;
int fd, rv;
- buflen = sizeof(struct boothc_header) + sizeof(cl.ticket);
+ buflen = sizeof(struct boothc_header) +
+ sizeof(cl.site) + sizeof(cl.ticket);
buf = malloc(buflen);
if (!buf) {
rv = -ENOMEM;
goto out;
}
h = (struct boothc_header *)buf;
- init_header(h, BOOTHC_CMD_REVOKE, 0, 0, 0, sizeof(cl.ticket));
- strcpy(buf + sizeof(struct boothc_header), cl.ticket);
+ if (cl.force)
+ force = BOOTHC_OPT_FORCE;
+ init_header(h, BOOTHC_CMD_REVOKE, force, 0, 0,
+ sizeof(cl.site) + sizeof(cl.ticket));
+ strcpy(buf + sizeof(struct boothc_header), cl.site);
+ strcpy(buf + sizeof(struct boothc_header) + sizeof(cl.site), cl.ticket);
+
fd = do_connect(BOOTHC_SOCK_PATH);
if (fd < 0) {
rv = fd;
goto out_free;
}
rv = do_write(fd, buf, buflen);
if (rv < 0)
goto out_close;
rv = do_read(fd, &reply, sizeof(struct boothc_header));
if (rv < 0)
goto out_close;
+
+ if (reply.result == BOOTHC_RLT_INVALID_ARG) {
+ log_info("invalid argument!");
+ rv = -1;
+ goto out_close;
+ }
+
+ if (reply.result == BOOTHC_RLT_REMOTE_OP) {
+ struct booth_node to;
+ int s;
+
+ memset(&to, 0, sizeof(struct booth_node));
+ to.family = BOOTH_PROTO_FAMILY;
+ strcpy(to.addr, cl.site);
+
+ s = booth_transport[TCP].open(&to);
+ if (s < 0)
+ goto out_close;
+
+ rv = booth_transport[TCP].send(s, buf, buflen);
+ if (rv < 0) {
+ booth_transport[TCP].close(s);
+ goto out_close;
+ }
+ rv = booth_transport[TCP].recv(s, &reply,
+ sizeof(struct boothc_header));
+ if (rv < 0) {
+ booth_transport[TCP].close(s);
+ goto out_close;
+ }
+ booth_transport[TCP].close(s);
+ }
+
if (reply.result == BOOTHC_RLT_ASYNC) {
log_info("revoke command sent, but result is async.");
rv = 0;
} else if (reply.result == BOOTHC_RLT_SYNC_SUCC) {
log_info("revoke succeeded!");
rv = 0;
} else if (reply.result == BOOTHC_RLT_SYNC_FAIL) {
log_info("revoke failed!");
rv = 0;
} else {
log_error("internal error!");
rv = -1;
}
out_close:
close(fd);
out_free:
free(buf);
out:
return rv;
}
/*
 * Create BOOTH_RUN_DIR/BOOTH_LOCKFILE_NAME, take a whole-file write lock on
 * it (non-blocking F_SETLK), and replace its contents with our pid.
 *
 * Returns the open descriptor, which must stay open to keep the lock, or
 * -1 on failure (logged).
 */
static int lockfile(void)
{
	char path[PATH_MAX];
	char pidbuf[16] = { 0 };
	struct flock lock = {
		.l_type = F_WRLCK,
		.l_whence = SEEK_SET,
		.l_start = 0,
		.l_len = 0,		/* 0 = through end of file */
	};
	int fd;

	snprintf(path, PATH_MAX, "%s/%s", BOOTH_RUN_DIR, BOOTH_LOCKFILE_NAME);

	fd = open(path, O_CREAT|O_WRONLY, 0666);
	if (fd < 0) {
		log_error("lockfile open error %s: %s",
			  path, strerror(errno));
		return -1;
	}

	if (fcntl(fd, F_SETLK, &lock) < 0) {
		log_error("lockfile setlk error %s: %s",
			  path, strerror(errno));
		goto fail;
	}

	if (ftruncate(fd, 0) < 0) {
		log_error("lockfile truncate error %s: %s",
			  path, strerror(errno));
		goto fail;
	}

	snprintf(pidbuf, sizeof(pidbuf), "%d\n", getpid());
	if (write(fd, pidbuf, strlen(pidbuf)) <= 0) {
		log_error("lockfile write error %s: %s",
			  path, strerror(errno));
		goto fail;
	}

	return fd;

fail:
	close(fd);
	return -1;
}
/* Remove the lock file created by lockfile(), then close its descriptor. */
static void unlink_lockfile(int fd)
{
	char path[PATH_MAX];

	snprintf(path, sizeof(path), "%s/%s",
		 BOOTH_RUN_DIR, BOOTH_LOCKFILE_NAME);
	unlink(path);
	close(fd);
}
/*
 * Print command-line help to stdout.
 *
 * NOTE(review): the "default: 600sec" text is not tied to the code.  When
 * expiry is 0, grant_ticket() in ticket.c substitutes
 * DEFAULT_TICKET_EXPIRY; confirm the two agree.
 */
static void print_usage(void)
{
printf("Usage:\n");
printf("booth <type> <operation> [options]\n");
printf("\n");
printf("Types:\n");
printf(" arbitrator: daemon running on arbitrator\n");
printf(" site: daemon running on cluster site\n");
printf(" client: command running from client\n");
printf("\n");
printf("Operations:\n");
printf("Please note that operations are valid iff type is client!\n");
printf("list: List all the tickets\n");
printf("grant: Grant ticket T(-t T) to site S(-s S)\n");
- printf("revoke: Revoke ticket T(-t T) from local site\n");
+ printf("revoke: Revoke ticket T(-t T) from site S(-s S)\n");
printf("\n");
printf("Options:\n");
printf(" -D Enable debugging to stderr and don't fork\n");
printf(" -t ticket name\n");
printf(" -s site name\n");
printf(" -f ticket attribute: force, only valid when "
"granting\n");
printf(" -e ticket will failover after the expiry time if "
"not being renewed,\n\t\tset it while "
"granting. default: 600sec\n");
printf(" -h Print this help, then exit\n");
}
#define OPTION_STRING "Dt:s:fe:h"

/*
 * Copy the argument of option -opt into the fixed-size field dst
 * (dstlen bytes, e.g. cl.ticket or cl.site).  Exits with an error if the
 * argument plus its terminating NUL does not fit, instead of overflowing
 * the field.
 */
static void copy_name_arg(char *dst, size_t dstlen, const char *arg, int opt)
{
	size_t len = strlen(arg);

	if (len >= dstlen) {
		fprintf(stderr, "argument to -%c is too long (max %zu characters)\n",
			opt, dstlen - 1);
		exit(EXIT_FAILURE);
	}
	memcpy(dst, arg, len + 1);
}

/*
 * Parse the -e argument as a non-negative decimal number of seconds.
 * Exits with an error on malformed or out-of-range input, where atoi()
 * would silently yield 0 or a truncated value.
 */
static int parse_expiry_arg(const char *arg)
{
	char *end;
	long val;

	errno = 0;
	val = strtol(arg, &end, 10);
	if (end == arg || *end != '\0' || errno == ERANGE ||
	    val < 0 || val > INT_MAX) {
		fprintf(stderr, "invalid expiry \"%s\"\n", arg);
		exit(EXIT_FAILURE);
	}
	return (int)val;
}

/*
 * Parse argv into the global cl.
 *
 * argv[1] selects the role:
 *   - "arbitrator";
 *   - "site";
 *   - "client <op>";
 *   - a bare client operation ("booth grant ..." is short for
 *     "booth client grant ...").
 *
 * The remaining arguments are options from OPTION_STRING.  -t and -s are
 * accepted only for grant/revoke; -f and -e only for grant.
 *
 * Exits the process for help/version output and on any usage error;
 * otherwise returns 0.
 */
static int read_arguments(int argc, char **argv)
{
	int optchar;
	char *arg1 = argv[1];
	char *op = NULL;

	if (argc < 2 || !strcmp(arg1, "help") || !strcmp(arg1, "--help") ||
	    !strcmp(arg1, "-h")) {
		print_usage();
		exit(EXIT_SUCCESS);
	}

	if (!strcmp(arg1, "version") || !strcmp(arg1, "--version") ||
	    !strcmp(arg1, "-V")) {
		printf("%s %s (built %s %s)\n",
		       argv[0], RELEASE_VERSION, __DATE__, __TIME__);
		exit(EXIT_SUCCESS);
	}

	if (!strcmp(arg1, "arbitrator")) {
		cl.type = ACT_ARBITRATOR;
		optind = 2;
	} else if (!strcmp(arg1, "site")) {
		cl.type = ACT_SITE;
		optind = 2;
	} else if (!strcmp(arg1, "client")) {
		cl.type = ACT_CLIENT;
		if (argc < 3) {
			print_usage();
			exit(EXIT_FAILURE);
		}
		op = argv[2];
		optind = 3;
	} else {
		cl.type = ACT_CLIENT;
		op = argv[1];
		optind = 2;
	}

	switch (cl.type) {
	case ACT_ARBITRATOR:
		break;
	case ACT_SITE:
		break;
	case ACT_CLIENT:
		if (!strcmp(op, "list"))
			cl.op = OP_LIST;
		else if (!strcmp(op, "grant"))
			cl.op = OP_GRANT;
		else if (!strcmp(op, "revoke"))
			cl.op = OP_REVOKE;
		else {
			fprintf(stderr, "client operation \"%s\" is unknown\n",
				op);
			exit(EXIT_FAILURE);
		}
		break;
	}

	while (optind < argc) {
		optchar = getopt(argc, argv, OPTION_STRING);

		switch (optchar) {
		case 'D':
			cl.debug = 1;
			log_logfile_priority = LOG_DEBUG;
			log_syslog_priority = LOG_DEBUG;
			break;

		case 't':
			if (cl.op != OP_GRANT && cl.op != OP_REVOKE) {
				print_usage();
				exit(EXIT_FAILURE);
			}
			copy_name_arg(cl.ticket, sizeof(cl.ticket), optarg, optchar);
			break;

		case 's':
			if (cl.op != OP_GRANT && cl.op != OP_REVOKE) {
				print_usage();
				exit(EXIT_FAILURE);
			}
			copy_name_arg(cl.site, sizeof(cl.site), optarg, optchar);
			break;

		case 'f':
			if (cl.op != OP_GRANT) {
				print_usage();
				exit(EXIT_FAILURE);
			}
			cl.force = 1;
			break;

		case 'e':
			if (cl.op != OP_GRANT) {
				print_usage();
				exit(EXIT_FAILURE);
			}
			cl.expiry = parse_expiry_arg(optarg);
			break;

		case 'h':
			print_usage();
			exit(EXIT_SUCCESS);
			break;

		case ':':
		case '?':
			fprintf(stderr, "Please use '-h' for usage.\n");
			exit(EXIT_FAILURE);
			break;

		case -1:
			/*
			 * getopt() stops at "--" or at the first non-option
			 * argument.  Anything left over is not understood.
			 */
			if (optind < argc) {
				fprintf(stderr, "unexpected argument: %s\n",
					argv[optind]);
				exit(EXIT_FAILURE);
			}
			break;

		default:
			fprintf(stderr, "unknown option: %c\n", optchar);
			exit(EXIT_FAILURE);
			break;
		}
	}

	return 0;
}
/*
 * Best-effort real-time set-up.
 *
 * Lifts RLIMIT_MEMLOCK, locks current and future pages with mlockall(),
 * and switches this process to SCHED_RR at the maximum priority.
 * Failures are logged, not fatal.
 */
static void set_scheduler(void)
{
	struct rlimit unlimited = {
		.rlim_cur = RLIM_INFINITY,
		.rlim_max = RLIM_INFINITY,
	};
	struct sched_param param;
	int prio;

	/* Raise the limit first so mlockall() is not capped by RLIMIT_MEMLOCK. */
	setrlimit(RLIMIT_MEMLOCK, &unlimited);
	if (mlockall(MCL_CURRENT | MCL_FUTURE) < 0)
		log_error("mlockall failed");

	prio = sched_get_priority_max(SCHED_RR);
	if (prio == -1) {
		log_error("could not get maximum scheduler priority err %d",
			  errno);
		return;
	}

	param.sched_priority = prio;
	if (sched_setscheduler(0, SCHED_RR, &param) == -1)
		log_error("could not set SCHED_RR priority %d err %d",
			  param.sched_priority, errno);
}
/*
 * Convert a legacy oom_adj value (-17..15) to the equivalent
 * oom_score_adj value (-1000..1000).  This mirrors the scaling the kernel
 * applies when /proc/<pid>/oom_adj is written: 15 maps to 1000, and other
 * values are scaled by 1000/17.  Out-of-range inputs are clamped.
 */
static int oom_adj_to_score_adj(int adj)
{
	if (adj >= 15)
		return 1000;
	if (adj <= -17)
		return -1000;
	return adj * 1000 / 17;
}

/*
 * Adjust this process's OOM-killer priority.
 *
 * @val: legacy oom_adj value; -17 disables OOM killing, 15 is the maximum.
 *
 * Writes the scaled value to /proc/self/oom_score_adj.  Falls back to the
 * deprecated /proc/self/oom_adj when the newer file is missing (older
 * kernels).  Best effort: failures are ignored.
 */
static void set_oom_adj(int val)
{
	FILE *fp;

	fp = fopen("/proc/self/oom_score_adj", "w");
	if (fp) {
		fprintf(fp, "%i", oom_adj_to_score_adj(val));
		fclose(fp);
		return;
	}

	fp = fopen("/proc/self/oom_adj", "w");
	if (!fp)
		return;
	fprintf(fp, "%i", val);
	fclose(fp);
}
/*
 * Shared start-up path for the arbitrator and site daemons.
 *
 * Daemonises unless -D was given, starts logging, takes the lock file,
 * raises scheduling priority and OOM protection, then runs loop(type).
 * Once acquired, the lock file is removed on every exit path, and logging
 * is closed whether or not the lock could be taken.
 *
 * @type:   role constant passed to loop() (ARBITRATOR or SITE).
 * @banner: message logged once the lock file is held.
 *
 * Returns 0 if loop() succeeded, a negative value otherwise.
 */
static int run_daemon(int type, const char *banner)
{
	int fd, rv;

	if (!cl.debug) {
		if (daemon(0, 0) < 0) {
			perror("daemon error");
			exit(EXIT_FAILURE);
		}
	}

	setup_logging();

	fd = lockfile();
	if (fd < 0) {
		close_logging();
		return fd;
	}

	log_info("%s", banner);
	set_scheduler();
	set_oom_adj(-16);

	rv = loop(type);

	unlink_lockfile(fd);
	close_logging();
	return rv < 0 ? -1 : 0;
}

/* Run as the arbitrator daemon. */
static int do_arbitrator(void)
{
	return run_daemon(ARBITRATOR, "BOOTH arbitrator daemon started");
}

/* Run as a cluster-site daemon. */
static int do_site(void)
{
	return run_daemon(SITE, "BOOTH cluster site daemon started");
}
/*
 * Run the client operation chosen on the command line, with logging
 * active for its duration.  Returns the operation's result, or -1 if no
 * operation was selected.
 */
static int do_client(void)
{
	int rv;

	setup_logging();

	if (cl.op == OP_LIST)
		rv = do_list();
	else if (cl.op == OP_GRANT)
		rv = do_grant();
	else if (cl.op == OP_REVOKE)
		rv = do_revoke();
	else
		rv = -1;

	close_logging();
	return rv;
}
/*
 * Entry point: parse the command line, then run the selected role.
 * Exit status is EXIT_SUCCESS only if everything returned 0.
 */
int main(int argc, char *argv[])
{
	int rv;

	memset(&cl, 0, sizeof(cl));

	rv = read_arguments(argc, argv);
	if (rv < 0)
		return EXIT_FAILURE;

	if (cl.type == ACT_ARBITRATOR)
		rv = do_arbitrator();
	else if (cl.type == ACT_SITE)
		rv = do_site();
	else if (cl.type == ACT_CLIENT)
		rv = do_client();

	return rv ? EXIT_FAILURE : EXIT_SUCCESS;
}
diff --git a/src/paxos_lease.c b/src/paxos_lease.c
index b1f309f..204040a 100644
--- a/src/paxos_lease.c
+++ b/src/paxos_lease.c
@@ -1,443 +1,454 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include "paxos.h"
#include "paxos_lease.h"
#include "transport.h"
#include "config.h"
#include "timer.h"
#include "list.h"
/* Name of the single paxos space shared by all leases (see paxos_lease_init()). */
#define PAXOS_LEASE_SPACE "paxoslease"
/* Value size handed to paxos_space_init(). */
#define PLEASE_VALUE_LEN 1024
/*
 * Per-message paxos header.  leased is set by handle_lease_request() when
 * the responder sees an owner, and is read by lease_prepared().
 */
struct paxos_lease_msghdr {
int leased;
};
/* Value agreed on by a paxos round: lease name, proposed owner id and expiry. */
struct paxos_lease_value {
char name[PAXOS_NAME_LEN+1];
int owner;
int expiry;
};
/* Proposer- or acceptor-side bookkeeping for one lease. */
struct lease_state {
int round; /* paxos round this state refers to */
struct paxos_lease_value *plv; /* last value proposed/accepted (heap) */
unsigned long long expires; /* current_time() + delay when the timer was armed */
struct timerlist *timer; /* fires lease_expires() */
};
/* One lease; leases are linked on lease_head. */
struct paxos_lease {
char name[PAXOS_NAME_LEN+1];
pi_handle_t pih; /* paxos instance */
struct lease_state proposer;
struct lease_state acceptor;
int owner; /* owner id, -1 when free */
int expiry;
int relet; /* renew from lease_expires() while we own it */
int failover; /* re-acquire when another owner's lease expires */
+ int release; /* set by paxos_lease_release(): stop renewing */
unsigned long long expires; /* filled by p_l_op->catchup() in lease_catchup() */
void (*end_lease) (pi_handle_t, int); /* completion callback from paxos_lease_acquire() */
struct timerlist *timer; /* NOTE(review): not referenced in this file's visible code */
struct list_head list;
};
/* All leases created by paxos_lease_init(). */
static LIST_HEAD(lease_head);
/* This node's id, from pl_op->get_myid() on first paxos_lease_init(). */
static int myid = -1;
/* Paxos callback table shared by every lease in the space. */
static struct paxos_operations *px_op = NULL;
/* Caller-supplied lease operations (notify, catchup, ...). */
const struct paxos_lease_operations *p_l_op;
/* The PAXOS_LEASE_SPACE handle; 0 until the first paxos_lease_init(). */
ps_handle_t ps_handle = 0;
/*
 * paxos_round_request() completion callback.
 *
 * Forwards result to the lease's end_lease callback, but only for the
 * lease that owns instance handle, and only if round is still the
 * lease's current proposer round.
 */
static void end_paxos_request(pi_handle_t handle, int round, int result)
{
	struct paxos_lease *pl;

	list_for_each_entry(pl, &lease_head, list) {
		if (pl->pih != handle)
			continue;

		if (round == pl->proposer.round && pl->end_lease)
			pl->end_lease((pl_handle_t)pl, result);
		return;
	}
}
/*
 * Timer callback for both proposer and acceptor timers.  data is the
 * struct paxos_lease pointer passed to add_timer().
 *
 * Three cases:
 *   - Another node (or nobody) owns the lease: mark it free, notify
 *     p_l_op with owner -1 / expires 0, and delete both timers.  If
 *     failover is set, immediately try to acquire it (relet, no callback).
 *   - We own it, relet is set, and no release was requested: re-propose
 *     the same value in the current proposer round, renewing the lease.
 *   - Otherwise: drop the lease as in the first case, without failover.
 *
 * NOTE(review): in the else-if, "pl->owner == myid" is always true, since
 * the first branch took every other owner.
 * NOTE(review): the timers are deleted but their pointers are not reset.
 * lease_accepted() later calls mod_timer() on a non-NULL acceptor.timer.
 * Confirm timer.c tolerates this, and that deleting the timer that is
 * currently firing is safe.
 */
static void lease_expires(unsigned long data)
{
struct paxos_lease *pl = (struct paxos_lease *)data;
pl_handle_t plh = (pl_handle_t)pl;
struct paxos_lease_result plr;
if (pl->owner != myid) {
pl->owner = -1;
strcpy(plr.name, pl->name);
plr.owner = -1;
plr.expires = 0;
p_l_op->notify(plh, &plr);
if (pl->proposer.timer)
del_timer(pl->proposer.timer);
if (pl->acceptor.timer)
del_timer(pl->acceptor.timer);
if (pl->failover)
paxos_lease_acquire(plh, 1, NULL);
- } else if (pl->owner == myid && pl->relet) {
+ } else if (pl->owner == myid && pl->relet && !pl->release) {
struct paxos_lease_value value;
strncpy(value.name, pl->name, PAXOS_NAME_LEN + 1);
value.owner = myid;
value.expiry = pl->expiry;
paxos_propose(pl->pih, &value, pl->proposer.round);
} else {
pl->owner = -1;
strcpy(plr.name, pl->name);
plr.owner = -1;
plr.expires = 0;
p_l_op->notify(plh, &plr);
if (pl->proposer.timer)
del_timer(pl->proposer.timer);
if (pl->acceptor.timer)
del_timer(pl->acceptor.timer);
}
}
/*
 * Start a paxos round proposing this node (myid) as the lease owner.
 *
 * @relet:       if non-zero, the proposer timer fires at 4/5 of the expiry
 *               (see lease_propose()) and lease_expires() re-proposes to
 *               renew the lease.
 * @end_acquire: stored as end_lease and invoked via end_paxos_request()
 *               when the round completes; may be NULL.
 *
 * Also clears any pending release request.
 * Returns 0 if the round was started, -1 otherwise.
 *
 * NOTE(review): end_lease is declared as taking pi_handle_t, but
 * end_acquire takes pl_handle_t.  Confirm the two typedefs are identical.
 */
int paxos_lease_acquire(pl_handle_t handle,
int relet,
void (*end_acquire) (pl_handle_t handle, int result))
{
struct paxos_lease *pl = (struct paxos_lease *)handle;
struct paxos_lease_value value;
int round;
strncpy(value.name, pl->name, PAXOS_NAME_LEN + 1);
value.owner = myid;
value.expiry = pl->expiry;
pl->relet = relet;
pl->end_lease = end_acquire;
+ pl->release = 0;
round = paxos_round_request(pl->pih, &value, end_paxos_request);
if (round <= 0)
return -1;
pl->proposer.round = round;
return 0;
}
/*
 * Request release of a lease this node holds.
 *
 * Only sets pl->release.  That stops lease_expires() from renewing the
 * lease, so it is dropped (owner -1, p_l_op notified) the next time the
 * proposer timer fires.  A later paxos_lease_acquire() clears the flag.
 * Always returns 0.
 */
+int paxos_lease_release(pl_handle_t handle)
+{
+ struct paxos_lease *pl = (struct paxos_lease *)handle;
+
+ pl->release = 1;
+
+ return 0;
+}
+
/*
 * Paxos catchup callback.  Finds the lease called name and lets
 * p_l_op->catchup() fill in its owner and expires.
 * Returns 0, or -1 if no such lease exists.
 */
static int lease_catchup(const void *name)
{
	struct paxos_lease *pl;

	list_for_each_entry(pl, &lease_head, list) {
		if (strcmp(pl->name, name) == 0) {
			p_l_op->catchup(name, &pl->owner, &pl->expires);
			return 0;
		}
	}
	return -1;
}
/*
 * Paxos prepare callback.  Returns 1 when the header reports the lease as
 * not leased, and 0 when it is leased.
 */
static int lease_prepared(pi_handle_t handle __attribute__((unused)),
			  void *header)
{
	const struct paxos_lease_msghdr *hdr = header;

	return !hdr->leased;
}
/*
 * Paxos promise callback.  Records in the outgoing header whether this
 * node currently sees the lease behind handle as owned (owner != -1).
 * Returns 0, or -1 if handle is unknown.
 */
static int handle_lease_request(pi_handle_t handle, void *header)
{
	struct paxos_lease_msghdr *hdr = header;
	struct paxos_lease *pl;

	list_for_each_entry(pl, &lease_head, list) {
		if (pl->pih == handle) {
			hdr->leased = (pl->owner != -1);
			return 0;
		}
	}
	return -1;
}
/*
 * Paxos propose callback (proposer side).
 *
 * For the lease behind handle, and only in its current proposer round:
 *   - keeps a copy of value;
 *   - arms the proposer timer that runs lease_expires().
 * The timer is set to 4/5 of the expiry when the lease is to be relet (so
 * lease_expires() can re-propose in time), otherwise to the full expiry.
 *
 * Returns 0; -1 for an unknown handle or stale round; -ENOMEM if the value
 * copy cannot be allocated.
 */
static int lease_propose(pi_handle_t handle,
			 void *extra __attribute__((unused)),
			 int round, void *value)
{
	struct paxos_lease *pl, *match = NULL;
	int delay;

	list_for_each_entry(pl, &lease_head, list) {
		if (pl->pih == handle) {
			match = pl;
			break;
		}
	}
	if (!match || round != match->proposer.round)
		return -1;

	if (!match->proposer.plv) {
		match->proposer.plv = malloc(sizeof(struct paxos_lease_value));
		if (!match->proposer.plv)
			return -ENOMEM;
	}
	memcpy(match->proposer.plv, value, sizeof(struct paxos_lease_value));

	delay = match->relet ? 4 * match->expiry / 5 : match->expiry;
	match->proposer.timer = add_timer(delay, (unsigned long)match,
					  lease_expires);
	match->proposer.expires = current_time() + delay;
	return 0;
}
/*
 * Paxos accepted callback (acceptor side).
 *
 * For the lease behind handle, records round, keeps a copy of value, and
 * (re)arms the acceptor timer for the full expiry, so lease_expires()
 * runs unless the lease is refreshed first.
 *
 * Returns 0; -1 for an unknown handle; -ENOMEM if the value copy cannot
 * be allocated (the round has already been recorded by then).
 */
static int lease_accepted(pi_handle_t handle,
			  void *extra __attribute__((unused)),
			  int round, void *value)
{
	struct paxos_lease *pl, *match = NULL;

	list_for_each_entry(pl, &lease_head, list) {
		if (pl->pih == handle) {
			match = pl;
			break;
		}
	}
	if (!match)
		return -1;

	match->acceptor.round = round;

	if (!match->acceptor.plv) {
		match->acceptor.plv = malloc(sizeof(struct paxos_lease_value));
		if (!match->acceptor.plv)
			return -ENOMEM;
	}
	memcpy(match->acceptor.plv, value, sizeof(struct paxos_lease_value));

	if (match->acceptor.timer)
		mod_timer(match->acceptor.timer, match->expiry);
	else
		match->acceptor.timer = add_timer(match->expiry,
						  (unsigned long)match,
						  lease_expires);
	match->acceptor.expires = current_time() + match->expiry;
	return 0;
}
/*
 * Paxos commit callback (proposer side).
 *
 * For the lease behind handle, in its current proposer round, adopts the
 * proposed value's owner and expiry and reports them through
 * p_l_op->notify().  Returns 0, or -1 for an unknown handle or stale round.
 */
static int lease_commit(pi_handle_t handle,
			void *extra __attribute__((unused)),
			int round)
{
	struct paxos_lease *pl, *match = NULL;
	struct paxos_lease_value *v;
	struct paxos_lease_result plr;

	list_for_each_entry(pl, &lease_head, list) {
		if (pl->pih == handle) {
			match = pl;
			break;
		}
	}
	if (!match || round != match->proposer.round)
		return -1;

	v = match->proposer.plv;
	match->owner = v->owner;
	match->expiry = v->expiry;

	strcpy(plr.name, v->name);
	plr.owner = v->owner;
	plr.expires = current_time() + v->expiry;
	p_l_op->notify((pl_handle_t)match, &plr);
	return 0;
}
/*
 * Paxos learned callback (acceptor side).
 *
 * For the lease behind handle, in its current acceptor round, adopts the
 * accepted value's owner and expiry and reports them through
 * p_l_op->notify().  Returns 0, or -1 for an unknown handle or stale round.
 */
static int lease_learned(pi_handle_t handle,
			 void *extra __attribute__((unused)),
			 int round)
{
	struct paxos_lease *pl, *match = NULL;
	struct paxos_lease_value *v;
	struct paxos_lease_result plr;

	list_for_each_entry(pl, &lease_head, list) {
		if (pl->pih == handle) {
			match = pl;
			break;
		}
	}
	if (!match || round != match->acceptor.round)
		return -1;

	v = match->acceptor.plv;
	match->owner = v->owner;
	match->expiry = v->expiry;

	strcpy(plr.name, v->name);
	plr.owner = v->owner;
	plr.expires = current_time() + v->expiry;
	p_l_op->notify((pl_handle_t)match, &plr);
	return 0;
}
/*
 * Create the lease called name (namelen <= PAXOS_NAME_LEN) together with
 * its paxos instance.
 *
 * On the first call this also:
 *   - builds the shared paxos callback table (px_op) from pl_op and this
 *     file's lease_* callbacks;
 *   - creates the PAXOS_LEASE_SPACE paxos space with number, role and the
 *     lease header/value sizes.
 *
 * expiry and failover are stored on the lease; prio is passed to
 * paxos_instance_init().
 *
 * Returns the new lease handle, or a non-positive value on failure:
 * -EINVAL, -ENOMEM, or the failing paxos_space_init()/paxos_instance_init()
 * result.
 */
pl_handle_t paxos_lease_init(const void *name,
			     unsigned int namelen,
			     int expiry,
			     int number,
			     int failover,
			     unsigned char *role,
			     int *prio,
			     const struct paxos_lease_operations *pl_op)
{
	ps_handle_t psh;
	pi_handle_t pih;
	struct paxos_lease *lease;

	if (namelen > PAXOS_NAME_LEN)
		return -EINVAL;

	if (myid == -1)
		myid = pl_op->get_myid();

	if (!ps_handle) {
		px_op = malloc(sizeof(struct paxos_operations));
		if (!px_op)
			return -ENOMEM;
		memset(px_op, 0, sizeof(struct paxos_operations));

		px_op->get_myid = pl_op->get_myid;
		px_op->send = pl_op->send;
		px_op->broadcast = pl_op->broadcast;
		px_op->catchup = lease_catchup;
		px_op->prepare = lease_prepared;
		px_op->promise = handle_lease_request;
		px_op->propose = lease_propose;
		px_op->accepted = lease_accepted;
		px_op->commit = lease_commit;
		px_op->learned = lease_learned;
		p_l_op = pl_op;

		psh = paxos_space_init(PAXOS_LEASE_SPACE,
				       number,
				       sizeof(struct paxos_lease_msghdr),
				       PLEASE_VALUE_LEN,
				       role,
				       px_op);
		if (psh <= 0) {
			free(px_op);
			px_op = NULL;
			return psh;
		}
		ps_handle = psh;
	}

	lease = malloc(sizeof(struct paxos_lease));
	if (!lease)
		return -ENOMEM;
	memset(lease, 0, sizeof(struct paxos_lease));
	/*
	 * Copy at most PAXOS_NAME_LEN bytes.  name[PAXOS_NAME_LEN] keeps the
	 * '\0' from memset(), so the name is always terminated.
	 */
	strncpy(lease->name, name, PAXOS_NAME_LEN);
	lease->owner = -1;
	lease->expiry = expiry;
	lease->failover = failover;
	/*
	 * NOTE(review): kept ahead of paxos_instance_init() as before, in
	 * case that call relies on finding the lease (e.g. via
	 * lease_catchup()).  Confirm.
	 */
	list_add_tail(&lease->list, &lease_head);

	pih = paxos_instance_init(ps_handle, name, prio);
	if (pih <= 0) {
		/*
		 * The lease is already on lease_head.  Unlink it before
		 * freeing, or later list walks would touch freed memory.
		 */
		list_del(&lease->list);
		free(lease);
		return pih;
	}
	lease->pih = pih;

	return (pl_handle_t)lease;
}
/*
 * Hand a lease message received from the transport to the paxos engine.
 * Returns paxos_recvmsg()'s result.
 */
int paxos_lease_on_receive(void *msg, int msglen)
{
	int rv;

	rv = paxos_recvmsg(msg, msglen);
	return rv;
}
/*
 * Tear down a lease's paxos bookkeeping.  Frees the shared px_op table and
 * the proposer/acceptor value copies, and deletes their timers.
 *
 * Every released pointer is reset, so a second call (for example from
 * another lease's exit) does not free px_op or the values twice.
 *
 * NOTE(review): px_op is shared by all leases and was handed to
 * paxos_space_init().  Freeing it while other leases or the paxos space
 * are still in use leaves them with a dangling table.  The lease itself is
 * neither unlinked from lease_head nor freed.  Confirm the intended
 * lifetime before using this for anything but shutdown.
 *
 * Always returns 0.
 */
int paxos_lease_exit(pl_handle_t handle)
{
	struct paxos_lease *pl = (struct paxos_lease *)handle;

	free(px_op);
	px_op = NULL;

	free(pl->proposer.plv);
	pl->proposer.plv = NULL;
	if (pl->proposer.timer) {
		del_timer(pl->proposer.timer);
		pl->proposer.timer = NULL;
	}

	free(pl->acceptor.plv);
	pl->acceptor.plv = NULL;
	if (pl->acceptor.timer) {
		del_timer(pl->acceptor.timer);
		pl->acceptor.timer = NULL;
	}

	return 0;
}
diff --git a/src/ticket.c b/src/ticket.c
index fdb7ad6..1195f01 100644
--- a/src/ticket.c
+++ b/src/ticket.c
@@ -1,346 +1,375 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include "ticket.h"
#include "config.h"
#include "pacemaker.h"
#include "list.h"
#include "log.h"
#include "paxos_lease.h"
#include "paxos.h"
/* Magic stored (network byte order) in booth_msghdr.magic for paxos messages. */
#define PAXOS_MAGIC 0xDB12
/*
 * Wire header prepended by ticket_send()/ticket_broadcast() and checked by
 * ticket_recv().  magic and len are in network byte order; len covers the
 * header plus the payload.
 */
struct booth_msghdr {
uint16_t magic;
uint16_t checksum; /* left zero by the senders; not checked in ticket_recv() */
uint32_t len;
} __attribute__((packed));
/* Local state for one ticket; linked on ticket_list. */
struct ticket {
char id[BOOTH_NAME_LEN+1]; /* ticket name */
pl_handle_t handle; /* backing paxos lease */
int owner; /* owner node id, -1 when none */
int expiry;
unsigned long long expires;
struct list_head list;
};
static LIST_HEAD(ticket_list);
/* NOTE(review): not referenced in the visible part of this file. */
static unsigned char *role;
/*
 * Returns 1 if ticket names a ticket in the loaded configuration, and 0
 * otherwise (including when no configuration is loaded).
 */
int check_ticket(char *ticket)
{
	int idx;

	if (booth_conf == NULL)
		return 0;

	for (idx = 0; idx < booth_conf->ticket_count; idx++) {
		if (strcmp(booth_conf->ticket[idx].name, ticket) == 0)
			return 1;
	}
	return 0;
}
int check_site(char *site, int *local)
{
int i;
if (!booth_conf)
return 0;
for (i = 0; i < booth_conf->node_count; i++) {
if (booth_conf->node[i].type == SITE
&& !strcmp(booth_conf->node[i].addr, site)) {
*local = booth_conf->node[i].local;
return 1;
}
}
return 0;
}
/*
 * Return ticket i's per-node weight array.  Returns NULL if any of the
 * first node_count weights is 0.
 */
static int *ticket_priority(int i)
{
	int *weight = booth_conf->ticket[i].weight;
	int j;

	/* TODO: need more precise check */
	for (j = 0; j < booth_conf->node_count; j++) {
		if (!weight[j])
			return NULL;
	}
	return weight;
}
/* This node's id, as reported by the configured transport. */
static int ticket_get_myid(void)
{
	int id;

	id = booth_transport[booth_conf->proto].get_myid();
	return id;
}
/*
 * Lease-acquire completion callback for tickets.  Presumably passed to
 * paxos_lease_acquire() by code outside this view.
 *
 * On success (result == 0), records this node as owner of the ticket whose
 * lease handle is handle, and logs the grant.  Non-zero results are
 * ignored.
 */
static void end_acquire(pl_handle_t handle, int result)
{
	struct ticket *tk;

	if (result != 0)
		return;

	list_for_each_entry(tk, &ticket_list, list) {
		if (tk->handle == handle) {
			tk->owner = ticket_get_myid();
			log_info("ticket %s acquired", tk->id);
			log_info("ticket %s granted to local (id %d)", tk->id,
				 tk->owner);
			return;
		}
	}

	/*
	 * After a complete list walk, tk is the list-head container, not a
	 * real ticket, so it must not be dereferenced.  The handle is cast
	 * to match the format, since pl_handle_t need not be an int.
	 */
	log_error("BUG: ticket handle %ld does not exist", (long)handle);
}
/*
 * Send a paxos payload to the node whose nodeid is id.
 *
 * The payload is wrapped in a booth_msghdr (PAXOS_MAGIC, total length)
 * and sent through the configured transport.  Returns the transport's
 * result; -1 if no node has that id; -ENOMEM if the buffer cannot be
 * allocated.
 */
static int ticket_send(unsigned long id, void *value, int len)
{
	struct booth_node *to = NULL;
	size_t total = sizeof(struct booth_msghdr) + len;
	struct booth_msghdr *hdr;
	char *buf;
	int i, rv;

	/* Scan from the end: if ids repeat, the last matching node wins. */
	for (i = booth_conf->node_count - 1; i >= 0; i--) {
		if (booth_conf->node[i].nodeid == id) {
			to = &booth_conf->node[i];
			break;
		}
	}
	if (to == NULL)
		return -1;

	buf = calloc(1, total);
	if (buf == NULL)
		return -ENOMEM;

	hdr = (struct booth_msghdr *)buf;
	hdr->magic = htons(PAXOS_MAGIC);
	hdr->len = htonl(total);
	memcpy(buf + sizeof(struct booth_msghdr), value, len);

	rv = booth_transport[booth_conf->proto].send(
		(unsigned long)to, buf, total);
	free(buf);
	return rv;
}
/*
 * Broadcast a paxos payload to all nodes via the configured transport.
 *
 * The payload is wrapped in a booth_msghdr (PAXOS_MAGIC, total length).
 * Returns the transport's result, or -ENOMEM if the buffer cannot be
 * allocated.
 */
static int ticket_broadcast(void *value, int len)
{
	size_t total = sizeof(struct booth_msghdr) + len;
	struct booth_msghdr *hdr;
	char *buf;
	int rv;

	buf = calloc(1, total);
	if (buf == NULL)
		return -ENOMEM;

	hdr = (struct booth_msghdr *)buf;
	hdr->magic = htons(PAXOS_MAGIC);
	hdr->len = htonl(total);
	memcpy(buf + sizeof(struct booth_msghdr), value, len);

	rv = booth_transport[booth_conf->proto].broadcast(buf, total);
	free(buf);
	return rv;
}
/*
 * paxos "catchup" callback (registered in ticket_operations).
 *
 * Refreshes the ticket whose id equals @name through
 * pcmk_handler.load_ticket(), then copies its owner and expiry time into
 * *@owner and *@expires.
 *
 * Returns 0 on success, or -1 (after logging) if no ticket has that id.
 */
static int ticket_read(const void *name, int *owner,
		       unsigned long long *expires)
{
	struct ticket *tk, *hit = NULL;

	list_for_each_entry(tk, &ticket_list, list) {
		if (strcmp(tk->id, name) == 0) {
			hit = tk;
			break;
		}
	}
	if (hit == NULL) {
		log_error("BUG: ticket_read failed (ticket %s does not exist)",
			  (char *)name);
		return -1;
	}

	pcmk_handler.load_ticket(hit->id, &hit->owner, &hit->expires);
	*owner = hit->owner;
	*expires = hit->expires;
	return 0;
}
/*
 * paxos "notify" callback (registered in ticket_operations).
 *
 * Copies the owner and expiry time from @result into the ticket that owns
 * @handle, and always stores them via pcmk_handler.store_ticket().  If the
 * new owner is this node, the ticket is then granted.  If the owner is -1,
 * it is revoked instead.
 *
 * Returns 0, or -1 (after logging) if no ticket has @handle.
 */
static int ticket_write(pl_handle_t handle, struct paxos_lease_result *result)
{
	struct ticket *tk;
	int found = 0;
	int myid;

	list_for_each_entry(tk, &ticket_list, list) {
		if (tk->handle == handle) {
			found = 1;
			break;
		}
	}
	if (!found) {
		/* pl_handle_t is a lease handle; print it via an explicit long */
		log_error("BUG: ticket_write failed "
			  "(ticket handle %ld does not exist)", (long)handle);
		return -1;
	}

	tk->owner = result->owner;
	tk->expires = result->expires;

	myid = ticket_get_myid();
	pcmk_handler.store_ticket(tk->id, tk->owner, tk->expires);
	if (tk->owner == myid)
		pcmk_handler.grant_ticket(tk->id);
	else if (tk->owner == -1)
		pcmk_handler.revoke_ticket(tk->id);

	return 0;
}
/*
 * Validate a received booth message and hand its payload to the paxos
 * lease layer.
 *
 * @msg:    received buffer, expected to start with a struct booth_msghdr
 * @msglen: number of valid bytes in @msg
 *
 * The message is rejected (logged, -1 returned) if it is too short to
 * contain a header, if its magic is not PAXOS_MAGIC, or if the header's
 * length field differs from @msglen.  Otherwise the function returns
 * paxos_lease_on_receive() applied to the bytes after the header.
 */
int ticket_recv(void *msg, int msglen)
{
	struct booth_msghdr *hdr = msg;
	char *data;

	/* Check the size before reading hdr; a short message must not be over-read. */
	if (!msg || msglen < 0 ||
	    (size_t)msglen < sizeof(struct booth_msghdr)) {
		log_error("message received error");
		return -1;
	}
	if (ntohs(hdr->magic) != PAXOS_MAGIC ||
	    ntohl(hdr->len) != (unsigned int)msglen) {
		log_error("message received error");
		return -1;
	}

	data = (char *)msg + sizeof(struct booth_msghdr);
	return paxos_lease_on_receive(data,
				      msglen - sizeof(struct booth_msghdr));
}
/*
 * Grant @ticket to this node.
 *
 * With @force, the ticket is stored in Pacemaker as owned by this node
 * (expiry argument -1) and granted at once.  The ticket list and paxos are
 * not consulted, and BOOTHC_RLT_SYNC_SUCC is returned.
 *
 * Without @force:
 *   - BOOTHC_RLT_SYNC_FAIL if no ticket has that id;
 *   - BOOTHC_RLT_SYNC_SUCC if this node already owns it;
 *   - otherwise paxos_lease_acquire() is started with end_acquire() as its
 *     completion callback, and BOOTHC_RLT_ASYNC is returned.
 *
 * NOTE(review): @expiry is defaulted to DEFAULT_TICKET_EXPIRY but never
 * used afterwards; confirm whether it was meant to reach the lease.
 */
int grant_ticket(char *ticket, int force, int expiry)
{
	struct ticket *tk, *hit = NULL;

	if (force) {
		pcmk_handler.store_ticket(ticket, ticket_get_myid(), -1);
		pcmk_handler.grant_ticket(ticket);
		return BOOTHC_RLT_SYNC_SUCC;
	}

	expiry = expiry ? expiry : DEFAULT_TICKET_EXPIRY;

	list_for_each_entry(tk, &ticket_list, list) {
		if (strcmp(tk->id, ticket) == 0) {
			hit = tk;
			break;
		}
	}
	if (hit == NULL) {
		log_error("ticket %s does not exist", ticket);
		return BOOTHC_RLT_SYNC_FAIL;
	}

	if (hit->owner == ticket_get_myid())
		return BOOTHC_RLT_SYNC_SUCC;

	paxos_lease_acquire(hit->handle, 1, end_acquire);
	return BOOTHC_RLT_ASYNC;
}
+/*
+ * Revoke @ticket from this node.
+ *
+ * Returns BOOTHC_RLT_SYNC_FAIL if no ticket has that id.  With @force, the
+ * ticket is first stored in Pacemaker with owner -1 and revoked locally.
+ * This is done in addition to, not instead of, the owner check below, and
+ * it does not change the cached tk->owner.  If the cached owner is -1,
+ * BOOTHC_RLT_SYNC_SUCC is returned.  Otherwise paxos_lease_release() is
+ * started and BOOTHC_RLT_ASYNC is returned.
+ */
+int revoke_ticket(char *ticket, int force)
+{
+	struct ticket *tk, *hit = NULL;
+
+	list_for_each_entry(tk, &ticket_list, list) {
+		if (strcmp(tk->id, ticket) == 0) {
+			hit = tk;
+			break;
+		}
+	}
+	if (hit == NULL) {
+		log_error("ticket %s does not exist", ticket);
+		return BOOTHC_RLT_SYNC_FAIL;
+	}
+
+	if (force) {
+		pcmk_handler.store_ticket(hit->id, -1, 0);
+		pcmk_handler.revoke_ticket(hit->id);
+	}
+
+	if (hit->owner != -1) {
+		paxos_lease_release(hit->handle);
+		return BOOTHC_RLT_ASYNC;
+	}
+	return BOOTHC_RLT_SYNC_SUCC;
+}
+
/*
 * Callbacks that setup_ticket() passes to paxos_lease_init() for every
 * ticket lease.  They cover node identity, message transport, and the
 * exchange of lease state with Pacemaker through pcmk_handler.
 */
const struct paxos_lease_operations ticket_operations = {
	/* identity and transport */
	.get_myid  = ticket_get_myid,
	.send      = ticket_send,
	.broadcast = ticket_broadcast,
	/* lease state <-> Pacemaker */
	.notify    = ticket_write,
	.catchup   = ticket_read,
};
/*
 * Build the in-memory ticket list from booth_conf and create one paxos
 * lease per configured ticket.
 *
 * The global role[] array gets PROPOSER|ACCEPTOR|LEARNER for SITE nodes,
 * ACCEPTOR for ARBITRATOR nodes and 0 for anything else.  It is passed to
 * every paxos_lease_init() call and is kept on success.
 *
 * Returns 0 on success.  On failure it returns:
 *   - -ENOMEM if an allocation fails;
 *   - otherwise the value returned by paxos_lease_init(), or -1 if that
 *     value was 0, so a failed init is never reported as success.
 * On failure, every ticket on ticket_list is unlinked and freed, and role[]
 * is freed and reset to NULL.
 *
 * NOTE(review): leases already created for earlier tickets are not torn
 * down on failure; confirm whether the paxos layer needs explicit cleanup.
 */
int setup_ticket(void)
{
	struct ticket *tk, *tmp;
	int i, rv;
	pl_handle_t plh;

	role = calloc(booth_conf->node_count, sizeof(*role));
	if (!role)
		return -ENOMEM;

	for (i = 0; i < booth_conf->node_count; i++) {
		if (booth_conf->node[i].type == SITE)
			role[i] = PROPOSER | ACCEPTOR | LEARNER;
		else if (booth_conf->node[i].type == ARBITRATOR)
			role[i] = ACCEPTOR;
	}

	for (i = 0; i < booth_conf->ticket_count; i++) {
		tk = calloc(1, sizeof(struct ticket));
		if (!tk) {
			rv = -ENOMEM;
			goto out;
		}
		strcpy(tk->id, booth_conf->ticket[i].name);
		tk->owner = -1;
		tk->expiry = booth_conf->ticket[i].expiry;
		if (!tk->expiry)
			tk->expiry = DEFAULT_TICKET_EXPIRY;
		list_add_tail(&tk->list, &ticket_list);

		plh = paxos_lease_init(tk->id,
				       BOOTH_NAME_LEN,
				       tk->expiry,
				       booth_conf->node_count,
				       1,
				       role,
				       ticket_priority(i),
				       &ticket_operations);
		if (plh <= 0) {
			log_error("paxos lease initialization failed");
			/* a 0 handle is also a failure; never return 0 here */
			rv = plh ? (int)plh : -1;
			goto out;
		}
		tk->handle = plh;
	}
	return 0;

out:
	list_for_each_entry_safe(tk, tmp, &ticket_list, list) {
		list_del(&tk->list);
		free(tk);
	}
	free(role);
	role = NULL;	/* do not leave the global pointing at freed memory */
	return rv;
}
diff --git a/src/ticket.h b/src/ticket.h
index 12d1042..c8e05f5 100644
--- a/src/ticket.h
+++ b/src/ticket.h
@@ -1,30 +1,31 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _TICKET_H
#define _TICKET_H
#define DEFAULT_TICKET_EXPIRY 600
int check_ticket(char *ticket);
int check_site(char *site, int *local);
int grant_ticket(char *ticket, int force, int expiry);
+int revoke_ticket(char *ticket, int force);
int ticket_recv(void *msg, int msglen);
int setup_ticket(void);
#endif /* _TICKET_H */

File Metadata

Mime Type
text/x-diff
Expires
Mon, Feb 24, 5:00 AM (1 d, 21 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1463965
Default Alt Text
(42 KB)

Event Timeline