Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/src/main.c b/src/main.c
index a78d732..819ce4d 100644
--- a/src/main.c
+++ b/src/main.c
@@ -1,1087 +1,1087 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sched.h>
#include <errno.h>
#include <limits.h>
#include <sys/file.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/poll.h>
#include <fcntl.h>
#include <string.h>
#include <assert.h>
#include "log.h"
#include "booth.h"
#include "config.h"
#include "transport.h"
#include "timer.h"
#include "pacemaker.h"
#include "ticket.h"
#include <error.h>
#define RELEASE_VERSION "1.0"
#define CLIENT_NALLOC 32
int log_logfile_priority = LOG_INFO;
int log_syslog_priority = LOG_ERR;
int log_stderr_priority = LOG_ERR;
static int client_maxi;
static int client_size = 0;
struct client *client = NULL;
struct pollfd *pollfd = NULL;
int poll_timeout = -1;
typedef enum {
ACT_ARBITRATOR = 1,
ACT_SITE,
ACT_CLIENT,
} booth_role_t;
typedef enum {
OP_LIST = 1,
OP_GRANT,
OP_REVOKE,
} operation_t;
struct command_line {
int type; /* ACT_ */
int op; /* OP_ */
int force;
char configfile[BOOTH_PATH_LEN];
char lockfile[BOOTH_PATH_LEN];
char site[BOOTH_NAME_LEN];
char ticket[BOOTH_NAME_LEN];
};
static struct command_line cl;
int do_read(int fd, void *buf, size_t count)
{
int rv, off = 0;
while (off < count) {
rv = read(fd, (char *)buf + off, count - off);
if (rv == 0)
return -1;
if (rv == -1 && errno == EINTR)
continue;
if (rv == -1)
return -1;
off += rv;
}
return 0;
}
int do_write(int fd, void *buf, size_t count)
{
int rv, off = 0;
retry:
rv = write(fd, (char *)buf + off, count);
if (rv == -1 && errno == EINTR)
goto retry;
if (rv < 0) {
log_error("write errno %d", errno);
return rv;
}
if (rv != count) {
count -= rv;
off += rv;
goto retry;
}
return 0;
}
static int do_connect(const char *sock_path)
{
struct sockaddr_un sun;
socklen_t addrlen;
int rv, fd;
fd = socket(PF_UNIX, SOCK_STREAM, 0);
if (fd < 0)
goto out;
memset(&sun, 0, sizeof(sun));
sun.sun_family = AF_UNIX;
strcpy(&sun.sun_path[1], sock_path);
addrlen = sizeof(sa_family_t) + strlen(sun.sun_path+1) + 1;
rv = connect(fd, (struct sockaddr *) &sun, addrlen);
if (rv < 0) {
close(fd);
fd = rv;
}
out:
return fd;
}
static void init_header(struct boothc_header *h, int cmd, int option,
int result, int data_len)
{
memset(h, 0, sizeof(struct boothc_header));
h->magic = BOOTHC_MAGIC;
h->version = BOOTHC_VERSION;
h->len = data_len;
h->cmd = cmd;
h->option = option;
h->result = result;
}
static void client_alloc(void)
{
int i;
if (!client) {
client = malloc(CLIENT_NALLOC * sizeof(struct client));
pollfd = malloc(CLIENT_NALLOC * sizeof(struct pollfd));
} else {
client = realloc(client, (client_size + CLIENT_NALLOC) *
sizeof(struct client));
pollfd = realloc(pollfd, (client_size + CLIENT_NALLOC) *
sizeof(struct pollfd));
if (!pollfd)
log_error("can't alloc for pollfd");
}
if (!client || !pollfd)
log_error("can't alloc for client array");
for (i = client_size; i < client_size + CLIENT_NALLOC; i++) {
client[i].workfn = NULL;
client[i].deadfn = NULL;
client[i].fd = -1;
pollfd[i].fd = -1;
pollfd[i].revents = 0;
}
client_size += CLIENT_NALLOC;
}
static void client_dead(int ci)
{
close(client[ci].fd);
client[ci].workfn = NULL;
client[ci].fd = -1;
pollfd[ci].fd = -1;
}
int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci))
{
int i;
if (!client)
client_alloc();
again:
for (i = 0; i < client_size; i++) {
if (client[i].fd == -1) {
client[i].workfn = workfn;
if (deadfn)
client[i].deadfn = deadfn;
else
client[i].deadfn = client_dead;
client[i].fd = fd;
pollfd[i].fd = fd;
pollfd[i].events = POLLIN;
if (i > client_maxi)
client_maxi = i;
return i;
}
}
client_alloc();
goto again;
}
static int setup_listener(const char *sock_path)
{
struct sockaddr_un addr;
socklen_t addrlen;
int rv, s;
s = socket(AF_LOCAL, SOCK_STREAM, 0);
if (s < 0) {
log_error("socket error %d %d", s, errno);
return s;
}
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_LOCAL;
strcpy(&addr.sun_path[1], sock_path);
addrlen = sizeof(sa_family_t) + strlen(addr.sun_path+1) + 1;
rv = bind(s, (struct sockaddr *) &addr, addrlen);
if (rv < 0) {
log_error("bind error %d %d", rv, errno);
close(s);
return rv;
}
rv = listen(s, 5);
if (rv < 0) {
log_error("listen error %d %d", rv, errno);
close(s);
return rv;
}
return s;
}
void process_connection(int ci)
{
struct boothc_header h;
char *data = NULL;
char *site, *ticket;
int local, rv;
void (*deadfn) (int ci);
rv = do_read(client[ci].fd, &h, sizeof(h));
if (rv < 0) {
log_error("connection %d read error %d", ci, rv);
if (errno == ECONNRESET)
log_debug("client %d aborted conection fd %d", ci, client[ci].fd);
else
log_debug("client %d closed connnection fd %d", ci, client[ci].fd);
deadfn = client[ci].deadfn;
if(deadfn) {
log_debug("run deadfn");
deadfn(ci);
}
return;
}
if (h.magic != BOOTHC_MAGIC) {
log_error("connection %d magic error %x", ci, h.magic);
return;
}
if (h.version != BOOTHC_VERSION) {
log_error("connection %d version error %x", ci, h.version);
return;
}
if (h.len) {
data = malloc(h.len);
if (!data) {
log_error("process_connection no mem %u", h.len);
return;
}
memset(data, 0, h.len);
rv = do_read(client[ci].fd, data, h.len);
if (rv < 0) {
log_error("connection %d read data error %d", ci, rv);
goto out;
}
h.len = 0;
}
switch (h.cmd) {
case BOOTHC_CMD_LIST:
assert(!data);
h.result = list_ticket(&data, &h.len);
break;
case BOOTHC_CMD_GRANT:
site = data;
ticket = data + BOOTH_NAME_LEN;
if (!check_ticket(ticket)) {
h.result = BOOTHC_RLT_INVALID_ARG;
goto reply;
}
if (!check_site(site, &local)) {
h.result = BOOTHC_RLT_INVALID_ARG;
goto reply;
}
if (local)
h.result = grant_ticket(ticket, h.option);
else
h.result = BOOTHC_RLT_REMOTE_OP;
break;
case BOOTHC_CMD_REVOKE:
site = data;
ticket = data + BOOTH_NAME_LEN;
if (!check_ticket(ticket)) {
h.result = BOOTHC_RLT_INVALID_ARG;
goto reply;
}
if (!check_site(site, &local)) {
h.result = BOOTHC_RLT_INVALID_ARG;
goto reply;
}
if (local)
h.result = revoke_ticket(ticket, h.option);
else
h.result = BOOTHC_RLT_REMOTE_OP;
break;
default:
log_error("connection %d cmd %x unknown", ci, h.cmd);
break;
}
reply:
rv = do_write(client[ci].fd, &h, sizeof(h));
if (rv < 0)
log_error("connection %d write error %d", ci, rv);
if (h.len) {
rv = do_write(client[ci].fd, data, h.len);
if (rv < 0)
log_error("connection %d write error %d", ci, rv);
}
out:
free(data);
}
static void process_listener(int ci)
{
int fd, i;
fd = accept(client[ci].fd, NULL, NULL);
if (fd < 0) {
log_error("process_listener: accept error %d %d", fd, errno);
return;
}
i = client_add(fd, process_connection, NULL);
log_debug("add client connection %d fd %d", i, fd);
}
static int setup_config(int type)
{
int rv;
rv = read_config(cl.configfile);
if (rv < 0)
goto out;
rv = check_config(type);
if (rv < 0)
goto out;
out:
return rv;
}
static int setup_transport(void)
{
int rv;
transport_layer_t proto = booth_conf->proto;
rv = booth_transport[proto].init(ticket_recv);
if (rv < 0) {
log_error("failed to init booth_transport[%d]", proto);
goto out;
}
rv = booth_transport[TCP].init(NULL);
if (rv < 0) {
log_error("failed to init booth_transport[TCP]");
goto out;
}
out:
return rv;
}
static int setup_timer(void)
{
return timerlist_init();
}
static int setup(int type)
{
int rv;
rv = setup_config(type);
if (rv < 0)
goto fail;
rv = setup_timer();
if (rv < 0)
goto fail;
rv = setup_transport();
if (rv < 0)
goto fail;
rv = setup_ticket();
if (rv < 0)
goto fail;
rv = setup_listener(BOOTHC_SOCK_PATH);
if (rv < 0)
goto fail;
client_add(rv, process_listener, NULL);
return 0;
fail:
return -1;
}
static int loop(void)
{
void (*workfn) (int ci);
void (*deadfn) (int ci);
int rv, i;
while (1) {
rv = poll(pollfd, client_maxi + 1, poll_timeout);
if (rv == -1 && errno == EINTR)
continue;
if (rv < 0) {
log_error("poll errno %d", errno);
goto fail;
}
for (i = 0; i <= client_maxi; i++) {
if (client[i].fd < 0)
continue;
if (pollfd[i].revents & POLLIN) {
workfn = client[i].workfn;
if (workfn)
workfn(i);
}
if (pollfd[i].revents &
(POLLERR | POLLHUP | POLLNVAL)) {
deadfn = client[i].deadfn;
if (deadfn)
deadfn(i);
}
}
process_timerlist();
}
return 0;
fail:
return -1;
}
static int do_list(void)
{
struct boothc_header h, *rh;
char *reply = NULL, *data;
int data_len;
int fd, rv;
init_header(&h, BOOTHC_CMD_LIST, 0, 0, 0);
fd = do_connect(BOOTHC_SOCK_PATH);
if (fd < 0) {
rv = fd;
goto out;
}
rv = do_write(fd, &h, sizeof(h));
if (rv < 0)
goto out_close;
reply = malloc(sizeof(struct boothc_header));
if (!reply) {
rv = -ENOMEM;
goto out_close;
}
rv = do_read(fd, reply, sizeof(struct boothc_header));
if (rv < 0)
goto out_free;
rh = (struct boothc_header *)reply;
data_len = rh->len;
reply = realloc(reply, sizeof(struct boothc_header) + data_len);
if (!reply) {
rv = -ENOMEM;
goto out_free;
}
data = reply + sizeof(struct boothc_header);
rv = do_read(fd, data, data_len);
if (rv < 0)
goto out_free;
do_write(STDOUT_FILENO, data, data_len);
rv = 0;
out_free:
free(reply);
out_close:
close(fd);
out:
return rv;
}
static int do_grant(void)
{
char *buf;
struct boothc_header *h, reply;
int buflen;
uint32_t force = 0;
int fd, rv;
buflen = sizeof(struct boothc_header) +
sizeof(cl.site) + sizeof(cl.ticket);
buf = malloc(buflen);
if (!buf) {
rv = -ENOMEM;
goto out;
}
h = (struct boothc_header *)buf;
if (cl.force)
force = BOOTHC_OPT_FORCE;
init_header(h, BOOTHC_CMD_GRANT, force, 0,
sizeof(cl.site) + sizeof(cl.ticket));
strcpy(buf + sizeof(struct boothc_header), cl.site);
strcpy(buf + sizeof(struct boothc_header) + sizeof(cl.site), cl.ticket);
fd = do_connect(BOOTHC_SOCK_PATH);
if (fd < 0) {
rv = fd;
goto out_free;
}
rv = do_write(fd, buf, buflen);
if (rv < 0)
goto out_close;
rv = do_read(fd, &reply, sizeof(struct boothc_header));
if (rv < 0)
goto out_close;
if (reply.result == BOOTHC_RLT_INVALID_ARG) {
log_info("invalid argument!");
rv = -1;
goto out_close;
}
if (reply.result == BOOTHC_RLT_REMOTE_OP) {
struct booth_node to;
int s;
memset(&to, 0, sizeof(struct booth_node));
to.family = BOOTH_PROTO_FAMILY;
strcpy(to.addr, cl.site);
s = booth_transport[TCP].open(&to);
if (s < 0)
goto out_close;
rv = booth_transport[TCP].send(s, buf, buflen);
if (rv < 0) {
booth_transport[TCP].close(s);
goto out_close;
}
rv = booth_transport[TCP].recv(s, &reply,
sizeof(struct boothc_header));
if (rv < 0) {
booth_transport[TCP].close(s);
goto out_close;
}
booth_transport[TCP].close(s);
}
if (reply.result == BOOTHC_RLT_ASYNC) {
log_info("grant command sent, but result is async.");
rv = 0;
} else if (reply.result == BOOTHC_RLT_SYNC_SUCC) {
log_info("grant succeeded!");
rv = 0;
} else if (reply.result == BOOTHC_RLT_SYNC_FAIL) {
log_info("grant failed!");
rv = 0;
} else {
log_error("internal error!");
rv = -1;
}
out_close:
close(fd);
out_free:
free(buf);
out:
return rv;
}
static int do_revoke(void)
{
char *buf;
struct boothc_header *h, reply;
int buflen;
uint32_t force = 0;
int fd, rv;
buflen = sizeof(struct boothc_header) +
sizeof(cl.site) + sizeof(cl.ticket);
buf = malloc(buflen);
if (!buf) {
rv = -ENOMEM;
goto out;
}
h = (struct boothc_header *)buf;
if (cl.force)
force = BOOTHC_OPT_FORCE;
init_header(h, BOOTHC_CMD_REVOKE, force, 0,
sizeof(cl.site) + sizeof(cl.ticket));
strcpy(buf + sizeof(struct boothc_header), cl.site);
strcpy(buf + sizeof(struct boothc_header) + sizeof(cl.site), cl.ticket);
fd = do_connect(BOOTHC_SOCK_PATH);
if (fd < 0) {
rv = fd;
goto out_free;
}
rv = do_write(fd, buf, buflen);
if (rv < 0)
goto out_close;
rv = do_read(fd, &reply, sizeof(struct boothc_header));
if (rv < 0)
goto out_close;
if (reply.result == BOOTHC_RLT_INVALID_ARG) {
log_info("invalid argument!");
rv = -1;
goto out_close;
}
if (reply.result == BOOTHC_RLT_REMOTE_OP) {
struct booth_node to;
int s;
memset(&to, 0, sizeof(struct booth_node));
to.family = BOOTH_PROTO_FAMILY;
strcpy(to.addr, cl.site);
s = booth_transport[TCP].open(&to);
if (s < 0)
goto out_close;
rv = booth_transport[TCP].send(s, buf, buflen);
if (rv < 0) {
booth_transport[TCP].close(s);
goto out_close;
}
rv = booth_transport[TCP].recv(s, &reply,
sizeof(struct boothc_header));
if (rv < 0) {
booth_transport[TCP].close(s);
goto out_close;
}
booth_transport[TCP].close(s);
}
if (reply.result == BOOTHC_RLT_ASYNC) {
log_info("revoke command sent, but result is async.");
rv = 0;
} else if (reply.result == BOOTHC_RLT_SYNC_SUCC) {
log_info("revoke succeeded!");
rv = 0;
} else if (reply.result == BOOTHC_RLT_SYNC_FAIL) {
log_info("revoke failed!");
rv = 0;
} else {
log_error("internal error!");
rv = -1;
}
out_close:
close(fd);
out_free:
free(buf);
out:
return rv;
}
static int lockfile(void)
{
char buf[16];
struct flock lock;
int fd, rv;
fd = open(cl.lockfile, O_CREAT|O_WRONLY, 0666);
if (fd < 0) {
log_error("lockfile open error %s: %s",
cl.lockfile, strerror(errno));
return -1;
}
lock.l_type = F_WRLCK;
lock.l_start = 0;
lock.l_whence = SEEK_SET;
lock.l_len = 0;
rv = fcntl(fd, F_SETLK, &lock);
if (rv < 0) {
log_error("lockfile setlk error %s: %s",
cl.lockfile, strerror(errno));
goto fail;
}
rv = ftruncate(fd, 0);
if (rv < 0) {
log_error("lockfile truncate error %s: %s",
cl.lockfile, strerror(errno));
goto fail;
}
memset(buf, 0, sizeof(buf));
snprintf(buf, sizeof(buf), "%d\n", getpid());
rv = write(fd, buf, strlen(buf));
if (rv <= 0) {
log_error("lockfile write error %s: %s",
cl.lockfile, strerror(errno));
goto fail;
}
return fd;
fail:
close(fd);
return -1;
}
static void unlink_lockfile(int fd)
{
unlink(cl.lockfile);
close(fd);
}
static void print_usage(void)
{
printf("Usage:\n");
printf("booth <type> <operation> [options]\n");
printf("\n");
printf("Types:\n");
printf(" arbitrator: daemon running on arbitrator\n");
printf(" site: daemon running on cluster site\n");
printf(" client: command running from client\n");
printf("\n");
printf("Operations:\n");
printf("Please note that operations are valid iff type is client!\n");
printf(" list: List all the tickets\n");
printf(" grant: Grant ticket T(-t T) to site S(-s S)\n");
printf(" revoke: Revoke ticket T(-t T) from site S(-s S)\n");
printf("\n");
printf("Options:\n");
printf(" -c FILE Specify config file [default " BOOTH_DEFAULT_CONF "]\n");
printf(" -l LOCKFILE Specify lock file [default " BOOTH_DEFAULT_LOCKFILE "]\n");
printf(" -D Enable debugging to stderr and don't fork\n");
printf(" -t ticket name\n");
printf(" -s site name\n");
printf(" -f ticket attribute: force, only valid when "
"granting\n");
printf(" -h Print this help, then exit\n");
}
#define OPTION_STRING "c:Dl:t:s:fh"
static char *logging_entity = NULL;
void safe_copy(char *dest, char *value, size_t buflen, const char *description) {
if (strlen(value) >= buflen) {
fprintf(stderr, "'%s' exceeds maximum %s length of %ld\n",
value, description, buflen - 1);
exit(EXIT_FAILURE);
}
strncpy(dest, value, buflen - 1);
}
static int read_arguments(int argc, char **argv)
{
int optchar;
char *arg1 = argv[1];
char *op = NULL;
if (argc < 2 || !strcmp(arg1, "help") || !strcmp(arg1, "--help") ||
!strcmp(arg1, "-h")) {
print_usage();
exit(EXIT_SUCCESS);
}
if (!strcmp(arg1, "version") || !strcmp(arg1, "--version") ||
!strcmp(arg1, "-V")) {
printf("%s %s (built %s %s)\n",
argv[0], RELEASE_VERSION, __DATE__, __TIME__);
exit(EXIT_SUCCESS);
}
if (!strcmp(arg1, "arbitrator")) {
cl.type = ACT_ARBITRATOR;
logging_entity = (char *) DAEMON_NAME "-arbitrator";
optind = 2;
} else if (!strcmp(arg1, "site")) {
cl.type = ACT_SITE;
logging_entity = (char *) DAEMON_NAME "-site";
optind = 2;
} else if (!strcmp(arg1, "client")) {
cl.type = ACT_CLIENT;
if (argc < 3) {
print_usage();
exit(EXIT_FAILURE);
}
op = argv[2];
optind = 3;
} else {
cl.type = ACT_CLIENT;
op = argv[1];
optind = 2;
}
switch (cl.type) {
case ACT_ARBITRATOR:
break;
case ACT_SITE:
break;
case ACT_CLIENT:
if (!strcmp(op, "list"))
cl.op = OP_LIST;
else if (!strcmp(op, "grant"))
cl.op = OP_GRANT;
else if (!strcmp(op, "revoke"))
cl.op = OP_REVOKE;
else {
fprintf(stderr, "client operation \"%s\" is unknown\n",
op);
exit(EXIT_FAILURE);
}
break;
}
while (optind < argc) {
optchar = getopt(argc, argv, OPTION_STRING);
switch (optchar) {
case 'c':
safe_copy(cl.configfile, optarg, sizeof(cl.configfile), "config file");
break;
case 'D':
debug_level = 1;
log_logfile_priority = LOG_DEBUG;
log_syslog_priority = LOG_DEBUG;
break;
case 'l':
safe_copy(cl.lockfile, optarg, sizeof(cl.lockfile), "lock file");
break;
case 't':
if (cl.op == OP_GRANT || cl.op == OP_REVOKE) {
safe_copy(cl.ticket, optarg, sizeof(cl.ticket), "ticket name");
} else {
print_usage();
exit(EXIT_FAILURE);
}
break;
case 's':
if (cl.op == OP_GRANT || cl.op == OP_REVOKE) {
safe_copy(cl.site, optarg, sizeof(cl.ticket), "site name");
} else {
print_usage();
exit(EXIT_FAILURE);
}
break;
case 'f':
if (cl.op == OP_GRANT)
cl.force = 1;
else {
print_usage();
exit(EXIT_FAILURE);
}
break;
case 'h':
print_usage();
exit(EXIT_SUCCESS);
break;
case ':':
case '?':
fprintf(stderr, "Please use '-h' for usage.\n");
exit(EXIT_FAILURE);
break;
default:
fprintf(stderr, "unknown option: %s\n", argv[optind]);
exit(EXIT_FAILURE);
break;
};
}
return 0;
}
static void set_scheduler(void)
{
struct sched_param sched_param;
struct rlimit rlimit;
int rv;
rlimit.rlim_cur = RLIM_INFINITY;
rlimit.rlim_max = RLIM_INFINITY;
setrlimit(RLIMIT_MEMLOCK, &rlimit);
rv = mlockall(MCL_CURRENT | MCL_FUTURE);
if (rv < 0) {
log_error("mlockall failed");
}
rv = sched_get_priority_max(SCHED_RR);
if (rv != -1) {
sched_param.sched_priority = rv;
rv = sched_setscheduler(0, SCHED_RR, &sched_param);
if (rv == -1)
log_error("could not set SCHED_RR priority %d err %d",
sched_param.sched_priority, errno);
} else {
log_error("could not get maximum scheduler priority err %d",
errno);
}
}
static void set_oom_adj(int val)
{
FILE *fp;
fp = fopen("/proc/self/oom_adj", "w");
if (!fp)
return;
fprintf(fp, "%i", val);
fclose(fp);
}
static int do_server(int type)
{
- int fd;
+ int fd = -1;
int rv = -1;
rv = setup(type);
if (rv < 0)
goto out;
if (!debug_level) {
if (daemon(0, 0) < 0) {
perror("daemon error");
exit(EXIT_FAILURE);
}
}
/*
The lock cannot be obtained before the call to daemon(), otherwise
the lockfile would contain the pid of the parent, not the daemon.
*/
fd = lockfile();
if (fd < 0)
return fd;
if (type == ARBITRATOR)
log_info("BOOTH arbitrator daemon started");
else if (type == SITE)
log_info("BOOTH cluster site daemon started");
set_scheduler();
set_oom_adj(-16);
rv = loop();
out:
unlink_lockfile(fd);
return rv;
}
static int do_client(void)
{
int rv = -1;
switch (cl.op) {
case OP_LIST:
rv = do_list();
break;
case OP_GRANT:
rv = do_grant();
break;
case OP_REVOKE:
rv = do_revoke();
break;
}
return rv;
}
int main(int argc, char *argv[])
{
int rv;
memset(&cl, 0, sizeof(cl));
strncpy(cl.configfile, BOOTH_DEFAULT_CONF, BOOTH_PATH_LEN - 1);
strncpy(cl.lockfile, BOOTH_DEFAULT_LOCKFILE, BOOTH_PATH_LEN - 1);
rv = read_arguments(argc, argv);
if (rv < 0)
goto out;
if (cl.type == ACT_CLIENT) {
cl_log_enable_stderr(TRUE);
cl_log_set_facility(0);
} else {
cl_log_set_entity(logging_entity);
cl_log_enable_stderr(debug_level ? TRUE : FALSE);
cl_log_set_facility(HA_LOG_FACILITY);
}
cl_inherit_logging_environment(0);
switch (cl.type) {
case ACT_ARBITRATOR:
rv = do_server(ARBITRATOR);
break;
case ACT_SITE:
rv = do_server(SITE);
break;
case ACT_CLIENT:
rv = do_client();
break;
}
out:
return rv ? EXIT_FAILURE : EXIT_SUCCESS;
}
diff --git a/src/paxos.c b/src/paxos.c
index c82a130..0c6512f 100644
--- a/src/paxos.c
+++ b/src/paxos.c
@@ -1,850 +1,850 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include "list.h"
#include "paxos.h"
#include "log.h"
typedef enum {
INIT = 1,
PREPARING,
PROMISING,
PROPOSING,
ACCEPTING,
RECOVERY,
COMMITTED,
} paxos_state_t;
struct proposal {
int ballot_number;
char value[0];
};
struct learned {
int ballot;
int number;
};
struct paxos_msghdr {
paxos_state_t state;
int from;
char psname[PAXOS_NAME_LEN+1];
char piname[PAXOS_NAME_LEN+1];
int ballot_number;
int reject;
int proposer_id;
unsigned int extralen;
unsigned int valuelen;
};
struct proposer {
int state;
int ballot;
int open_number;
int accepted_number;
int proposed;
struct proposal *proposal;
};
struct acceptor {
int state;
int highest_promised;
struct proposal *accepted_proposal;
};
struct learner {
int state;
int learned_max;
int learned_ballot;
struct learned learned[0];
};
struct paxos_space;
struct paxos_instance;
struct proposer_operations {
void (*prepare) (struct paxos_instance *,
int *);
void (*propose) (struct paxos_space *,
struct paxos_instance *,
void *, int);
void (*commit) (struct paxos_space *,
struct paxos_instance *,
void *, int);
};
struct acceptor_operations {
void (*promise) (struct paxos_space *,
struct paxos_instance *,
void *, int);
void (*accepted) (struct paxos_space *,
struct paxos_instance *,
void *, int);
};
struct learner_operations {
void (*response) (struct paxos_space *,
struct paxos_instance *,
void *, int);
};
struct paxos_space {
char name[PAXOS_NAME_LEN+1];
unsigned int number;
unsigned int extralen;
unsigned int valuelen;
const unsigned char *role;
const struct paxos_operations *p_op;
const struct proposer_operations *r_op;
const struct acceptor_operations *a_op;
const struct learner_operations *l_op;
struct list_head list;
struct list_head pi_head;
};
struct paxos_instance {
char name[PAXOS_NAME_LEN+1];
int round;
int *prio;
struct proposer *proposer;
struct acceptor *acceptor;
struct learner *learner;
void (*end) (pi_handle_t pih, int round, int result);
struct list_head list;
struct paxos_space *ps;
};
static LIST_HEAD(ps_head);
static int have_quorum(struct paxos_space *ps, int member)
{
int i, sum = 0;
for (i = 0; i < ps->number; i++) {
if (ps->role[i] & ACCEPTOR)
sum++;
}
if (member * 2 > sum)
return 1;
else
return 0;
}
static int next_ballot_number(struct paxos_instance *pi)
{
int ballot;
int myid = pi->ps->p_op->get_myid();
if (pi->prio)
ballot = pi->prio[myid];
else
ballot = myid;
while (ballot <= pi->round)
ballot += pi->ps->number;
return ballot;
}
static void proposer_prepare(struct paxos_instance *pi, int *round)
{
struct paxos_msghdr *hdr;
void *msg;
int msglen = sizeof(struct paxos_msghdr) + pi->ps->extralen;
int ballot;
log_debug("preposer prepare ...");
msg = malloc(msglen);
if (!msg) {
log_error("no mem for msg");
*round = -ENOMEM;
return;
}
memset(msg, 0, msglen);
hdr = msg;
if (*round > pi->round)
pi->round = *round;
ballot = next_ballot_number(pi);
pi->proposer->ballot = ballot;
hdr->state = htonl(PREPARING);
hdr->from = htonl(pi->ps->p_op->get_myid());
hdr->proposer_id = hdr->from;
strcpy(hdr->psname, pi->ps->name);
strcpy(hdr->piname, pi->name);
hdr->ballot_number = htonl(ballot);
hdr->extralen = htonl(pi->ps->extralen);
if (pi->ps->p_op->broadcast)
pi->ps->p_op->broadcast(msg, msglen);
else {
int i;
for (i = 0; i < pi->ps->number; i++) {
if (pi->ps->role[i] & ACCEPTOR)
pi->ps->p_op->send(i, msg, msglen);
}
}
free(msg);
*round = ballot;
}
static void proposer_propose(struct paxos_space *ps,
struct paxos_instance *pi,
void *msg, int msglen)
{
struct paxos_msghdr *hdr;
pi_handle_t pih = (pi_handle_t)pi;
void *extra, *value, *message;
int ballot;
log_debug("proposer propose ...");
if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) {
log_error("message length incorrect, "
"msglen: %d, msghdr len: %lu, extralen: %u",
msglen, sizeof(struct paxos_msghdr), ps->extralen);
return;
}
hdr = msg;
ballot = ntohl(hdr->ballot_number);
if (pi->proposer->ballot != ballot) {
log_debug("not the same ballot, proposer ballot: %d, "
"received ballot: %d", pi->proposer->ballot, ballot);
return;
}
if (ntohl(hdr->reject)) {
log_debug("proposal was rejected");
pi->round = ballot;
pi->proposer->state = INIT;
pi->end(pih, pi->round, -EAGAIN);
return;
}
extra = (char *)msg + sizeof(struct paxos_msghdr);
if (ps->p_op->prepare) {
if (ps->p_op->prepare(pih, extra))
pi->proposer->open_number++;
} else
pi->proposer->open_number++;
if (!have_quorum(ps, pi->proposer->open_number))
return;
if (pi->proposer->proposed)
return;
pi->proposer->proposed = 1;
value = pi->proposer->proposal->value;
if (ps->p_op->propose)
ps->p_op->propose(pih, extra, ballot, value);
hdr->valuelen = htonl(ps->valuelen);
message = malloc(msglen + ps->valuelen);
if (!message) {
log_error("no mem for value");
return;
}
memset(message, 0, msglen + ps->valuelen);
memcpy(message, msg, msglen);
memcpy((char *)message + msglen, value, ps->valuelen);
pi->proposer->state = PROPOSING;
hdr = message;
hdr->from = htonl(ps->p_op->get_myid());
hdr->state = htonl(PROPOSING);
if (ps->p_op->broadcast)
ps->p_op->broadcast(message, msglen + ps->valuelen);
else {
int i;
for (i = 0; i < ps->number; i++) {
if (ps->role[i] & ACCEPTOR)
ps->p_op->send(i, message,
msglen + ps->valuelen);
}
}
}
static void proposer_commit(struct paxos_space *ps,
struct paxos_instance *pi,
void *msg, int msglen)
{
struct paxos_msghdr *hdr;
pi_handle_t pih = (pi_handle_t)pi;
void *extra;
int ballot;
log_debug("proposer commit ...");
if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) {
log_error("message length incorrect, "
"msglen: %d, msghdr len: %lu, extralen: %u",
msglen, sizeof(struct paxos_msghdr), ps->extralen);
return;
}
extra = (char *)msg + sizeof(struct paxos_msghdr);
hdr = msg;
ballot = ntohl(hdr->ballot_number);
if (pi->proposer->ballot != ballot) {
log_debug("not the same ballot, proposer ballot: %d, "
"received ballot: %d", pi->proposer->ballot, ballot);
return;
}
pi->proposer->accepted_number++;
if (!have_quorum(ps, pi->proposer->accepted_number))
return;
pi->round = ballot;
if (ps->p_op->commit)
ps->p_op->commit(pih, extra, pi->round);
pi->proposer->state = COMMITTED;
pi->end(pih, pi->round, 0);
}
static void acceptor_promise(struct paxos_space *ps,
struct paxos_instance *pi,
void *msg, int msglen)
{
struct paxos_msghdr *hdr;
unsigned long to;
pi_handle_t pih = (pi_handle_t)pi;
void *extra;
log_debug("acceptor promise ...");
if (pi->acceptor->state == RECOVERY) {
log_debug("still in recovery");
return;
}
if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) {
log_error("message length incorrect, "
"msglen: %d, msghdr len: %lu, extralen: %u",
msglen, sizeof(struct paxos_msghdr), ps->extralen);
return;
}
hdr = msg;
extra = (char *)msg + sizeof(struct paxos_msghdr);
if (ntohl(hdr->ballot_number) < pi->acceptor->highest_promised) {
log_debug("ballot number: %d, highest promised: %d",
ntohl(hdr->ballot_number),
pi->acceptor->highest_promised);
to = ntohl(hdr->from);
hdr->from = htonl(ps->p_op->get_myid());
hdr->state = htonl(PROMISING);
hdr->reject = htonl(1);
memset(extra, 0, ps->extralen);
ps->p_op->send(to, msg, msglen);
return;
}
pi->acceptor->highest_promised = ntohl(hdr->ballot_number);
if (ps->p_op->promise)
ps->p_op->promise(pih, extra);
pi->acceptor->state = PROMISING;
to = ntohl(hdr->from);
hdr->from = htonl(ps->p_op->get_myid());
hdr->state = htonl(PROMISING);
ps->p_op->send(to, msg, msglen);
}
static void acceptor_accepted(struct paxos_space *ps,
struct paxos_instance *pi,
void *msg, int msglen)
{
struct paxos_msghdr *hdr;
unsigned long to;
pi_handle_t pih = (pi_handle_t)pi;
void *extra, *value;
int myid = ps->p_op->get_myid();
int ballot;
log_debug("acceptor accepted ...");
if (pi->acceptor->state == RECOVERY) {
log_debug("still in recovery");
return;
}
if (msglen != sizeof(struct paxos_msghdr) + ps->extralen + ps->valuelen) {
log_error("message length incorrect, msglen: "
"%d, msghdr len: %lu, extralen: %u, valuelen: %u",
msglen, sizeof(struct paxos_msghdr), ps->extralen,
ps->valuelen);
return;
}
hdr = msg;
extra = (char *)msg + sizeof(struct paxos_msghdr);
ballot = ntohl(hdr->ballot_number);
if (ballot < pi->acceptor->highest_promised) {
log_debug("ballot: %d, highest promised: %d",
ballot, pi->acceptor->highest_promised);
to = ntohl(hdr->from);
hdr->from = htonl(myid);
hdr->state = htonl(ACCEPTING);
hdr->reject = htonl(1);
ps->p_op->send(to, hdr, sizeof(struct paxos_msghdr));
return;
}
value = pi->acceptor->accepted_proposal->value;
memcpy(value, (char *)msg + sizeof(struct paxos_msghdr) + ps->extralen,
ps->valuelen);
if (ps->p_op->accepted)
ps->p_op->accepted(pih, extra, ballot, value);
pi->acceptor->state = ACCEPTING;
to = ntohl(hdr->from);
hdr->from = htonl(myid);
hdr->state = htonl(ACCEPTING);
if (ps->p_op->broadcast)
ps->p_op->broadcast(msg, sizeof(struct paxos_msghdr)
+ ps->extralen);
else {
int i;
for (i = 0; i < ps->number; i++) {
if (ps->role[i] & LEARNER)
ps->p_op->send(i, msg,
sizeof(struct paxos_msghdr)
+ ps->extralen);
}
if (!(ps->role[to] & LEARNER))
ps->p_op->send(to, msg, sizeof(struct paxos_msghdr)
+ ps->extralen);
}
}
static void learner_response(struct paxos_space *ps,
struct paxos_instance *pi,
void *msg, int msglen)
{
struct paxos_msghdr *hdr;
pi_handle_t pih = (pi_handle_t)pi;
void *extra;
- int i, unused, found = 0;
+ int i, unused = 0, found = 0;
int ballot;
log_debug("learner response ...");
if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) {
log_error("message length incorrect, "
"msglen: %d, msghdr len: %lu, extralen: %u",
msglen, sizeof(struct paxos_msghdr), ps->extralen);
return;
}
hdr = msg;
extra = (char *)msg + sizeof(struct paxos_msghdr);
ballot = ntohl(hdr->ballot_number);
for (i = 0; i < ps->number; i++) {
if (!pi->learner->learned[i].ballot) {
unused = i;
break;
}
if (pi->learner->learned[i].ballot == ballot) {
pi->learner->learned[i].number++;
if (pi->learner->learned[i].number
> pi->learner->learned_max)
pi->learner->learned_max
= pi->learner->learned[i].number;
found = 1;
break;
}
}
if (!found) {
pi->learner->learned[unused].ballot = ntohl(hdr->ballot_number);
pi->learner->learned[unused].number = 1;
}
if (!have_quorum(ps, pi->learner->learned_max))
return;
if (ps->p_op->learned)
ps->p_op->learned(pih, extra, ballot);
}
const struct proposer_operations generic_proposer_operations = {
.prepare = proposer_prepare,
.propose = proposer_propose,
.commit = proposer_commit,
};
const struct acceptor_operations generic_acceptor_operations = {
.promise = acceptor_promise,
.accepted = acceptor_accepted,
};
const struct learner_operations generic_learner_operations = {
.response = learner_response,
};
ps_handle_t paxos_space_init(const void *name,
unsigned int number,
unsigned int extralen,
unsigned int valuelen,
const unsigned char *role,
const struct paxos_operations *p_op)
{
struct paxos_space *ps;
list_for_each_entry(ps, &ps_head, list) {
if (!strcmp(ps->name, name)) {
log_info("paxos space (%s) has already been "
"initialized", (char *)name);
return -EEXIST;
}
}
if (!number || !valuelen || !p_op || !p_op->get_myid || !p_op->send) {
log_error("invalid agruments");
return -EINVAL;
}
ps = malloc(sizeof(struct paxos_space));
if (!ps) {
log_error("no mem for paxos space");
return -ENOMEM;
}
memset(ps, 0, sizeof(struct paxos_space));
strncpy(ps->name, name, PAXOS_NAME_LEN + 1);
ps->number = number;
ps->extralen = extralen;
ps->valuelen = valuelen;
ps->role = role;
ps->p_op = p_op;
ps->r_op = &generic_proposer_operations;
ps->a_op = &generic_acceptor_operations;
ps->l_op = &generic_learner_operations;
list_add_tail(&ps->list, &ps_head);
INIT_LIST_HEAD(&ps->pi_head);
return (ps_handle_t)ps;
}
pi_handle_t paxos_instance_init(ps_handle_t handle, const void *name, int *prio)
{
struct paxos_space *ps = (struct paxos_space *)handle;
struct paxos_instance *pi;
- struct proposer *proposer;
- struct acceptor *acceptor;
- struct learner *learner;
+ struct proposer *proposer = NULL;
+ struct acceptor *acceptor = NULL;
+ struct learner *learner = NULL;
int myid, valuelen, rv;
list_for_each_entry(pi, &ps->pi_head, list) {
if (!strcmp(pi->name, name))
return (pi_handle_t)pi;
}
if (handle <= 0 || !ps->p_op || !ps->p_op->get_myid) {
log_error("invalid agruments");
rv = -EINVAL;
goto out;
}
myid = ps->p_op->get_myid();
valuelen = ps->valuelen;
pi = malloc(sizeof(struct paxos_instance));
if (!pi) {
log_error("no mem for paxos instance");
rv = -ENOMEM;
goto out;
}
memset(pi, 0, sizeof(struct paxos_instance));
strncpy(pi->name, name, PAXOS_NAME_LEN + 1);
if (prio) {
pi->prio = malloc(ps->number * sizeof(int));
if (!pi->prio) {
log_error("no mem for prio");
rv = -ENOMEM;
goto out_pi;
}
memcpy(pi->prio, prio, ps->number * sizeof(int));
}
if (ps->role[myid] & PROPOSER) {
proposer = malloc(sizeof(struct proposer));
if (!proposer) {
log_error("no mem for proposer");
rv = -ENOMEM;
goto out_prio;
}
memset(proposer, 0, sizeof(struct proposer));
proposer->state = INIT;
proposer->proposal = malloc(sizeof(struct proposal) + valuelen);
if (!proposer->proposal) {
log_error("no mem for proposal");
rv = -ENOMEM;
goto out_proposer;
}
memset(proposer->proposal, 0,
sizeof(struct proposal) + valuelen);
pi->proposer = proposer;
}
if (ps->role[myid] & ACCEPTOR) {
acceptor = malloc(sizeof(struct acceptor));
if (!acceptor) {
log_error("no mem for acceptor");
rv = -ENOMEM;
goto out_proposal;
}
memset(acceptor, 0, sizeof(struct acceptor));
acceptor->state = INIT;
acceptor->accepted_proposal = malloc(sizeof(struct proposal)
+ valuelen);
if (!acceptor->accepted_proposal) {
log_error("no mem for accepted proposal");
rv = -ENOMEM;
goto out_acceptor;
}
memset(acceptor->accepted_proposal, 0,
sizeof(struct proposal) + valuelen);
pi->acceptor = acceptor;
if (ps->p_op->catchup) {
pi->acceptor->state = RECOVERY;
ps->p_op->catchup(name);
pi->acceptor->state = INIT;
}
}
if (ps->role[myid] & LEARNER) {
learner = malloc(sizeof(struct learner)
+ ps->number * sizeof(struct learned));
if (!learner) {
log_error("no mem for learner");
rv = -ENOMEM;
goto out_accepted_proposal;
}
memset(learner, 0,
sizeof(struct learner)
+ ps->number * sizeof(struct learned));
learner->state = INIT;
pi->learner = learner;
}
pi->ps = ps;
list_add_tail(&pi->list, &ps->pi_head);
return (pi_handle_t)pi;
out_accepted_proposal:
if (ps->role[myid] & ACCEPTOR)
free(acceptor->accepted_proposal);
out_acceptor:
if (ps->role[myid] & ACCEPTOR)
free(acceptor);
out_proposal:
if (ps->role[myid] & PROPOSER)
free(proposer->proposal);
out_proposer:
if (ps->role[myid] & PROPOSER)
free(proposer);
out_prio:
if (pi->prio)
free(pi->prio);
out_pi:
free(pi);
out:
return rv;
}
int paxos_round_request(pi_handle_t handle,
void *value,
int *round,
void (*end_request) (pi_handle_t handle,
int round,
int result))
{
struct paxos_instance *pi = (struct paxos_instance *)handle;
int myid = pi->ps->p_op->get_myid();
int rv = *round;
if (!(pi->ps->role[myid] & PROPOSER)) {
log_debug("only proposer can do this");
return -EOPNOTSUPP;
}
pi->proposer->state = PREPARING;
pi->proposer->open_number = 0;
pi->proposer->accepted_number = 0;
pi->proposer->proposed = 0;
memcpy(pi->proposer->proposal->value, value, pi->ps->valuelen);
pi->end = end_request;
pi->ps->r_op->prepare(pi, &rv);
return rv;
}
int paxos_recovery_status_get(pi_handle_t handle)
{
struct paxos_instance *pi = (struct paxos_instance *)handle;
int myid = pi->ps->p_op->get_myid();
if (!(pi->ps->role[myid] & ACCEPTOR))
return -EOPNOTSUPP;
if (pi->acceptor->state == RECOVERY)
return 1;
else
return 0;
}
int paxos_recovery_status_set(pi_handle_t handle, int recovery)
{
struct paxos_instance *pi = (struct paxos_instance *)handle;
int myid = pi->ps->p_op->get_myid();
if (!(pi->ps->role[myid] & ACCEPTOR))
return -EOPNOTSUPP;
if (recovery)
pi->acceptor->state = RECOVERY;
else
pi->acceptor->state = INIT;
return 0;
}
int paxos_propose(pi_handle_t handle, void *value, int round)
{
struct paxos_instance *pi = (struct paxos_instance *)handle;
struct paxos_msghdr *hdr;
void *extra, *msg;
int len = sizeof(struct paxos_msghdr)
+ pi->ps->extralen + pi->ps->valuelen;
if (round != pi->proposer->ballot) {
log_debug("round: %d, proposer ballot: %d",
round, pi->proposer->ballot);
return -EINVAL;
}
strcpy(pi->proposer->proposal->value, value);
pi->round = round;
msg = malloc(len);
if (!msg) {
log_error("no mem for msg");
return -ENOMEM;
}
memset(msg, 0, len);
hdr = msg;
hdr->state = htonl(PROPOSING);
hdr->from = htonl(pi->ps->p_op->get_myid());
hdr->proposer_id = hdr->from;
strcpy(hdr->psname, pi->ps->name);
strcpy(hdr->piname, pi->name);
hdr->ballot_number = htonl(pi->round);
hdr->extralen = htonl(pi->ps->extralen);
extra = (char *)msg + sizeof(struct paxos_msghdr);
memcpy((char *)msg + sizeof(struct paxos_msghdr) + pi->ps->extralen,
value, pi->ps->valuelen);
if (pi->ps->p_op->propose)
pi->ps->p_op->propose(handle, extra, round, value);
if (pi->ps->p_op->broadcast)
pi->ps->p_op->broadcast(msg, len);
else {
int i;
for (i = 0; i < pi->ps->number; i++) {
if (pi->ps->role[i] & ACCEPTOR)
pi->ps->p_op->send(i, msg, len);
}
}
return 0;
}
int paxos_recvmsg(void *msg, int msglen)
{
struct paxos_msghdr *hdr = msg;
struct paxos_space *ps;
struct paxos_instance *pi;
int found = 0;
int myid;
list_for_each_entry(ps, &ps_head, list) {
if (!strcmp(ps->name, hdr->psname)) {
found = 1;
break;
}
}
if (!found) {
log_error("could not find the received ps name (%s) "
"in registered list", hdr->psname);
return -EINVAL;
}
myid = ps->p_op->get_myid();
found = 0;
list_for_each_entry(pi, &ps->pi_head, list) {
if (!strcmp(pi->name, hdr->piname)) {
found = 1;
break;
}
}
if (!found)
paxos_instance_init((ps_handle_t)ps, hdr->piname, NULL);
switch (ntohl(hdr->state)) {
case PREPARING:
if (ps->role[myid] & ACCEPTOR)
ps->a_op->promise(ps, pi, msg, msglen);
break;
case PROMISING:
ps->r_op->propose(ps, pi, msg, msglen);
break;
case PROPOSING:
if (ps->role[myid] & ACCEPTOR)
ps->a_op->accepted(ps, pi, msg, msglen);
break;
case ACCEPTING:
if (ntohl(hdr->proposer_id) == myid)
ps->r_op->commit(ps, pi, msg, msglen);
else if (ps->role[myid] & LEARNER)
ps->l_op->response(ps, pi, msg, msglen);
break;
default:
log_debug("invalid message type: %d", ntohl(hdr->state));
break;
};
return 0;
}
diff --git a/src/paxos.h b/src/paxos.h
index ea8c54e..b91d9a9 100644
--- a/src/paxos.h
+++ b/src/paxos.h
@@ -1,80 +1,80 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _PAXOS_H
#define _PAXOS_H
#define PAXOS_NAME_LEN 63
#define PROPOSER 0x4
#define ACCEPTOR 0x2
#define LEARNER 0x1
-typedef int ps_handle_t;
-typedef int pi_handle_t;
+typedef long ps_handle_t;
+typedef long pi_handle_t;
struct paxos_operations {
int (*get_myid) (void);
int (*send) (unsigned long id, void *value, int len);
int (*broadcast) (void *value, int len);
int (*catchup) (const void *name);
int (*prepare) (pi_handle_t handle, void *extra);
int (*promise) (pi_handle_t handle, void *extra);
int (*propose) (pi_handle_t handle, void *extra,
int round, void *value);
int (*accepted) (pi_handle_t handle, void *extra,
int round, void *value);
int (*commit) (pi_handle_t handle, void *extra, int round);
int (*learned) (pi_handle_t handle, void *extra, int round);
};
int paxos_recvmsg(void *msg, int msglen);
ps_handle_t paxos_space_init(const void *name,
unsigned int number,
unsigned int extralen,
unsigned int valuelen,
const unsigned char *role,
const struct paxos_operations *p_op);
pi_handle_t paxos_instance_init(ps_handle_t handle,
const void *name,
int *prio);
int paxos_round_request(pi_handle_t handle,
void *value,
int *round,
void (*end_request) (pi_handle_t handle,
int round,
int result));
int paxos_round_discard(pi_handle_t handle, int round);
int paxos_leader_get(pi_handle_t handle, int *round);
int paxos_recovery_status_get(pi_handle_t handle);
int paxos_recovery_status_set(pi_handle_t handle, int recovery);
int paxos_propose(pi_handle_t handle, void *value, int round);
int paxos_instance_exit(pi_handle_t handle);
int paxos_space_exit(ps_handle_t handle);
#endif /* _PAXOS_H */
diff --git a/src/paxos_lease.c b/src/paxos_lease.c
index 9cd36f5..9b16d8c 100644
--- a/src/paxos_lease.c
+++ b/src/paxos_lease.c
@@ -1,572 +1,572 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include "paxos.h"
#include "paxos_lease.h"
#include "transport.h"
#include "config.h"
#include "timer.h"
#include "list.h"
#include "log.h"
#define PAXOS_LEASE_SPACE "paxoslease"
#define PLEASE_VALUE_LEN 1024
struct paxos_lease_msghdr {
int leased;
};
struct paxos_lease_value {
char name[PAXOS_NAME_LEN+1];
int owner;
int expiry;
};
struct lease_state {
int round;
struct paxos_lease_value *plv;
unsigned long long expires;
struct timerlist *timer;
};
struct paxos_lease {
char name[PAXOS_NAME_LEN+1];
pi_handle_t pih;
struct lease_state proposer;
struct lease_state acceptor;
int owner;
int expiry;
int relet;
int failover;
int release;
unsigned long long expires;
void (*end_lease) (pi_handle_t, int);
struct timerlist *timer;
struct list_head list;
};
static LIST_HEAD(lease_head);
static int myid = -1;
static struct paxos_operations *px_op = NULL;
const struct paxos_lease_operations *p_l_op;
ps_handle_t ps_handle = 0;
static void end_paxos_request(pi_handle_t handle, int round, int result)
{
struct paxos_lease *pl;
int found = 0;
list_for_each_entry(pl, &lease_head, list) {
if (pl->pih == handle) {
found = 1;
break;
}
}
if (!found) {
- log_error("cound not found the handle for paxos lease: %d",
+ log_error("cound not found the handle for paxos lease: %ld",
handle);
return;
}
if (round != pl->proposer.round) {
log_error("current paxos round is not the proposer round, "
"current round: %d, proposer round: %d",
round, pl->proposer.round);
return;
}
if (pl->end_lease)
pl->end_lease((pl_handle_t)pl, result);
return;
}
static void lease_expires(unsigned long data)
{
struct paxos_lease *pl = (struct paxos_lease *)data;
pl_handle_t plh = (pl_handle_t)pl;
struct paxos_lease_result plr;
if (pl->owner != myid) {
log_debug("lease owner is not myself");
pl->owner = -1;
strcpy(plr.name, pl->name);
plr.owner = -1;
plr.expires = 0;
p_l_op->notify(plh, &plr);
if (pl->proposer.timer)
del_timer(pl->proposer.timer);
if (pl->acceptor.timer)
del_timer(pl->acceptor.timer);
if (pl->failover)
paxos_lease_acquire(plh, 1, NULL);
} else if (pl->owner == myid && pl->relet && !pl->release) {
struct paxos_lease_value value;
log_debug("lease owner is myself, need renew");
strncpy(value.name, pl->name, PAXOS_NAME_LEN + 1);
value.owner = myid;
value.expiry = pl->expiry;
paxos_propose(pl->pih, &value, pl->proposer.round);
} else {
log_debug("lease owner is myself");
pl->owner = -1;
strcpy(plr.name, pl->name);
plr.owner = -1;
plr.expires = 0;
p_l_op->notify(plh, &plr);
if (pl->proposer.timer)
del_timer(pl->proposer.timer);
if (pl->acceptor.timer)
del_timer(pl->acceptor.timer);
}
}
static void lease_retry(unsigned long data)
{
struct paxos_lease *pl = (struct paxos_lease *)data;
struct paxos_lease_value value;
int round;
if (pl->timer)
del_timer(pl->timer);
if (pl->owner == myid) {
log_debug("already got the lease, no need to retry");
return;
}
strncpy(value.name, pl->name, PAXOS_NAME_LEN + 1);
value.owner = myid;
value.expiry = pl->expiry;
round = paxos_round_request(pl->pih, &value, &pl->acceptor.round,
end_paxos_request);
if (round > 0)
pl->proposer.round = round;
}
int paxos_lease_acquire(pl_handle_t handle,
int relet,
void (*end_acquire) (pl_handle_t handle, int result))
{
struct paxos_lease *pl = (struct paxos_lease *)handle;
struct paxos_lease_value value;
int round;
strncpy(value.name, pl->name, PAXOS_NAME_LEN + 1);
value.owner = myid;
value.expiry = pl->expiry;
pl->relet = relet;
pl->end_lease = end_acquire;
pl->release = 0;
round = paxos_round_request(pl->pih, &value, &pl->acceptor.round,
end_paxos_request);
pl->timer = add_timer(1 * pl->expiry / 10, (unsigned long)pl,
lease_retry);
if (round <= 0)
return -1;
else {
pl->proposer.round = round;
return 0;
}
}
int paxos_lease_release(pl_handle_t handle)
{
struct paxos_lease *pl = (struct paxos_lease *)handle;
pl->release = 1;
return 0;
}
static int lease_catchup(const void *name)
{
struct paxos_lease *pl;
struct paxos_lease_result plr;
int found = 0;
list_for_each_entry(pl, &lease_head, list) {
if (!strcmp(pl->name, name)) {
found = 1;
break;
}
}
if (!found) {
log_error("could not found the lease name (%s) "
"in registered list", (char *)name);
return -1;
}
p_l_op->catchup(name, &pl->owner, &pl->expires);
log_debug("catchup result: name: %s, owner: %d, expires: %llu",
(char *)name, pl->owner, pl->expires);
if (pl->owner == -1)
return 0;
if (current_time() > pl->expires) {
plr.owner = pl->owner = -1;
plr.expires = pl->expires = 0;
strcpy(plr.name, pl->name);
p_l_op->notify((pl_handle_t)pl, &plr);
return 0;
}
if (pl->owner == myid) {
pl->acceptor.timer = add_timer(pl->expires - current_time(),
(unsigned long)pl,
lease_expires);
if (current_time() < pl->expires - 1 * pl->expiry / 5)
pl->proposer.timer = add_timer(pl->expires
- 1 * pl->expiry / 5
- current_time(),
(unsigned long)pl,
lease_expires);
} else
pl->acceptor.timer = add_timer(pl->expires - current_time(),
(unsigned long)pl,
lease_expires);
plr.owner = pl->owner;
plr.expires = pl->expires;
strcpy(plr.name, pl->name);
p_l_op->notify((pl_handle_t)pl, &plr);
return 0;
}
static int lease_prepared(pi_handle_t handle __attribute__((unused)),
void *header)
{
struct paxos_lease_msghdr *hdr = header;
if (hdr->leased) {
log_debug("already leased");
return 0;
} else {
log_debug("not leased");
return 1;
}
}
static int handle_lease_request(pi_handle_t handle, void *header)
{
struct paxos_lease_msghdr *hdr;
struct paxos_lease *pl;
int found = 0;
hdr = header;
list_for_each_entry(pl, &lease_head, list) {
if (pl->pih == handle) {
found = 1;
break;
}
}
if (!found) {
- log_error("could not find the lease handle: %d", handle);
+ log_error("could not find the lease handle: %ld", handle);
return -1;
}
if (pl->owner == -1) {
log_debug("has not been leased");
hdr->leased = 0;
} else {
log_debug("has been leased");
hdr->leased = 1;
}
return 0;
}
static int lease_propose(pi_handle_t handle,
void *extra __attribute__((unused)),
int round, void *value)
{
struct paxos_lease *pl;
int found = 0;
list_for_each_entry(pl, &lease_head, list) {
if (pl->pih == handle) {
found = 1;
break;
}
}
if (!found) {
- log_error("could not find the lease handle: %d", handle);
+ log_error("could not find the lease handle: %ld", handle);
return -1;
}
if (round != pl->proposer.round) {
log_error("current round is not the proposer round, "
"current round: %d, proposer round: %d",
round, pl->proposer.round);
return -1;
}
if (!pl->proposer.plv) {
pl->proposer.plv = malloc(sizeof(struct paxos_lease_value));
if (!pl->proposer.plv) {
log_error("could not alloc mem for propsoer plv");
return -ENOMEM;
}
}
memcpy(pl->proposer.plv, value, sizeof(struct paxos_lease_value));
if (pl->proposer.timer)
del_timer(pl->proposer.timer);
if (pl->relet) {
pl->proposer.timer = add_timer(4 * pl->expiry / 5,
(unsigned long)pl,
lease_expires);
pl->proposer.expires = current_time() + 4 * pl->expiry / 5;
} else {
pl->proposer.timer = add_timer(pl->expiry, (unsigned long)pl,
lease_expires);
pl->proposer.expires = current_time() + pl->expiry;
}
return 0;
}
static int lease_accepted(pi_handle_t handle,
void *extra __attribute__((unused)),
int round, void *value)
{
struct paxos_lease *pl;
int found = 0;
list_for_each_entry(pl, &lease_head, list) {
if (pl->pih == handle) {
found = 1;
break;
}
}
if (!found) {
- log_error("could not find the lease handle: %d", handle);
+ log_error("could not find the lease handle: %ld", handle);
return -1;
}
pl->acceptor.round = round;
if (!pl->acceptor.plv) {
pl->acceptor.plv = malloc(sizeof(struct paxos_lease_value));
if (!pl->acceptor.plv) {
log_error("could not alloc mem for acceptor plv");
return -ENOMEM;
}
}
memcpy(pl->acceptor.plv, value, sizeof(struct paxos_lease_value));
if (pl->acceptor.timer)
del_timer(pl->acceptor.timer);
pl->acceptor.timer = add_timer(pl->expiry, (unsigned long)pl,
lease_expires);
pl->acceptor.expires = current_time() + pl->expiry;
return 0;
}
static int lease_commit(pi_handle_t handle,
void *extra __attribute__((unused)),
int round)
{
struct paxos_lease *pl;
struct paxos_lease_result plr;
int found = 0;
list_for_each_entry(pl, &lease_head, list) {
if (pl->pih == handle) {
found = 1;
break;
}
}
if (!found) {
- log_error("could not find the lease handle: %d", handle);
+ log_error("could not find the lease handle: %ld", handle);
return -1;
}
if (round != pl->proposer.round) {
log_error("current round is not the proposer round, "
"current round: %d, proposer round: %d",
round, pl->proposer.round);
return -1;
}
pl->owner = pl->proposer.plv->owner;
pl->expiry = pl->proposer.plv->expiry;
strcpy(plr.name, pl->proposer.plv->name);
plr.owner = pl->proposer.plv->owner;
plr.expires = current_time() + pl->proposer.plv->expiry;
p_l_op->notify((pl_handle_t)pl, &plr);
return 0;
}
static int lease_learned(pi_handle_t handle,
void *extra __attribute__((unused)),
int round)
{
struct paxos_lease *pl;
struct paxos_lease_result plr;
int found = 0;
list_for_each_entry(pl, &lease_head, list) {
if (pl->pih == handle) {
found = 1;
break;
}
}
if (!found) {
- log_error("could not find the lease handle: %d", handle);
+ log_error("could not find the lease handle: %ld", handle);
return -1;
}
if (round != pl->acceptor.round) {
log_error("current round is not the proposer round, "
"current round: %d, proposer round: %d",
round, pl->proposer.round);
return -1;
}
pl->owner = pl->acceptor.plv->owner;
pl->expiry = pl->acceptor.plv->expiry;
strcpy(plr.name, pl->acceptor.plv->name);
plr.owner = pl->acceptor.plv->owner;
plr.expires = current_time() + pl->acceptor.plv->expiry;
p_l_op->notify((pl_handle_t)pl, &plr);
return 0;
}
pl_handle_t paxos_lease_init(const void *name,
unsigned int namelen,
int expiry,
int number,
int failover,
unsigned char *role,
int *prio,
const struct paxos_lease_operations *pl_op)
{
ps_handle_t psh;
pi_handle_t pih;
struct paxos_lease *lease;
if (namelen > PAXOS_NAME_LEN) {
log_error("length of paxos name is too long (%u)", namelen);
return -EINVAL;
}
if (myid == -1)
myid = pl_op->get_myid();
if (!ps_handle) {
px_op = malloc(sizeof(struct paxos_operations));
if (!px_op) {
log_error("could not alloc for paxos operations");
return -ENOMEM;
}
memset(px_op, 0, sizeof(struct paxos_operations));
px_op->get_myid = pl_op->get_myid;
px_op->send = pl_op->send;
px_op->broadcast = pl_op->broadcast;
px_op->catchup = lease_catchup;
px_op->prepare = lease_prepared;
px_op->promise = handle_lease_request;
px_op->propose = lease_propose;
px_op->accepted = lease_accepted;
px_op->commit = lease_commit;
px_op->learned = lease_learned;
p_l_op = pl_op;
psh = paxos_space_init(PAXOS_LEASE_SPACE,
number,
sizeof(struct paxos_lease_msghdr),
PLEASE_VALUE_LEN,
role,
px_op);
if (psh <= 0) {
- log_error("failed to initialize paxos space: %d", psh);
+ log_error("failed to initialize paxos space: %ld", psh);
free(px_op);
px_op = NULL;
return psh;
}
ps_handle = psh;
}
lease = malloc(sizeof(struct paxos_lease));
if (!lease) {
log_error("cound not alloc for paxos lease");
return -ENOMEM;
}
memset(lease, 0, sizeof(struct paxos_lease));
strncpy(lease->name, name, PAXOS_NAME_LEN + 1);
lease->owner = -1;
lease->expiry = expiry;
lease->failover = failover;
list_add_tail(&lease->list, &lease_head);
pih = paxos_instance_init(ps_handle, name, prio);
if (pih <= 0) {
- log_error("failed to initialize paxos instance: %d", pih);
+ log_error("failed to initialize paxos instance: %ld", pih);
free(lease);
return pih;
}
lease->pih = pih;
return (pl_handle_t)lease;
}
int paxos_lease_on_receive(void *msg, int msglen)
{
return paxos_recvmsg(msg, msglen);
}
int paxos_lease_exit(pl_handle_t handle)
{
struct paxos_lease *pl = (struct paxos_lease *)handle;
if (px_op)
free(px_op);
if (pl->proposer.plv)
free(pl->proposer.plv);
if (pl->proposer.timer)
del_timer(pl->proposer.timer);
if (pl->acceptor.plv)
free(pl->acceptor.plv);
if (pl->acceptor.timer)
del_timer(pl->acceptor.timer);
return 0;
}
diff --git a/src/paxos_lease.h b/src/paxos_lease.h
index 498070c..8a03d97 100644
--- a/src/paxos_lease.h
+++ b/src/paxos_lease.h
@@ -1,66 +1,66 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _PAXOS_LEASE_H
#define _PAXOS_LEASE_H
#define PLEASE_NAME_LEN 63
-typedef int pl_handle_t;
+typedef long pl_handle_t;
struct paxos_lease_result {
char name[PLEASE_NAME_LEN+1];
int owner;
unsigned long long expires;
};
struct paxos_lease_operations {
int (*get_myid) (void);
int (*send) (unsigned long id, void *value, int len);
int (*broadcast) (void *value, int len);
int (*catchup) (const void *name, int *owner,
unsigned long long *expires);
int (*notify) (pl_handle_t handle, struct paxos_lease_result *result);
};
pl_handle_t paxos_lease_init(const void *name,
unsigned int namelen,
int expiry,
int number,
int failover,
unsigned char *role,
int *prio,
const struct paxos_lease_operations *pl_op);
int paxos_lease_on_receive(void *msg, int msglen);
int paxos_lease_acquire(pl_handle_t handle,
int relet,
void (*end_acquire) (pl_handle_t handle, int result));
/*
int paxos_lease_owner_get(const void *name);
int paxos_lease_epoch_get(const void *name);
int paxos_lease_timeout(const void *name);
*/
int paxos_lease_release(pl_handle_t handle);
int paxos_lease_exit(pl_handle_t handle);
#endif /* _PAXOS_LEASE_H */
diff --git a/src/ticket.c b/src/ticket.c
index 26e501b..fa09fed 100644
--- a/src/ticket.c
+++ b/src/ticket.c
@@ -1,397 +1,397 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include <stdio.h>
#include "ticket.h"
#include "config.h"
#include "pacemaker.h"
#include "list.h"
#include "log.h"
#include "paxos_lease.h"
#include "paxos.h"
#define PAXOS_MAGIC 0xDB12
#define TK_LINE 256
struct booth_msghdr {
uint16_t magic;
uint16_t checksum;
uint32_t len;
} __attribute__((packed));
struct ticket {
char id[BOOTH_NAME_LEN+1];
pl_handle_t handle;
int owner;
int expiry;
unsigned long long expires;
struct list_head list;
};
static LIST_HEAD(ticket_list);
static unsigned char *role;
int check_ticket(char *ticket)
{
int i;
if (!booth_conf)
return 0;
for (i = 0; i < booth_conf->ticket_count; i++) {
if (!strcmp(booth_conf->ticket[i].name, ticket))
return 1;
}
return 0;
}
int check_site(char *site, int *local)
{
int i;
if (!booth_conf)
return 0;
for (i = 0; i < booth_conf->node_count; i++) {
if (booth_conf->node[i].type == SITE
&& !strcmp(booth_conf->node[i].addr, site)) {
*local = booth_conf->node[i].local;
return 1;
}
}
return 0;
}
static int * ticket_priority(int i)
{
int j;
/* TODO: need more precise check */
for (j = 0; j < booth_conf->node_count; j++) {
if (booth_conf->ticket[i].weight[j] == 0)
return NULL;
}
return booth_conf->ticket[i].weight;
}
static int ticket_get_myid(void)
{
return booth_transport[booth_conf->proto].get_myid();
}
static void end_acquire(pl_handle_t handle, int result)
{
struct ticket *tk;
int found = 0;
if (result == 0) {
list_for_each_entry(tk, &ticket_list, list) {
if (tk->handle == handle) {
tk->owner = ticket_get_myid();
found = 1;
break;
}
}
if (!found)
- log_error("BUG: ticket handle %d does not exist",
+ log_error("BUG: ticket handle %ld does not exist",
handle);
log_info("ticket %s acquired", tk->id);
log_info("ticket %s granted to local (id %d)", tk->id,
ticket_get_myid());
}
}
static int ticket_send(unsigned long id, void *value, int len)
{
int i, rv = -1;
struct booth_node *to = NULL;
struct booth_msghdr *hdr;
void *buf;
for (i = 0; i < booth_conf->node_count; i++) {
if (booth_conf->node[i].nodeid == id)
to = &booth_conf->node[i];
}
if (!to)
return rv;
buf = malloc(sizeof(struct booth_msghdr) + len);
if (!buf)
return -ENOMEM;
memset(buf, 0, sizeof(struct booth_msghdr) + len);
hdr = buf;
hdr->magic = htons(PAXOS_MAGIC);
hdr->len = htonl(sizeof(struct booth_msghdr) + len);
memcpy((char *)buf + sizeof(struct booth_msghdr), value, len);
rv = booth_transport[booth_conf->proto].send(
(unsigned long)to, buf, sizeof(struct booth_msghdr) + len);
free(buf);
return rv;
}
static int ticket_broadcast(void *value, int len)
{
void *buf;
struct booth_msghdr *hdr;
int rv;
buf = malloc(sizeof(struct booth_msghdr) + len);
if (!buf)
return -ENOMEM;
memset(buf, 0, sizeof(struct booth_msghdr) + len);
hdr = buf;
hdr->magic = htons(PAXOS_MAGIC);
hdr->len = htonl(sizeof(struct booth_msghdr) + len);
memcpy((char *)buf + sizeof(struct booth_msghdr), value, len);
rv = booth_transport[booth_conf->proto].broadcast(
buf, sizeof(struct booth_msghdr) + len);
free(buf);
return rv;
}
static int ticket_read(const void *name, int *owner,
unsigned long long *expires)
{
struct ticket *tk;
int found = 0;
list_for_each_entry(tk, &ticket_list, list) {
if (!strcmp(tk->id, name)) {
found = 1;
break;
}
}
if (!found) {
log_error("BUG: ticket_read failed (ticket %s does not exist)",
(char *)name);
return -1;
}
pcmk_handler.load_ticket(tk->id, &tk->owner, &tk->expires);
*owner = tk->owner;
*expires = tk->expires;
return 0;
}
static int ticket_write(pl_handle_t handle, struct paxos_lease_result *result)
{
struct ticket *tk;
int found = 0;
list_for_each_entry(tk, &ticket_list, list) {
if (tk->handle == handle) {
found = 1;
break;
}
}
if (!found) {
log_error("BUG: ticket_write failed "
- "(ticket handle %d does not exist)", handle);
+ "(ticket handle %ld does not exist)", handle);
return -1;
}
tk->owner = result->owner;
tk->expires = result->expires;
if (tk->owner == ticket_get_myid()) {
pcmk_handler.store_ticket(tk->id, tk->owner, tk->expires);
pcmk_handler.grant_ticket(tk->id);
} else if (tk->owner == -1) {
pcmk_handler.store_ticket(tk->id, tk->owner, tk->expires);
pcmk_handler.revoke_ticket(tk->id);
} else
pcmk_handler.store_ticket(tk->id, tk->owner, tk->expires);
return 0;
}
int ticket_recv(void *msg, int msglen)
{
struct booth_msghdr *hdr;
char *data;
hdr = msg;
if (ntohs(hdr->magic) != PAXOS_MAGIC ||
ntohl(hdr->len) != msglen) {
log_error("message received error");
return -1;
}
data = (char *)msg + sizeof(struct booth_msghdr);
return paxos_lease_on_receive(data,
msglen - sizeof(struct booth_msghdr));
}
int grant_ticket(char *ticket, int force)
{
struct ticket *tk;
int found = 0;
if (force) {
pcmk_handler.store_ticket(ticket, ticket_get_myid(), -1);
pcmk_handler.grant_ticket(ticket);
return BOOTHC_RLT_SYNC_SUCC;
}
list_for_each_entry(tk, &ticket_list, list) {
if (!strcmp(tk->id, ticket)) {
found = 1;
break;
}
}
if (!found) {
log_error("ticket %s does not exist", ticket);
return BOOTHC_RLT_SYNC_FAIL;
}
if (tk->owner == ticket_get_myid())
return BOOTHC_RLT_SYNC_SUCC;
else {
paxos_lease_acquire(tk->handle, 1, end_acquire);
return BOOTHC_RLT_ASYNC;
}
}
int revoke_ticket(char *ticket, int force)
{
struct ticket *tk;
int found = 0;
list_for_each_entry(tk, &ticket_list, list) {
if (!strcmp(tk->id, ticket)) {
found = 1;
break;
}
}
if (!found) {
log_error("ticket %s does not exist", ticket);
return BOOTHC_RLT_SYNC_FAIL;
}
if (force) {
pcmk_handler.store_ticket(tk->id, -1, 0);
pcmk_handler.revoke_ticket(tk->id);
}
if (tk->owner == -1)
return BOOTHC_RLT_SYNC_SUCC;
else {
paxos_lease_release(tk->handle);
return BOOTHC_RLT_ASYNC;
}
}
int list_ticket(char **pdata, unsigned int *len)
{
struct ticket *tk;
char tmp[TK_LINE];
*pdata = NULL;
*len = 0;
list_for_each_entry(tk, &ticket_list, list) {
memset(tmp, 0, TK_LINE);
snprintf(tmp, TK_LINE, "ticket: %s, owner: %d, expires: %llu\n",
tk->id, tk->owner, tk->expires);
*pdata = realloc(*pdata, *len + TK_LINE);
if (*pdata == NULL)
return -ENOMEM;
memset(*pdata + *len, 0, TK_LINE);
memcpy(*pdata + *len, tmp, TK_LINE);
*len += TK_LINE;
}
return 0;
}
const struct paxos_lease_operations ticket_operations = {
.get_myid = ticket_get_myid,
.send = ticket_send,
.broadcast = ticket_broadcast,
.catchup = ticket_read,
.notify = ticket_write,
};
int setup_ticket(void)
{
struct ticket *tk, *tmp;
int i, rv;
pl_handle_t plh;
role = malloc(booth_conf->node_count * sizeof(unsigned char));
if (!role)
return -ENOMEM;
memset(role, 0, booth_conf->node_count * sizeof(unsigned char));
for (i = 0; i < booth_conf->node_count; i++) {
if (booth_conf->node[i].type == SITE)
role[i] = PROPOSER | ACCEPTOR | LEARNER;
else if (booth_conf->node[i].type == ARBITRATOR)
role[i] = ACCEPTOR | LEARNER;
}
for (i = 0; i < booth_conf->ticket_count; i++) {
tk = malloc(sizeof(struct ticket));
if (!tk) {
rv = -ENOMEM;
goto out;
}
memset(tk, 0, sizeof(struct ticket));
strcpy(tk->id, booth_conf->ticket[i].name);
tk->owner = -1;
tk->expiry = booth_conf->ticket[i].expiry;
if (!tk->expiry)
tk->expiry = DEFAULT_TICKET_EXPIRY;
list_add_tail(&tk->list, &ticket_list);
plh = paxos_lease_init(tk->id,
BOOTH_NAME_LEN,
tk->expiry,
booth_conf->node_count,
1,
role,
ticket_priority(i),
&ticket_operations);
if (plh <= 0) {
log_error("paxos lease initialization failed");
rv = plh;
goto out;
}
tk->handle = plh;
}
return 0;
out:
list_for_each_entry_safe(tk, tmp, &ticket_list, list) {
list_del(&tk->list);
}
free(role);
return rv;
}

File Metadata

Mime Type
text/x-diff
Expires
Thu, Jul 10, 1:55 AM (1 d, 9 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2009607
Default Alt Text
(72 KB)

Event Timeline