Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F4639015
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
72 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/src/main.c b/src/main.c
index a78d732..819ce4d 100644
--- a/src/main.c
+++ b/src/main.c
@@ -1,1087 +1,1087 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sched.h>
#include <errno.h>
#include <limits.h>
#include <sys/file.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/poll.h>
#include <fcntl.h>
#include <string.h>
#include <assert.h>
#include "log.h"
#include "booth.h"
#include "config.h"
#include "transport.h"
#include "timer.h"
#include "pacemaker.h"
#include "ticket.h"
#include <error.h>
/* Version string printed by the "version" command line argument. */
#define RELEASE_VERSION "1.0"
/* Number of slots by which the client/pollfd arrays grow on each allocation. */
#define CLIENT_NALLOC 32
/* Log priority thresholds; -D raises the logfile and syslog ones to LOG_DEBUG. */
int log_logfile_priority = LOG_INFO;
int log_syslog_priority = LOG_ERR;
int log_stderr_priority = LOG_ERR;
/* Highest slot index handed out so far by client_add(). */
static int client_maxi;
/* Number of slots currently allocated in client[] and pollfd[]. */
static int client_size = 0;
/* Parallel arrays: client[i] holds the callbacks for the fd polled in pollfd[i]. */
struct client *client = NULL;
struct pollfd *pollfd = NULL;
/* Timeout passed to poll() by loop(); -1 blocks indefinitely. */
int poll_timeout = -1;
/* Role selected by the first command line argument; stored in cl.type. */
typedef enum {
	ACT_ARBITRATOR = 1,
	ACT_SITE,
	ACT_CLIENT,
} booth_role_t;
/* Client operation selected on the command line; stored in cl.op. */
typedef enum {
	OP_LIST = 1,
	OP_GRANT,
	OP_REVOKE,
} operation_t;
/* Parsed command line; filled in by main() and read_arguments(). */
struct command_line {
	int type; /* ACT_* role */
	int op; /* OP_* client operation (0 when none was given) */
	int force; /* set by -f, only accepted together with "grant" */
	char configfile[BOOTH_PATH_LEN]; /* -c, defaults to BOOTH_DEFAULT_CONF */
	char lockfile[BOOTH_PATH_LEN]; /* -l, defaults to BOOTH_DEFAULT_LOCKFILE */
	char site[BOOTH_NAME_LEN]; /* -s */
	char ticket[BOOTH_NAME_LEN]; /* -t */
};
/* The single, process-wide parsed command line. */
static struct command_line cl;
/*
 * Read exactly `count` bytes from `fd` into `buf`.
 *
 * Short reads are continued and EINTR is retried.
 *
 * Returns 0 once all bytes have been read, or -1 if the peer closed the
 * descriptor early (EOF) or read() failed with an error other than EINTR.
 */
int do_read(int fd, void *buf, size_t count)
{
	size_t off = 0;

	while (off < count) {
		ssize_t rv = read(fd, (char *)buf + off, count - off);

		if (rv == 0)
			return -1;	/* EOF before the full amount arrived */
		if (rv < 0) {
			if (errno == EINTR)
				continue;
			return -1;
		}
		off += (size_t)rv;
	}
	return 0;
}
/*
 * Write `count` bytes from `buf` to `fd`, continuing after short writes
 * and retrying when write() is interrupted (EINTR).
 *
 * Returns 0 when everything was written; on any other write() failure
 * the error is logged and the negative write() result is returned.
 */
int do_write(int fd, void *buf, size_t count)
{
	int written;
	int pos = 0;

	for (;;) {
		written = write(fd, (char *)buf + pos, count);
		if (written == -1 && errno == EINTR)
			continue;
		if (written < 0) {
			log_error("write errno %d", errno);
			return written;
		}
		if (written == count)
			return 0;
		/* short write: advance past what was accepted and go again */
		count -= written;
		pos += written;
	}
}
/*
 * Connect to the booth daemon's UNIX stream socket.
 *
 * The name is copied starting at sun_path[1], so sun_path[0] stays NUL;
 * the address length therefore counts that leading byte plus the name.
 * (On Linux this form is an abstract socket address, see unix(7).)
 *
 * Returns the connected descriptor, or a negative value if the name does
 * not fit into sun_path or socket()/connect() fails.
 */
static int do_connect(const char *sock_path)
{
	struct sockaddr_un addr;
	socklen_t addrlen;
	size_t pathlen = strlen(sock_path);
	int fd;

	/* leading NUL byte + name + terminator must fit into sun_path */
	if (pathlen + 2 > sizeof(addr.sun_path)) {
		errno = ENAMETOOLONG;
		return -1;
	}

	fd = socket(PF_UNIX, SOCK_STREAM, 0);
	if (fd < 0)
		return fd;

	memset(&addr, 0, sizeof(addr));
	addr.sun_family = AF_UNIX;
	memcpy(&addr.sun_path[1], sock_path, pathlen);
	addrlen = sizeof(sa_family_t) + pathlen + 1;

	if (connect(fd, (struct sockaddr *)&addr, addrlen) < 0) {
		close(fd);
		return -1;
	}
	return fd;
}
/*
 * Zero a boothc header and fill in the magic, the protocol version and
 * the caller-supplied command, option, result and payload length.
 */
static void init_header(struct boothc_header *h, int cmd, int option,
			int result, int data_len)
{
	memset(h, 0, sizeof(*h));

	h->magic = BOOTHC_MAGIC;
	h->version = BOOTHC_VERSION;

	h->cmd = cmd;
	h->option = option;
	h->result = result;
	h->len = data_len;
}
/*
 * Grow the parallel client[] / pollfd[] arrays by CLIENT_NALLOC slots and
 * mark every new slot unused (fd == -1).
 *
 * The function has no way to report failure and its caller retries until
 * a free slot exists, so running out of memory is treated as fatal: the
 * error is logged and the process exits, instead of dereferencing a NULL
 * array as the previous code did.
 */
static void client_alloc(void)
{
	int new_size = client_size + CLIENT_NALLOC;
	struct client *new_client;
	struct pollfd *new_pollfd;
	int i;

	/* realloc(NULL, n) behaves like malloc(n), covering the first call */
	new_client = realloc(client, new_size * sizeof(struct client));
	if (!new_client) {
		log_error("can't alloc for client array");
		exit(EXIT_FAILURE);
	}
	client = new_client;

	new_pollfd = realloc(pollfd, new_size * sizeof(struct pollfd));
	if (!new_pollfd) {
		log_error("can't alloc for pollfd");
		exit(EXIT_FAILURE);
	}
	pollfd = new_pollfd;

	for (i = client_size; i < new_size; i++) {
		client[i].workfn = NULL;
		client[i].deadfn = NULL;
		client[i].fd = -1;
		pollfd[i].fd = -1;
		pollfd[i].revents = 0;
	}
	client_size = new_size;
}
static void client_dead(int ci)
{
close(client[ci].fd);
client[ci].workfn = NULL;
client[ci].fd = -1;
pollfd[ci].fd = -1;
}
/*
 * Register `fd` in the first free slot of the client table and poll it
 * for POLLIN.
 *
 * workfn is stored as the slot's work callback; deadfn as its dead
 * callback, falling back to client_dead() when deadfn is NULL.  The
 * table is grown whenever no free slot is found.
 *
 * Returns the index of the slot that was used.
 */
int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci))
{
	int slot;

	if (client == NULL)
		client_alloc();

	for (;;) {
		for (slot = 0; slot < client_size; slot++) {
			if (client[slot].fd != -1)
				continue;

			client[slot].workfn = workfn;
			client[slot].deadfn = deadfn ? deadfn : client_dead;
			client[slot].fd = fd;
			pollfd[slot].fd = fd;
			pollfd[slot].events = POLLIN;
			if (slot > client_maxi)
				client_maxi = slot;
			return slot;
		}
		/* table is full: add more slots and scan again */
		client_alloc();
	}
}
/*
 * Create, bind and listen on the daemon's UNIX stream socket.
 *
 * As in do_connect(), the name is stored from sun_path[1] onwards so that
 * sun_path[0] stays NUL, and the address length counts that leading byte.
 *
 * Returns the listening descriptor, or a negative value (after logging)
 * if the name does not fit or socket()/bind()/listen() fails.
 */
static int setup_listener(const char *sock_path)
{
	struct sockaddr_un addr;
	socklen_t addrlen;
	size_t pathlen = strlen(sock_path);
	int rv, s;

	/* leading NUL byte + name + terminator must fit into sun_path */
	if (pathlen + 2 > sizeof(addr.sun_path)) {
		log_error("socket path too long: %s", sock_path);
		return -1;
	}

	s = socket(AF_LOCAL, SOCK_STREAM, 0);
	if (s < 0) {
		log_error("socket error %d %d", s, errno);
		return s;
	}

	memset(&addr, 0, sizeof(addr));
	addr.sun_family = AF_LOCAL;
	memcpy(&addr.sun_path[1], sock_path, pathlen);
	addrlen = sizeof(sa_family_t) + pathlen + 1;

	rv = bind(s, (struct sockaddr *) &addr, addrlen);
	if (rv < 0) {
		log_error("bind error %d %d", rv, errno);
		close(s);
		return rv;
	}

	rv = listen(s, 5);
	if (rv < 0) {
		log_error("listen error %d %d", rv, errno);
		close(s);
		return rv;
	}
	return s;
}
/*
 * Work callback for an accepted client connection: read one request
 * (header plus optional payload), execute it and write the reply.
 *
 * Request handling:
 *  - LIST carries no payload; the ticket list produced by list_ticket()
 *    is sent back after the header.
 *  - GRANT / REVOKE carry two BOOTH_NAME_LEN-sized fields, site followed
 *    by ticket.  If the site is local the ticket operation is executed,
 *    otherwise BOOTHC_RLT_REMOTE_OP is returned.
 *
 * Malformed requests (unexpected or too short payload) are answered with
 * BOOTHC_RLT_INVALID_ARG rather than aborting the daemon or reading past
 * the payload buffer.  If the header cannot be read, the slot's dead
 * callback is run.
 */
void process_connection(int ci)
{
	struct boothc_header h;
	char *data = NULL;
	char *site, *ticket;
	size_t payload_len = 0;
	int local, rv;
	void (*deadfn) (int ci);

	rv = do_read(client[ci].fd, &h, sizeof(h));
	if (rv < 0) {
		log_error("connection %d read error %d", ci, rv);
		if (errno == ECONNRESET)
			log_debug("client %d aborted conection fd %d", ci, client[ci].fd);
		else
			log_debug("client %d closed connnection fd %d", ci, client[ci].fd);
		deadfn = client[ci].deadfn;
		if(deadfn) {
			log_debug("run deadfn");
			deadfn(ci);
		}
		return;
	}
	if (h.magic != BOOTHC_MAGIC) {
		log_error("connection %d magic error %x", ci, h.magic);
		return;
	}
	if (h.version != BOOTHC_VERSION) {
		log_error("connection %d version error %x", ci, h.version);
		return;
	}

	if (h.len) {
		payload_len = h.len;
		data = malloc(payload_len);
		if (!data) {
			log_error("process_connection no mem %u", h.len);
			return;
		}
		memset(data, 0, payload_len);
		rv = do_read(client[ci].fd, data, payload_len);
		if (rv < 0) {
			log_error("connection %d read data error %d", ci, rv);
			goto out;
		}
		/* the reply carries a payload only if a command sets h.len */
		h.len = 0;
	}

	switch (h.cmd) {
	case BOOTHC_CMD_LIST:
		if (data) {
			/* a peer-supplied payload must not be able to abort us */
			log_error("connection %d list request with payload", ci);
			h.result = BOOTHC_RLT_INVALID_ARG;
			goto reply;
		}
		h.result = list_ticket(&data, &h.len);
		break;

	case BOOTHC_CMD_GRANT:
	case BOOTHC_CMD_REVOKE:
		if (!data || payload_len < 2 * BOOTH_NAME_LEN) {
			log_error("connection %d cmd %x payload too short", ci, h.cmd);
			h.result = BOOTHC_RLT_INVALID_ARG;
			goto reply;
		}
		site = data;
		ticket = data + BOOTH_NAME_LEN;
		/* never trust the peer to have terminated the names */
		site[BOOTH_NAME_LEN - 1] = '\0';
		ticket[BOOTH_NAME_LEN - 1] = '\0';

		if (!check_ticket(ticket)) {
			h.result = BOOTHC_RLT_INVALID_ARG;
			goto reply;
		}
		if (!check_site(site, &local)) {
			h.result = BOOTHC_RLT_INVALID_ARG;
			goto reply;
		}
		if (!local)
			h.result = BOOTHC_RLT_REMOTE_OP;
		else if (h.cmd == BOOTHC_CMD_GRANT)
			h.result = grant_ticket(ticket, h.option);
		else
			h.result = revoke_ticket(ticket, h.option);
		break;

	default:
		log_error("connection %d cmd %x unknown", ci, h.cmd);
		break;
	}

reply:
	rv = do_write(client[ci].fd, &h, sizeof(h));
	if (rv < 0)
		log_error("connection %d write error %d", ci, rv);
	if (h.len) {
		rv = do_write(client[ci].fd, data, h.len);
		if (rv < 0)
			log_error("connection %d write error %d", ci, rv);
	}
out:
	free(data);
}
static void process_listener(int ci)
{
int fd, i;
fd = accept(client[ci].fd, NULL, NULL);
if (fd < 0) {
log_error("process_listener: accept error %d %d", fd, errno);
return;
}
i = client_add(fd, process_connection, NULL);
log_debug("add client connection %d fd %d", i, fd);
}
/*
 * Load the configuration file named on the command line and validate it
 * for the given role.  Returns the first negative result encountered,
 * otherwise the result of check_config().
 */
static int setup_config(int type)
{
	int status = read_config(cl.configfile);

	if (status >= 0)
		status = check_config(type);
	return status;
}
/*
 * Initialise the configured transport (with ticket_recv as its callback)
 * and then the TCP transport.  Returns the result of the last init call
 * made; a negative result is logged and stops the sequence.
 */
static int setup_transport(void)
{
	transport_layer_t proto = booth_conf->proto;
	int status;

	status = booth_transport[proto].init(ticket_recv);
	if (status < 0) {
		log_error("failed to init booth_transport[%d]", proto);
		return status;
	}

	status = booth_transport[TCP].init(NULL);
	if (status < 0)
		log_error("failed to init booth_transport[TCP]");

	return status;
}
/* Initialise the timer list; returns whatever timerlist_init() returns. */
static int setup_timer(void)
{
	int status = timerlist_init();

	return status;
}
/*
 * Bring up everything a daemon needs, in order: configuration, timers,
 * transports, tickets and finally the local listening socket, which is
 * registered with process_listener() as its work callback.
 *
 * Returns 0 on success and -1 as soon as any step reports a negative
 * result.
 */
static int setup(int type)
{
	int listener_fd;

	if (setup_config(type) < 0)
		return -1;
	if (setup_timer() < 0)
		return -1;
	if (setup_transport() < 0)
		return -1;
	if (setup_ticket() < 0)
		return -1;

	listener_fd = setup_listener(BOOTHC_SOCK_PATH);
	if (listener_fd < 0)
		return -1;

	client_add(listener_fd, process_listener, NULL);
	return 0;
}
/*
 * Main event loop: poll every registered descriptor, run the work
 * callback of slots that are readable and the dead callback of slots
 * reporting POLLERR/POLLHUP/POLLNVAL, then process the timer list.
 *
 * Only returns (with -1) when poll() fails with something other than
 * EINTR.
 */
static int loop(void)
{
	const short dead_mask = POLLERR | POLLHUP | POLLNVAL;
	int nready, idx;

	for (;;) {
		nready = poll(pollfd, client_maxi + 1, poll_timeout);
		if (nready == -1 && errno == EINTR)
			continue;
		if (nready < 0) {
			log_error("poll errno %d", errno);
			return -1;
		}

		for (idx = 0; idx <= client_maxi; idx++) {
			if (client[idx].fd < 0)
				continue;

			if ((pollfd[idx].revents & POLLIN) && client[idx].workfn)
				client[idx].workfn(idx);

			/* re-read the tables: the work callback may have changed them */
			if ((pollfd[idx].revents & dead_mask) && client[idx].deadfn)
				client[idx].deadfn(idx);
		}

		process_timerlist();
	}
}
/*
 * Client "list" operation: send a LIST request to the local daemon, read
 * the reply header and the payload it announces, and copy that payload to
 * standard output.
 *
 * Returns 0 on success and a negative value if connecting, allocating,
 * reading or writing fails.
 */
static int do_list(void)
{
	struct boothc_header h, *rh;
	char *reply = NULL, *grown, *data;
	int data_len;
	int fd, rv;

	init_header(&h, BOOTHC_CMD_LIST, 0, 0, 0);

	fd = do_connect(BOOTHC_SOCK_PATH);
	if (fd < 0) {
		rv = fd;
		goto out;
	}

	rv = do_write(fd, &h, sizeof(h));
	if (rv < 0)
		goto out_close;

	reply = malloc(sizeof(struct boothc_header));
	if (!reply) {
		rv = -ENOMEM;
		goto out_close;
	}

	rv = do_read(fd, reply, sizeof(struct boothc_header));
	if (rv < 0)
		goto out_free;

	rh = (struct boothc_header *)reply;
	data_len = rh->len;
	if (data_len < 0) {
		/* announced length does not fit into an int */
		rv = -1;
		goto out_free;
	}

	/* keep the old pointer until realloc is known to have succeeded */
	grown = realloc(reply, sizeof(struct boothc_header) + data_len);
	if (!grown) {
		rv = -ENOMEM;
		goto out_free;
	}
	reply = grown;

	data = reply + sizeof(struct boothc_header);
	rv = do_read(fd, data, data_len);
	if (rv < 0)
		goto out_free;

	/* do_write() returns 0 on success, negative on failure */
	rv = do_write(STDOUT_FILENO, data, data_len);

out_free:
	free(reply);
out_close:
	close(fd);
out:
	return rv;
}
/*
 * Client "grant" operation.
 *
 * Builds a GRANT request (header followed by the fixed-size site and
 * ticket name fields), sends it to the local daemon and reads the reply
 * header.  If the daemon answers BOOTHC_RLT_REMOTE_OP, the same request
 * is sent to the named site over the TCP transport and that reply is
 * used instead.
 *
 * Returns 0 when the final result is ASYNC, SYNC_SUCC or SYNC_FAIL (the
 * outcome is logged), and a negative value on invalid arguments, on any
 * communication failure or on an unexpected result code.
 */
static int do_grant(void)
{
	char *buf;
	struct boothc_header *h, reply;
	int buflen;
	uint32_t force = 0;
	int fd, rv;

	buflen = sizeof(struct boothc_header) +
		 sizeof(cl.site) + sizeof(cl.ticket);
	/* zeroed so no uninitialised bytes follow the names on the wire */
	buf = calloc(1, buflen);
	if (!buf) {
		rv = -ENOMEM;
		goto out;
	}

	h = (struct boothc_header *)buf;
	if (cl.force)
		force = BOOTHC_OPT_FORCE;
	init_header(h, BOOTHC_CMD_GRANT, force, 0,
		    sizeof(cl.site) + sizeof(cl.ticket));
	strcpy(buf + sizeof(struct boothc_header), cl.site);
	strcpy(buf + sizeof(struct boothc_header) + sizeof(cl.site), cl.ticket);

	fd = do_connect(BOOTHC_SOCK_PATH);
	if (fd < 0) {
		rv = fd;
		goto out_free;
	}

	rv = do_write(fd, buf, buflen);
	if (rv < 0)
		goto out_close;

	rv = do_read(fd, &reply, sizeof(struct boothc_header));
	if (rv < 0)
		goto out_close;

	if (reply.result == BOOTHC_RLT_INVALID_ARG) {
		log_info("invalid argument!");
		rv = -1;
		goto out_close;
	}

	if (reply.result == BOOTHC_RLT_REMOTE_OP) {
		struct booth_node to;
		int s;

		memset(&to, 0, sizeof(struct booth_node));
		to.family = BOOTH_PROTO_FAMILY;
		/* NOTE(review): assumes to.addr can hold BOOTH_NAME_LEN bytes - confirm */
		strcpy(to.addr, cl.site);

		s = booth_transport[TCP].open(&to);
		if (s < 0) {
			/* rv still holds 0 from do_read(); report the failure */
			rv = s;
			goto out_close;
		}

		rv = booth_transport[TCP].send(s, buf, buflen);
		if (rv < 0) {
			booth_transport[TCP].close(s);
			goto out_close;
		}

		rv = booth_transport[TCP].recv(s, &reply,
					       sizeof(struct boothc_header));
		if (rv < 0) {
			booth_transport[TCP].close(s);
			goto out_close;
		}
		booth_transport[TCP].close(s);
	}

	if (reply.result == BOOTHC_RLT_ASYNC) {
		log_info("grant command sent, but result is async.");
		rv = 0;
	} else if (reply.result == BOOTHC_RLT_SYNC_SUCC) {
		log_info("grant succeeded!");
		rv = 0;
	} else if (reply.result == BOOTHC_RLT_SYNC_FAIL) {
		log_info("grant failed!");
		rv = 0;
	} else {
		log_error("internal error!");
		rv = -1;
	}

out_close:
	close(fd);
out_free:
	free(buf);
out:
	return rv;
}
/*
 * Client "revoke" operation.
 *
 * Builds a REVOKE request (header followed by the fixed-size site and
 * ticket name fields), sends it to the local daemon and reads the reply
 * header.  If the daemon answers BOOTHC_RLT_REMOTE_OP, the same request
 * is sent to the named site over the TCP transport and that reply is
 * used instead.
 *
 * Returns 0 when the final result is ASYNC, SYNC_SUCC or SYNC_FAIL (the
 * outcome is logged), and a negative value on invalid arguments, on any
 * communication failure or on an unexpected result code.
 */
static int do_revoke(void)
{
	char *buf;
	struct boothc_header *h, reply;
	int buflen;
	uint32_t force = 0;
	int fd, rv;

	buflen = sizeof(struct boothc_header) +
		 sizeof(cl.site) + sizeof(cl.ticket);
	/* zeroed so no uninitialised bytes follow the names on the wire */
	buf = calloc(1, buflen);
	if (!buf) {
		rv = -ENOMEM;
		goto out;
	}

	h = (struct boothc_header *)buf;
	if (cl.force)
		force = BOOTHC_OPT_FORCE;
	init_header(h, BOOTHC_CMD_REVOKE, force, 0,
		    sizeof(cl.site) + sizeof(cl.ticket));
	strcpy(buf + sizeof(struct boothc_header), cl.site);
	strcpy(buf + sizeof(struct boothc_header) + sizeof(cl.site), cl.ticket);

	fd = do_connect(BOOTHC_SOCK_PATH);
	if (fd < 0) {
		rv = fd;
		goto out_free;
	}

	rv = do_write(fd, buf, buflen);
	if (rv < 0)
		goto out_close;

	rv = do_read(fd, &reply, sizeof(struct boothc_header));
	if (rv < 0)
		goto out_close;

	if (reply.result == BOOTHC_RLT_INVALID_ARG) {
		log_info("invalid argument!");
		rv = -1;
		goto out_close;
	}

	if (reply.result == BOOTHC_RLT_REMOTE_OP) {
		struct booth_node to;
		int s;

		memset(&to, 0, sizeof(struct booth_node));
		to.family = BOOTH_PROTO_FAMILY;
		/* NOTE(review): assumes to.addr can hold BOOTH_NAME_LEN bytes - confirm */
		strcpy(to.addr, cl.site);

		s = booth_transport[TCP].open(&to);
		if (s < 0) {
			/* rv still holds 0 from do_read(); report the failure */
			rv = s;
			goto out_close;
		}

		rv = booth_transport[TCP].send(s, buf, buflen);
		if (rv < 0) {
			booth_transport[TCP].close(s);
			goto out_close;
		}

		rv = booth_transport[TCP].recv(s, &reply,
					       sizeof(struct boothc_header));
		if (rv < 0) {
			booth_transport[TCP].close(s);
			goto out_close;
		}
		booth_transport[TCP].close(s);
	}

	if (reply.result == BOOTHC_RLT_ASYNC) {
		log_info("revoke command sent, but result is async.");
		rv = 0;
	} else if (reply.result == BOOTHC_RLT_SYNC_SUCC) {
		log_info("revoke succeeded!");
		rv = 0;
	} else if (reply.result == BOOTHC_RLT_SYNC_FAIL) {
		log_info("revoke failed!");
		rv = 0;
	} else {
		log_error("internal error!");
		rv = -1;
	}

out_close:
	close(fd);
out_free:
	free(buf);
out:
	return rv;
}
static int lockfile(void)
{
char buf[16];
struct flock lock;
int fd, rv;
fd = open(cl.lockfile, O_CREAT|O_WRONLY, 0666);
if (fd < 0) {
log_error("lockfile open error %s: %s",
cl.lockfile, strerror(errno));
return -1;
}
lock.l_type = F_WRLCK;
lock.l_start = 0;
lock.l_whence = SEEK_SET;
lock.l_len = 0;
rv = fcntl(fd, F_SETLK, &lock);
if (rv < 0) {
log_error("lockfile setlk error %s: %s",
cl.lockfile, strerror(errno));
goto fail;
}
rv = ftruncate(fd, 0);
if (rv < 0) {
log_error("lockfile truncate error %s: %s",
cl.lockfile, strerror(errno));
goto fail;
}
memset(buf, 0, sizeof(buf));
snprintf(buf, sizeof(buf), "%d\n", getpid());
rv = write(fd, buf, strlen(buf));
if (rv <= 0) {
log_error("lockfile write error %s: %s",
cl.lockfile, strerror(errno));
goto fail;
}
return fd;
fail:
close(fd);
return -1;
}
/*
 * Remove the lock file named in cl.lockfile and close the descriptor
 * that was returned by lockfile().
 */
static void unlink_lockfile(int fd)
{
	const char *path = cl.lockfile;

	unlink(path);
	close(fd);
}
static void print_usage(void)
{
printf("Usage:\n");
printf("booth <type> <operation> [options]\n");
printf("\n");
printf("Types:\n");
printf(" arbitrator: daemon running on arbitrator\n");
printf(" site: daemon running on cluster site\n");
printf(" client: command running from client\n");
printf("\n");
printf("Operations:\n");
printf("Please note that operations are valid iff type is client!\n");
printf(" list: List all the tickets\n");
printf(" grant: Grant ticket T(-t T) to site S(-s S)\n");
printf(" revoke: Revoke ticket T(-t T) from site S(-s S)\n");
printf("\n");
printf("Options:\n");
printf(" -c FILE Specify config file [default " BOOTH_DEFAULT_CONF "]\n");
printf(" -l LOCKFILE Specify lock file [default " BOOTH_DEFAULT_LOCKFILE "]\n");
printf(" -D Enable debugging to stderr and don't fork\n");
printf(" -t ticket name\n");
printf(" -s site name\n");
printf(" -f ticket attribute: force, only valid when "
"granting\n");
printf(" -h Print this help, then exit\n");
}
/* getopt() option string; options followed by ':' take an argument. */
#define OPTION_STRING "c:Dl:t:s:fh"
/* Log entity name; set by read_arguments() for the arbitrator and site roles. */
static char *logging_entity = NULL;
/*
 * Copy the string `value` into the buffer `dest` of `buflen` bytes.
 *
 * If `value` (plus its terminator) does not fit, a message naming
 * `description` is printed to stderr and the process exits with
 * EXIT_FAILURE.  On return `dest` is always NUL-terminated and every
 * byte after the string is zero, regardless of the buffer's previous
 * contents.
 */
void safe_copy(char *dest, char *value, size_t buflen, const char *description) {
	size_t len = strlen(value);

	if (len >= buflen) {
		fprintf(stderr, "'%s' exceeds maximum %s length of %zu\n",
			value, description, buflen - 1);
		exit(EXIT_FAILURE);
	}
	/* len < buflen here, so the byte at dest[len] stays zero */
	memset(dest, 0, buflen);
	memcpy(dest, value, len);
}
/*
 * Parse the command line into the global `cl`.
 *
 * The first argument selects the role ("arbitrator", "site" or
 * "client"); anything else is treated as a client operation with the
 * role word omitted.  "help"/"--help"/"-h" and "version"/"--version"/"-V"
 * as first argument print their text and exit.  The remaining arguments
 * are handled with getopt() using OPTION_STRING; -t and -s are only
 * accepted for grant/revoke, -f only for grant.
 *
 * Returns 0; every error path prints a message or the usage text and
 * exits the process.
 */
static int read_arguments(int argc, char **argv)
{
	int optchar;
	char *arg1 = argv[1];
	char *op = NULL;

	if (argc < 2 || !strcmp(arg1, "help") || !strcmp(arg1, "--help") ||
	    !strcmp(arg1, "-h")) {
		print_usage();
		exit(EXIT_SUCCESS);
	}

	if (!strcmp(arg1, "version") || !strcmp(arg1, "--version") ||
	    !strcmp(arg1, "-V")) {
		printf("%s %s (built %s %s)\n",
		       argv[0], RELEASE_VERSION, __DATE__, __TIME__);
		exit(EXIT_SUCCESS);
	}

	if (!strcmp(arg1, "arbitrator")) {
		cl.type = ACT_ARBITRATOR;
		logging_entity = (char *) DAEMON_NAME "-arbitrator";
		optind = 2;
	} else if (!strcmp(arg1, "site")) {
		cl.type = ACT_SITE;
		logging_entity = (char *) DAEMON_NAME "-site";
		optind = 2;
	} else if (!strcmp(arg1, "client")) {
		cl.type = ACT_CLIENT;
		if (argc < 3) {
			print_usage();
			exit(EXIT_FAILURE);
		}
		op = argv[2];
		optind = 3;
	} else {
		/* no role word given: the first argument is the client operation */
		cl.type = ACT_CLIENT;
		op = argv[1];
		optind = 2;
	}

	switch (cl.type) {
	case ACT_ARBITRATOR:
		break;
	case ACT_SITE:
		break;
	case ACT_CLIENT:
		if (!strcmp(op, "list"))
			cl.op = OP_LIST;
		else if (!strcmp(op, "grant"))
			cl.op = OP_GRANT;
		else if (!strcmp(op, "revoke"))
			cl.op = OP_REVOKE;
		else {
			fprintf(stderr, "client operation \"%s\" is unknown\n",
				op);
			exit(EXIT_FAILURE);
		}
		break;
	}

	while (optind < argc) {
		optchar = getopt(argc, argv, OPTION_STRING);

		switch (optchar) {
		case 'c':
			safe_copy(cl.configfile, optarg, sizeof(cl.configfile), "config file");
			break;
		case 'D':
			debug_level = 1;
			log_logfile_priority = LOG_DEBUG;
			log_syslog_priority = LOG_DEBUG;
			break;
		case 'l':
			safe_copy(cl.lockfile, optarg, sizeof(cl.lockfile), "lock file");
			break;
		case 't':
			if (cl.op == OP_GRANT || cl.op == OP_REVOKE) {
				safe_copy(cl.ticket, optarg, sizeof(cl.ticket), "ticket name");
			} else {
				print_usage();
				exit(EXIT_FAILURE);
			}
			break;
		case 's':
			if (cl.op == OP_GRANT || cl.op == OP_REVOKE) {
				/* bound the copy by the destination's own size */
				safe_copy(cl.site, optarg, sizeof(cl.site), "site name");
			} else {
				print_usage();
				exit(EXIT_FAILURE);
			}
			break;
		case 'f':
			if (cl.op == OP_GRANT)
				cl.force = 1;
			else {
				print_usage();
				exit(EXIT_FAILURE);
			}
			break;
		case 'h':
			print_usage();
			exit(EXIT_SUCCESS);
			break;
		case ':':
		case '?':
			fprintf(stderr, "Please use '-h' for usage.\n");
			exit(EXIT_FAILURE);
			break;
		default:
			/* getopt() gave up (e.g. on a non-option word) before argc */
			fprintf(stderr, "unknown option: %s\n", argv[optind]);
			exit(EXIT_FAILURE);
			break;
		}
	}

	return 0;
}
/*
 * Lift the locked-memory limit, lock all current and future pages into
 * memory and switch the process to SCHED_RR at the maximum priority.
 * Failures of mlockall() and of the scheduler calls are logged; nothing
 * is reported to the caller.
 */
static void set_scheduler(void)
{
	struct sched_param sp;
	struct rlimit unlimited;
	int max_prio;

	unlimited.rlim_max = RLIM_INFINITY;
	unlimited.rlim_cur = RLIM_INFINITY;
	setrlimit(RLIMIT_MEMLOCK, &unlimited);

	if (mlockall(MCL_CURRENT | MCL_FUTURE) < 0) {
		log_error("mlockall failed");
	}

	max_prio = sched_get_priority_max(SCHED_RR);
	if (max_prio == -1) {
		log_error("could not get maximum scheduler priority err %d",
			  errno);
		return;
	}

	sp.sched_priority = max_prio;
	if (sched_setscheduler(0, SCHED_RR, &sp) == -1)
		log_error("could not set SCHED_RR priority %d err %d",
			  sp.sched_priority, errno);
}
/*
 * Write `val` to /proc/self/oom_adj.  If the file cannot be opened the
 * call silently does nothing.
 */
static void set_oom_adj(int val)
{
	FILE *oom_file = fopen("/proc/self/oom_adj", "w");

	if (oom_file != NULL) {
		fprintf(oom_file, "%i", val);
		fclose(oom_file);
	}
}
/*
 * Run as a daemon (arbitrator or site): set everything up, detach unless
 * debugging, take the lock file, raise scheduling priority and enter the
 * event loop.
 *
 * Returns a negative value if setup or taking the lock fails, otherwise
 * the result of loop().  The lock file is removed only on the path where
 * this process actually acquired it; a failed setup must not unlink a
 * lock file that may belong to another running instance.
 */
static int do_server(int type)
{
	int fd;
	int rv;

	rv = setup(type);
	if (rv < 0)
		return rv;

	if (!debug_level) {
		if (daemon(0, 0) < 0) {
			perror("daemon error");
			exit(EXIT_FAILURE);
		}
	}

	/*
	  The lock cannot be obtained before the call to daemon(), otherwise
	  the lockfile would contain the pid of the parent, not the daemon.
	*/
	fd = lockfile();
	if (fd < 0)
		return fd;

	if (type == ARBITRATOR)
		log_info("BOOTH arbitrator daemon started");
	else if (type == SITE)
		log_info("BOOTH cluster site daemon started");

	set_scheduler();
	set_oom_adj(-16);

	rv = loop();

	unlink_lockfile(fd);
	return rv;
}
/*
 * Dispatch the client operation stored in cl.op.  Returns the result of
 * the selected operation, or -1 if cl.op is none of the known values.
 */
static int do_client(void)
{
	if (cl.op == OP_LIST)
		return do_list();
	if (cl.op == OP_GRANT)
		return do_grant();
	if (cl.op == OP_REVOKE)
		return do_revoke();
	return -1;
}
/*
 * Program entry point: install the default config/lock file paths, parse
 * the command line, configure logging for the chosen role and run that
 * role.  Exits with EXIT_SUCCESS when the role handler returns 0 and
 * EXIT_FAILURE otherwise.
 */
int main(int argc, char *argv[])
{
	int status;

	memset(&cl, 0, sizeof(cl));
	strncpy(cl.lockfile, BOOTH_DEFAULT_LOCKFILE, BOOTH_PATH_LEN - 1);
	strncpy(cl.configfile, BOOTH_DEFAULT_CONF, BOOTH_PATH_LEN - 1);

	status = read_arguments(argc, argv);
	if (status < 0)
		return EXIT_FAILURE;

	if (cl.type != ACT_CLIENT) {
		/* daemons log under their own entity and facility */
		cl_log_set_entity(logging_entity);
		cl_log_enable_stderr(debug_level ? TRUE : FALSE);
		cl_log_set_facility(HA_LOG_FACILITY);
	} else {
		cl_log_enable_stderr(TRUE);
		cl_log_set_facility(0);
	}
	cl_inherit_logging_environment(0);

	if (cl.type == ACT_ARBITRATOR)
		status = do_server(ARBITRATOR);
	else if (cl.type == ACT_SITE)
		status = do_server(SITE);
	else if (cl.type == ACT_CLIENT)
		status = do_client();

	return status ? EXIT_FAILURE : EXIT_SUCCESS;
}
diff --git a/src/paxos.c b/src/paxos.c
index c82a130..0c6512f 100644
--- a/src/paxos.c
+++ b/src/paxos.c
@@ -1,850 +1,850 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include "list.h"
#include "paxos.h"
#include "log.h"
/*
 * Protocol states.  Used both as the local state of a proposer/acceptor
 * and as the message type carried in paxos_msghdr.state.
 */
typedef enum {
	INIT = 1,
	PREPARING,
	PROMISING,
	PROPOSING,
	ACCEPTING,
	RECOVERY,
	COMMITTED,
} paxos_state_t;
/* A proposal: its ballot number followed by the value bytes. */
struct proposal {
	int ballot_number;
	char value[0]; /* trailing storage; the code copies ps->valuelen bytes here */
};
/* Learner bookkeeping for one ballot. */
struct learned {
	int ballot; /* ballot number; 0 marks an unused entry */
	int number; /* how many responses were seen for this ballot */
};
/*
 * Header of every paxos message.  The integer fields are converted with
 * htonl()/ntohl() by the senders and receivers in this file.  The header
 * is followed by `extralen` bytes of application data and, for proposals,
 * by `valuelen` bytes of value.
 */
struct paxos_msghdr {
	paxos_state_t state; /* message type, one of the paxos_state_t values */
	int from; /* id of the sending node */
	char psname[PAXOS_NAME_LEN+1]; /* paxos space name */
	char piname[PAXOS_NAME_LEN+1]; /* paxos instance name */
	int ballot_number;
	int reject; /* non-zero when an acceptor refuses the ballot */
	int proposer_id; /* set by the proposer to its own id */
	unsigned int extralen;
	unsigned int valuelen;
};
/* Per-instance proposer state. */
struct proposer {
	int state; /* paxos_state_t value */
	int ballot; /* ballot number of the round in progress */
	int open_number; /* promises counted so far */
	int accepted_number; /* acceptances counted so far */
	int proposed; /* set once the value has been sent out */
	struct proposal *proposal; /* the value this proposer puts forward */
};
/* Per-instance acceptor state. */
struct acceptor {
	int state; /* paxos_state_t value */
	int highest_promised; /* highest ballot number promised so far */
	struct proposal *accepted_proposal; /* storage for the accepted value */
};
/* Per-instance learner state. */
struct learner {
	int state;
	int learned_max; /* largest response count seen for any single ballot */
	int learned_ballot;
	struct learned learned[0]; /* trailing table, scanned up to ps->number entries */
};
/* Forward declarations for the operation tables below. */
struct paxos_space;
struct paxos_instance;
/* Handlers implementing the proposer role. */
struct proposer_operations {
	/* start a round; the int is the round in and the chosen ballot (or -errno) out */
	void (*prepare) (struct paxos_instance *,
			 int *);
	/* handle a promise message (buffer, length) */
	void (*propose) (struct paxos_space *,
			 struct paxos_instance *,
			 void *, int);
	/* handle an accepted message (buffer, length) */
	void (*commit) (struct paxos_space *,
			struct paxos_instance *,
			void *, int);
};
/* Handlers implementing the acceptor role; each takes a message buffer and its length. */
struct acceptor_operations {
	/* handle a prepare message */
	void (*promise) (struct paxos_space *,
			 struct paxos_instance *,
			 void *, int);
	/* handle a propose message */
	void (*accepted) (struct paxos_space *,
			  struct paxos_instance *,
			  void *, int);
};
/* Handler implementing the learner role; takes a message buffer and its length. */
struct learner_operations {
	void (*response) (struct paxos_space *,
			  struct paxos_instance *,
			  void *, int);
};
/* A paxos space: a named group of nodes sharing roles, callbacks and message sizes. */
struct paxos_space {
	char name[PAXOS_NAME_LEN+1];
	unsigned int number; /* number of nodes; also the length of role[] */
	unsigned int extralen; /* bytes of application data after each header */
	unsigned int valuelen; /* bytes of value in a proposal */
	const unsigned char *role; /* per-node role bits (PROPOSER/ACCEPTOR/LEARNER) */
	const struct paxos_operations *p_op; /* callbacks supplied by the user */
	const struct proposer_operations *r_op;
	const struct acceptor_operations *a_op;
	const struct learner_operations *l_op;
	struct list_head list; /* link in ps_head */
	struct list_head pi_head; /* instances belonging to this space */
};
/* One paxos instance inside a space. */
struct paxos_instance {
	char name[PAXOS_NAME_LEN+1];
	int round; /* last ballot number this instance has seen */
	int *prio; /* optional per-node starting ballot, indexed by node id */
	struct proposer *proposer;
	struct acceptor *acceptor;
	struct learner *learner;
	/* called by the proposer code when a round is committed or rejected */
	void (*end) (pi_handle_t pih, int round, int result);
	struct list_head list; /* link in ps->pi_head */
	struct paxos_space *ps; /* owning space */
};
/* List of all paxos spaces created by paxos_space_init(). */
static LIST_HEAD(ps_head);
/*
 * Return 1 if `member` is a strict majority of the nodes in `ps` that
 * carry the ACCEPTOR role, 0 otherwise.
 */
static int have_quorum(struct paxos_space *ps, int member)
{
	int idx;
	int acceptors = 0;

	for (idx = 0; idx < ps->number; idx++)
		acceptors += (ps->role[idx] & ACCEPTOR) ? 1 : 0;

	return (member * 2 > acceptors) ? 1 : 0;
}
/*
 * Compute the next ballot number for this node: start from its priority
 * (or its id when no priority table exists) and step by the number of
 * nodes until the result exceeds the instance's current round.
 */
static int next_ballot_number(struct paxos_instance *pi)
{
	int myid = pi->ps->p_op->get_myid();
	int candidate = pi->prio ? pi->prio[myid] : myid;

	while (candidate <= pi->round)
		candidate += pi->ps->number;

	return candidate;
}
static void proposer_prepare(struct paxos_instance *pi, int *round)
{
struct paxos_msghdr *hdr;
void *msg;
int msglen = sizeof(struct paxos_msghdr) + pi->ps->extralen;
int ballot;
log_debug("preposer prepare ...");
msg = malloc(msglen);
if (!msg) {
log_error("no mem for msg");
*round = -ENOMEM;
return;
}
memset(msg, 0, msglen);
hdr = msg;
if (*round > pi->round)
pi->round = *round;
ballot = next_ballot_number(pi);
pi->proposer->ballot = ballot;
hdr->state = htonl(PREPARING);
hdr->from = htonl(pi->ps->p_op->get_myid());
hdr->proposer_id = hdr->from;
strcpy(hdr->psname, pi->ps->name);
strcpy(hdr->piname, pi->name);
hdr->ballot_number = htonl(ballot);
hdr->extralen = htonl(pi->ps->extralen);
if (pi->ps->p_op->broadcast)
pi->ps->p_op->broadcast(msg, msglen);
else {
int i;
for (i = 0; i < pi->ps->number; i++) {
if (pi->ps->role[i] & ACCEPTOR)
pi->ps->p_op->send(i, msg, msglen);
}
}
free(msg);
*round = ballot;
}
/*
 * Proposer handler for a promise message.
 *
 * Messages with the wrong length or a ballot other than the one in
 * progress are ignored.  A rejection resets the proposer to INIT and
 * reports -EAGAIN through pi->end().  Otherwise the promise is counted
 * (subject to the optional prepare callback) and, once a quorum of
 * promises exists, the value is sent exactly once in a PROPOSING message
 * to the acceptors.
 *
 * The outgoing message is a temporary copy owned by this function and is
 * released after sending.
 */
static void proposer_propose(struct paxos_space *ps,
			     struct paxos_instance *pi,
			     void *msg, int msglen)
{
	struct paxos_msghdr *hdr;
	pi_handle_t pih = (pi_handle_t)pi;
	void *extra, *value, *message;
	int ballot;

	log_debug("proposer propose ...");

	if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) {
		log_error("message length incorrect, "
			  "msglen: %d, msghdr len: %lu, extralen: %u",
			  msglen, sizeof(struct paxos_msghdr), ps->extralen);
		return;
	}
	hdr = msg;

	ballot = ntohl(hdr->ballot_number);
	if (pi->proposer->ballot != ballot) {
		log_debug("not the same ballot, proposer ballot: %d, "
			  "received ballot: %d", pi->proposer->ballot, ballot);
		return;
	}

	if (ntohl(hdr->reject)) {
		log_debug("proposal was rejected");
		pi->round = ballot;
		pi->proposer->state = INIT;
		pi->end(pih, pi->round, -EAGAIN);
		return;
	}

	extra = (char *)msg + sizeof(struct paxos_msghdr);
	if (ps->p_op->prepare) {
		if (ps->p_op->prepare(pih, extra))
			pi->proposer->open_number++;
	} else
		pi->proposer->open_number++;

	if (!have_quorum(ps, pi->proposer->open_number))
		return;

	/* later promises for the same ballot must not trigger a second proposal */
	if (pi->proposer->proposed)
		return;
	pi->proposer->proposed = 1;

	value = pi->proposer->proposal->value;
	if (ps->p_op->propose)
		ps->p_op->propose(pih, extra, ballot, value);

	hdr->valuelen = htonl(ps->valuelen);
	message = malloc(msglen + ps->valuelen);
	if (!message) {
		log_error("no mem for value");
		return;
	}
	memset(message, 0, msglen + ps->valuelen);
	memcpy(message, msg, msglen);
	memcpy((char *)message + msglen, value, ps->valuelen);

	pi->proposer->state = PROPOSING;
	hdr = message;
	hdr->from = htonl(ps->p_op->get_myid());
	hdr->state = htonl(PROPOSING);

	if (ps->p_op->broadcast)
		ps->p_op->broadcast(message, msglen + ps->valuelen);
	else {
		int i;

		for (i = 0; i < ps->number; i++) {
			if (ps->role[i] & ACCEPTOR)
				ps->p_op->send(i, message,
					       msglen + ps->valuelen);
		}
	}

	/* the transport does not keep the buffer (see proposer_prepare) */
	free(message);
}
static void proposer_commit(struct paxos_space *ps,
struct paxos_instance *pi,
void *msg, int msglen)
{
struct paxos_msghdr *hdr;
pi_handle_t pih = (pi_handle_t)pi;
void *extra;
int ballot;
log_debug("proposer commit ...");
if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) {
log_error("message length incorrect, "
"msglen: %d, msghdr len: %lu, extralen: %u",
msglen, sizeof(struct paxos_msghdr), ps->extralen);
return;
}
extra = (char *)msg + sizeof(struct paxos_msghdr);
hdr = msg;
ballot = ntohl(hdr->ballot_number);
if (pi->proposer->ballot != ballot) {
log_debug("not the same ballot, proposer ballot: %d, "
"received ballot: %d", pi->proposer->ballot, ballot);
return;
}
pi->proposer->accepted_number++;
if (!have_quorum(ps, pi->proposer->accepted_number))
return;
pi->round = ballot;
if (ps->p_op->commit)
ps->p_op->commit(pih, extra, pi->round);
pi->proposer->state = COMMITTED;
pi->end(pih, pi->round, 0);
}
/*
 * Acceptor handler for a prepare message.
 *
 * Ignored while the acceptor is in RECOVERY or when the length is wrong.
 * If the ballot is lower than the highest one already promised, the
 * message is returned to the sender with reject set and the extra data
 * cleared.  Otherwise the ballot becomes the highest promised, the
 * optional promise callback runs, and the message is returned as a
 * PROMISING reply.  The incoming buffer is reused for the reply.
 */
static void acceptor_promise(struct paxos_space *ps,
			     struct paxos_instance *pi,
			     void *msg, int msglen)
{
	pi_handle_t pih = (pi_handle_t)pi;
	struct paxos_msghdr *hdr = msg;
	unsigned long sender;
	void *extra;

	log_debug("acceptor promise ...");

	if (pi->acceptor->state == RECOVERY) {
		log_debug("still in recovery");
		return;
	}

	if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) {
		log_error("message length incorrect, "
			  "msglen: %d, msghdr len: %lu, extralen: %u",
			  msglen, sizeof(struct paxos_msghdr), ps->extralen);
		return;
	}
	extra = (char *)msg + sizeof(struct paxos_msghdr);

	if (ntohl(hdr->ballot_number) < pi->acceptor->highest_promised) {
		/* stale ballot: answer with a rejection */
		log_debug("ballot number: %d, highest promised: %d",
			  ntohl(hdr->ballot_number),
			  pi->acceptor->highest_promised);
		sender = ntohl(hdr->from);
		hdr->from = htonl(ps->p_op->get_myid());
		hdr->state = htonl(PROMISING);
		hdr->reject = htonl(1);
		memset(extra, 0, ps->extralen);
	} else {
		pi->acceptor->highest_promised = ntohl(hdr->ballot_number);
		if (ps->p_op->promise)
			ps->p_op->promise(pih, extra);
		pi->acceptor->state = PROMISING;
		sender = ntohl(hdr->from);
		hdr->from = htonl(ps->p_op->get_myid());
		hdr->state = htonl(PROMISING);
	}

	ps->p_op->send(sender, msg, msglen);
}
static void acceptor_accepted(struct paxos_space *ps,
struct paxos_instance *pi,
void *msg, int msglen)
{
struct paxos_msghdr *hdr;
unsigned long to;
pi_handle_t pih = (pi_handle_t)pi;
void *extra, *value;
int myid = ps->p_op->get_myid();
int ballot;
log_debug("acceptor accepted ...");
if (pi->acceptor->state == RECOVERY) {
log_debug("still in recovery");
return;
}
if (msglen != sizeof(struct paxos_msghdr) + ps->extralen + ps->valuelen) {
log_error("message length incorrect, msglen: "
"%d, msghdr len: %lu, extralen: %u, valuelen: %u",
msglen, sizeof(struct paxos_msghdr), ps->extralen,
ps->valuelen);
return;
}
hdr = msg;
extra = (char *)msg + sizeof(struct paxos_msghdr);
ballot = ntohl(hdr->ballot_number);
if (ballot < pi->acceptor->highest_promised) {
log_debug("ballot: %d, highest promised: %d",
ballot, pi->acceptor->highest_promised);
to = ntohl(hdr->from);
hdr->from = htonl(myid);
hdr->state = htonl(ACCEPTING);
hdr->reject = htonl(1);
ps->p_op->send(to, hdr, sizeof(struct paxos_msghdr));
return;
}
value = pi->acceptor->accepted_proposal->value;
memcpy(value, (char *)msg + sizeof(struct paxos_msghdr) + ps->extralen,
ps->valuelen);
if (ps->p_op->accepted)
ps->p_op->accepted(pih, extra, ballot, value);
pi->acceptor->state = ACCEPTING;
to = ntohl(hdr->from);
hdr->from = htonl(myid);
hdr->state = htonl(ACCEPTING);
if (ps->p_op->broadcast)
ps->p_op->broadcast(msg, sizeof(struct paxos_msghdr)
+ ps->extralen);
else {
int i;
for (i = 0; i < ps->number; i++) {
if (ps->role[i] & LEARNER)
ps->p_op->send(i, msg,
sizeof(struct paxos_msghdr)
+ ps->extralen);
}
if (!(ps->role[to] & LEARNER))
ps->p_op->send(to, msg, sizeof(struct paxos_msghdr)
+ ps->extralen);
}
}
static void learner_response(struct paxos_space *ps,
struct paxos_instance *pi,
void *msg, int msglen)
{
struct paxos_msghdr *hdr;
pi_handle_t pih = (pi_handle_t)pi;
void *extra;
- int i, unused, found = 0;
+ int i, unused = 0, found = 0;
int ballot;
log_debug("learner response ...");
if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) {
log_error("message length incorrect, "
"msglen: %d, msghdr len: %lu, extralen: %u",
msglen, sizeof(struct paxos_msghdr), ps->extralen);
return;
}
hdr = msg;
extra = (char *)msg + sizeof(struct paxos_msghdr);
ballot = ntohl(hdr->ballot_number);
for (i = 0; i < ps->number; i++) {
if (!pi->learner->learned[i].ballot) {
unused = i;
break;
}
if (pi->learner->learned[i].ballot == ballot) {
pi->learner->learned[i].number++;
if (pi->learner->learned[i].number
> pi->learner->learned_max)
pi->learner->learned_max
= pi->learner->learned[i].number;
found = 1;
break;
}
}
if (!found) {
pi->learner->learned[unused].ballot = ntohl(hdr->ballot_number);
pi->learner->learned[unused].number = 1;
}
if (!have_quorum(ps, pi->learner->learned_max))
return;
if (ps->p_op->learned)
ps->p_op->learned(pih, extra, ballot);
}
/* Proposer handler table installed in every space by paxos_space_init(). */
const struct proposer_operations generic_proposer_operations = {
	.prepare = proposer_prepare,
	.propose = proposer_propose,
	.commit = proposer_commit,
};
/* Acceptor handler table installed in every space by paxos_space_init(). */
const struct acceptor_operations generic_acceptor_operations = {
	.promise = acceptor_promise,
	.accepted = acceptor_accepted,
};
/* Learner handler table installed in every space by paxos_space_init(). */
const struct learner_operations generic_learner_operations = {
	.response = learner_response,
};
/*
 * Create a paxos space and add it to the global list.
 *
 * name      space name; at most PAXOS_NAME_LEN characters are kept
 * number    number of nodes (must be non-zero)
 * extralen  bytes of application data carried after each message header
 * valuelen  bytes of value in a proposal (must be non-zero)
 * role      per-node role bits, indexed by node id; the pointer is kept,
 *           not copied
 * p_op      user callbacks; get_myid and send are mandatory
 *
 * Returns the handle of the new space, or a negative errno value cast to
 * the handle type: -EINVAL for invalid arguments, -EEXIST if a space with
 * this name already exists, -ENOMEM if allocation fails.
 */
ps_handle_t paxos_space_init(const void *name,
			     unsigned int number,
			     unsigned int extralen,
			     unsigned int valuelen,
			     const unsigned char *role,
			     const struct paxos_operations *p_op)
{
	struct paxos_space *ps;

	/* a NULL name must not reach strcmp() below */
	if (!name) {
		log_error("invalid agruments");
		return -EINVAL;
	}

	list_for_each_entry(ps, &ps_head, list) {
		if (!strcmp(ps->name, name)) {
			log_info("paxos space (%s) has already been "
				 "initialized", (char *)name);
			return -EEXIST;
		}
	}

	if (!number || !valuelen || !p_op || !p_op->get_myid || !p_op->send) {
		log_error("invalid agruments");
		return -EINVAL;
	}

	ps = malloc(sizeof(struct paxos_space));
	if (!ps) {
		log_error("no mem for paxos space");
		return -ENOMEM;
	}
	memset(ps, 0, sizeof(struct paxos_space));

	/*
	 * Copy at most PAXOS_NAME_LEN bytes: the buffer holds one more and
	 * was zeroed above, so the name is always NUL-terminated.
	 */
	strncpy(ps->name, name, PAXOS_NAME_LEN);
	ps->number = number;
	ps->extralen = extralen;
	ps->valuelen = valuelen;
	ps->role = role;
	ps->p_op = p_op;
	ps->r_op = &generic_proposer_operations;
	ps->a_op = &generic_acceptor_operations;
	ps->l_op = &generic_learner_operations;

	list_add_tail(&ps->list, &ps_head);
	INIT_LIST_HEAD(&ps->pi_head);

	return (ps_handle_t)ps;
}
/*
 * paxos_instance_init - look up or create a paxos instance inside a space.
 *
 * @handle: space handle returned by paxos_space_init()
 * @name:   instance name; an existing instance with this name is returned
 * @prio:   optional array of ps->number ints, copied into the instance
 *
 * Allocates the proposer/acceptor/learner state that matches this node's
 * bits in ps->role[myid], runs the optional p_op->catchup callback for
 * acceptors, and links the instance into ps->pi_head.
 *
 * Returns the instance as a handle, or a negative errno value.
 */
pi_handle_t paxos_instance_init(ps_handle_t handle, const void *name, int *prio)
{
struct paxos_space *ps = (struct paxos_space *)handle;
struct paxos_instance *pi;
- struct proposer *proposer;
- struct acceptor *acceptor;
- struct learner *learner;
+ struct proposer *proposer = NULL;
+ struct acceptor *acceptor = NULL;
+ struct learner *learner = NULL;
int myid, valuelen, rv;
/* reuse an already registered instance of the same name */
list_for_each_entry(pi, &ps->pi_head, list) {
if (!strcmp(pi->name, name))
return (pi_handle_t)pi;
}
/*
 * NOTE(review): this validation runs after `ps` has already been
 * dereferenced by the list walk above.
 */
if (handle <= 0 || !ps->p_op || !ps->p_op->get_myid) {
log_error("invalid agruments");
rv = -EINVAL;
goto out;
}
myid = ps->p_op->get_myid();
valuelen = ps->valuelen;
pi = malloc(sizeof(struct paxos_instance));
if (!pi) {
log_error("no mem for paxos instance");
rv = -ENOMEM;
goto out;
}
memset(pi, 0, sizeof(struct paxos_instance));
/*
 * NOTE(review): strncpy() does not NUL-terminate when `name` has
 * PAXOS_NAME_LEN + 1 or more characters.
 */
strncpy(pi->name, name, PAXOS_NAME_LEN + 1);
if (prio) {
pi->prio = malloc(ps->number * sizeof(int));
if (!pi->prio) {
log_error("no mem for prio");
rv = -ENOMEM;
goto out_pi;
}
memcpy(pi->prio, prio, ps->number * sizeof(int));
}
/* proposer state: the struct plus a proposal with room for the value */
if (ps->role[myid] & PROPOSER) {
proposer = malloc(sizeof(struct proposer));
if (!proposer) {
log_error("no mem for proposer");
rv = -ENOMEM;
goto out_prio;
}
memset(proposer, 0, sizeof(struct proposer));
proposer->state = INIT;
proposer->proposal = malloc(sizeof(struct proposal) + valuelen);
if (!proposer->proposal) {
log_error("no mem for proposal");
rv = -ENOMEM;
goto out_proposer;
}
memset(proposer->proposal, 0,
sizeof(struct proposal) + valuelen);
pi->proposer = proposer;
}
/* acceptor state, followed by the optional catch-up callback */
if (ps->role[myid] & ACCEPTOR) {
acceptor = malloc(sizeof(struct acceptor));
if (!acceptor) {
log_error("no mem for acceptor");
rv = -ENOMEM;
goto out_proposal;
}
memset(acceptor, 0, sizeof(struct acceptor));
acceptor->state = INIT;
acceptor->accepted_proposal = malloc(sizeof(struct proposal)
+ valuelen);
if (!acceptor->accepted_proposal) {
log_error("no mem for accepted proposal");
rv = -ENOMEM;
goto out_acceptor;
}
memset(acceptor->accepted_proposal, 0,
sizeof(struct proposal) + valuelen);
pi->acceptor = acceptor;
/* the acceptor is flagged RECOVERY only while catchup() runs */
if (ps->p_op->catchup) {
pi->acceptor->state = RECOVERY;
ps->p_op->catchup(name);
pi->acceptor->state = INIT;
}
}
/* learner state: header plus one `struct learned` slot per participant */
if (ps->role[myid] & LEARNER) {
learner = malloc(sizeof(struct learner)
+ ps->number * sizeof(struct learned));
if (!learner) {
log_error("no mem for learner");
rv = -ENOMEM;
goto out_accepted_proposal;
}
memset(learner, 0,
sizeof(struct learner)
+ ps->number * sizeof(struct learned));
learner->state = INIT;
pi->learner = learner;
}
pi->ps = ps;
list_add_tail(&pi->list, &ps->pi_head);
return (pi_handle_t)pi;
/*
 * Error unwinding: each label releases one allocation and falls through to
 * the labels for everything allocated before it. The role checks skip
 * state that was never allocated on this node.
 */
out_accepted_proposal:
if (ps->role[myid] & ACCEPTOR)
free(acceptor->accepted_proposal);
out_acceptor:
if (ps->role[myid] & ACCEPTOR)
free(acceptor);
out_proposal:
if (ps->role[myid] & PROPOSER)
free(proposer->proposal);
out_proposer:
if (ps->role[myid] & PROPOSER)
free(proposer);
out_prio:
if (pi->prio)
free(pi->prio);
out_pi:
free(pi);
out:
return rv;
}
/*
 * paxos_round_request - start a new paxos round as proposer.
 *
 * @handle:      instance handle
 * @value:       proposal value; ps->valuelen bytes are copied from it
 * @round:       in: round number handed to the proposer's prepare handler
 * @end_request: completion callback stored in the instance
 *
 * Resets the proposer counters, stores the value and calls r_op->prepare.
 * Returns the round number as left by the prepare handler, or -EOPNOTSUPP
 * when this node is not a proposer.
 */
int paxos_round_request(pi_handle_t handle,
			void *value,
			int *round,
			void (*end_request) (pi_handle_t handle,
					     int round,
					     int result))
{
	struct paxos_instance *inst = (struct paxos_instance *)handle;
	struct paxos_space *space = inst->ps;
	int self = space->p_op->get_myid();
	int ballot = *round;
	struct proposer *prop;

	if ((space->role[self] & PROPOSER) == 0) {
		log_debug("only proposer can do this");
		return -EOPNOTSUPP;
	}

	/* start from a clean proposer state for the new round */
	prop = inst->proposer;
	prop->state = PREPARING;
	prop->open_number = 0;
	prop->accepted_number = 0;
	prop->proposed = 0;
	memcpy(prop->proposal->value, value, space->valuelen);

	inst->end = end_request;
	space->r_op->prepare(inst, &ballot);

	return ballot;
}
/*
 * paxos_recovery_status_get - report whether the acceptor is recovering.
 *
 * Returns 1 if the acceptor state is RECOVERY, 0 otherwise, or
 * -EOPNOTSUPP when this node is not an acceptor.
 */
int paxos_recovery_status_get(pi_handle_t handle)
{
	struct paxos_instance *inst = (struct paxos_instance *)handle;
	struct paxos_space *space = inst->ps;
	int self = space->p_op->get_myid();

	if ((space->role[self] & ACCEPTOR) == 0)
		return -EOPNOTSUPP;

	return (inst->acceptor->state == RECOVERY) ? 1 : 0;
}
/*
 * paxos_recovery_status_set - switch the acceptor in or out of recovery.
 *
 * A non-zero @recovery selects RECOVERY, zero selects INIT.
 * Returns 0, or -EOPNOTSUPP when this node is not an acceptor.
 */
int paxos_recovery_status_set(pi_handle_t handle, int recovery)
{
	struct paxos_instance *inst = (struct paxos_instance *)handle;
	struct paxos_space *space = inst->ps;
	int self = space->p_op->get_myid();

	if ((space->role[self] & ACCEPTOR) == 0)
		return -EOPNOTSUPP;

	if (recovery != 0) {
		inst->acceptor->state = RECOVERY;
		return 0;
	}

	inst->acceptor->state = INIT;
	return 0;
}
/*
 * paxos_propose - send a PROPOSING message for an already prepared round.
 *
 * @handle: instance handle
 * @value:  proposal value; ps->valuelen bytes are read from it
 * @round:  must equal the proposer's current ballot
 *
 * Builds header + extra area + value, lets the optional p_op->propose
 * callback see the extra area, then broadcasts the message or, without a
 * broadcast callback, sends it to every acceptor.
 *
 * Returns 0 on success, -EOPNOTSUPP when the instance has no proposer
 * state, -EINVAL on a round mismatch, -ENOMEM on allocation failure.
 */
int paxos_propose(pi_handle_t handle, void *value, int round)
{
	struct paxos_instance *pi = (struct paxos_instance *)handle;
	struct paxos_space *ps = pi->ps;
	struct paxos_msghdr *hdr;
	void *extra, *msg;
	int len = sizeof(struct paxos_msghdr) + ps->extralen + ps->valuelen;

	/* proposer state only exists on nodes holding the PROPOSER role */
	if (!pi->proposer)
		return -EOPNOTSUPP;

	if (round != pi->proposer->ballot) {
		log_debug("round: %d, proposer ballot: %d",
			  round, pi->proposer->ballot);
		return -EINVAL;
	}

	/*
	 * The value is an opaque blob of valuelen bytes, not a C string, so
	 * it is copied the same way paxos_round_request() copies it.
	 */
	memcpy(pi->proposer->proposal->value, value, ps->valuelen);
	pi->round = round;

	msg = calloc(1, len);
	if (!msg) {
		log_error("no mem for msg");
		return -ENOMEM;
	}

	hdr = msg;
	hdr->state = htonl(PROPOSING);
	hdr->from = htonl(ps->p_op->get_myid());
	hdr->proposer_id = hdr->from;
	strcpy(hdr->psname, ps->name);
	strcpy(hdr->piname, pi->name);
	hdr->ballot_number = htonl(pi->round);
	hdr->extralen = htonl(ps->extralen);

	/* layout: header | extra area (extralen) | value (valuelen) */
	extra = (char *)msg + sizeof(struct paxos_msghdr);
	memcpy((char *)extra + ps->extralen, value, ps->valuelen);

	if (ps->p_op->propose)
		ps->p_op->propose(handle, extra, round, value);

	if (ps->p_op->broadcast)
		ps->p_op->broadcast(msg, len);
	else {
		int i;

		for (i = 0; i < ps->number; i++) {
			if (ps->role[i] & ACCEPTOR)
				ps->p_op->send(i, msg, len);
		}
	}

	/*
	 * The message buffer belongs to this function.
	 * NOTE(review): this assumes send/broadcast do not keep the pointer
	 * after returning - confirm for every transport.
	 */
	free(msg);

	return 0;
}
int paxos_recvmsg(void *msg, int msglen)
{
struct paxos_msghdr *hdr = msg;
struct paxos_space *ps;
struct paxos_instance *pi;
int found = 0;
int myid;
list_for_each_entry(ps, &ps_head, list) {
if (!strcmp(ps->name, hdr->psname)) {
found = 1;
break;
}
}
if (!found) {
log_error("could not find the received ps name (%s) "
"in registered list", hdr->psname);
return -EINVAL;
}
myid = ps->p_op->get_myid();
found = 0;
list_for_each_entry(pi, &ps->pi_head, list) {
if (!strcmp(pi->name, hdr->piname)) {
found = 1;
break;
}
}
if (!found)
paxos_instance_init((ps_handle_t)ps, hdr->piname, NULL);
switch (ntohl(hdr->state)) {
case PREPARING:
if (ps->role[myid] & ACCEPTOR)
ps->a_op->promise(ps, pi, msg, msglen);
break;
case PROMISING:
ps->r_op->propose(ps, pi, msg, msglen);
break;
case PROPOSING:
if (ps->role[myid] & ACCEPTOR)
ps->a_op->accepted(ps, pi, msg, msglen);
break;
case ACCEPTING:
if (ntohl(hdr->proposer_id) == myid)
ps->r_op->commit(ps, pi, msg, msglen);
else if (ps->role[myid] & LEARNER)
ps->l_op->response(ps, pi, msg, msglen);
break;
default:
log_debug("invalid message type: %d", ntohl(hdr->state));
break;
};
return 0;
}
diff --git a/src/paxos.h b/src/paxos.h
index ea8c54e..b91d9a9 100644
--- a/src/paxos.h
+++ b/src/paxos.h
@@ -1,80 +1,80 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _PAXOS_H
#define _PAXOS_H
/* Maximum length of a space or instance name, excluding the NUL. */
#define PAXOS_NAME_LEN 63
/* Role bits; a participant's entry in the role array may combine them. */
#define PROPOSER 0x4
#define ACCEPTOR 0x2
#define LEARNER 0x1
/*
 * Opaque handles for a paxos space and a paxos instance. Values <= 0 are
 * used by callers as failure indicators (negative errno values).
 * NOTE(review): the implementation stores struct pointers in these types;
 * `long` is not guaranteed by ISO C to hold a pointer (intptr_t is).
 */
-typedef int ps_handle_t;
-typedef int pi_handle_t;
+typedef long ps_handle_t;
+typedef long pi_handle_t;
/*
 * Callbacks supplied by the user of a paxos space. paxos_space_init()
 * requires get_myid and send; the remaining members are checked for NULL
 * where the visible code calls them.
 */
struct paxos_operations {
/* returns the id of the local participant (used as index into role[]) */
int (*get_myid) (void);
/* sends len bytes to participant id */
int (*send) (unsigned long id, void *value, int len);
/* optional: sends len bytes to all participants */
int (*broadcast) (void *value, int len);
/* optional: called with the instance name while an acceptor is set up */
int (*catchup) (const void *name);
int (*prepare) (pi_handle_t handle, void *extra);
int (*promise) (pi_handle_t handle, void *extra);
int (*propose) (pi_handle_t handle, void *extra,
int round, void *value);
int (*accepted) (pi_handle_t handle, void *extra,
int round, void *value);
int (*commit) (pi_handle_t handle, void *extra, int round);
int (*learned) (pi_handle_t handle, void *extra, int round);
};
/* Dispatches a received message to the matching space/instance handler. */
int paxos_recvmsg(void *msg, int msglen);
/* Registers a paxos space; returns its handle or a negative errno value. */
ps_handle_t paxos_space_init(const void *name,
unsigned int number,
unsigned int extralen,
unsigned int valuelen,
const unsigned char *role,
const struct paxos_operations *p_op);
/* Looks up or creates an instance in a space; returns its handle or a
 * negative errno value. */
pi_handle_t paxos_instance_init(ps_handle_t handle,
const void *name,
int *prio);
/* Starts a round as proposer; end_request is stored as completion callback. */
int paxos_round_request(pi_handle_t handle,
void *value,
int *round,
void (*end_request) (pi_handle_t handle,
int round,
int result));
int paxos_round_discard(pi_handle_t handle, int round);
int paxos_leader_get(pi_handle_t handle, int *round);
/* Query / set the acceptor's RECOVERY state; -EOPNOTSUPP for non-acceptors. */
int paxos_recovery_status_get(pi_handle_t handle);
int paxos_recovery_status_set(pi_handle_t handle, int recovery);
/* Sends a PROPOSING message for the given round. */
int paxos_propose(pi_handle_t handle, void *value, int round);
int paxos_instance_exit(pi_handle_t handle);
int paxos_space_exit(ps_handle_t handle);
#endif /* _PAXOS_H */
diff --git a/src/paxos_lease.c b/src/paxos_lease.c
index 9cd36f5..9b16d8c 100644
--- a/src/paxos_lease.c
+++ b/src/paxos_lease.c
@@ -1,572 +1,572 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include "paxos.h"
#include "paxos_lease.h"
#include "transport.h"
#include "config.h"
#include "timer.h"
#include "list.h"
#include "log.h"
/* Name of the single paxos space that all leases share. */
#define PAXOS_LEASE_SPACE "paxoslease"
/*
 * Value length registered with the paxos space.
 * NOTE(review): this is larger than sizeof(struct paxos_lease_value), while
 * the paxos layer copies valuelen bytes from the value pointer it is given.
 */
#define PLEASE_VALUE_LEN 1024
/* Extra area carried in each paxos message of the lease space. */
struct paxos_lease_msghdr {
/* non-zero when the answering side already has an owner for the lease */
int leased;
};
/* Value proposed through paxos for a lease. */
struct paxos_lease_value {
char name[PAXOS_NAME_LEN+1];
int owner;
int expiry;
};
/* Per-role (proposer or acceptor) bookkeeping of a lease. */
struct lease_state {
/* paxos round this role last worked on */
int round;
/* last value seen by this role; allocated on first use */
struct paxos_lease_value *plv;
unsigned long long expires;
struct timerlist *timer;
};
/* One lease; linked into lease_head. */
struct paxos_lease {
char name[PAXOS_NAME_LEN+1];
/* paxos instance backing this lease */
pi_handle_t pih;
struct lease_state proposer;
struct lease_state acceptor;
/* current owner id, -1 when nobody holds the lease */
int owner;
int expiry;
/* non-zero: renew the lease when the proposer timer fires */
int relet;
/* non-zero: try to acquire the lease after another owner's lease expired */
int failover;
/* set by paxos_lease_release() to stop renewing */
int release;
unsigned long long expires;
void (*end_lease) (pi_handle_t, int);
/* retry timer armed by paxos_lease_acquire() */
struct timerlist *timer;
struct list_head list;
};
/* All leases created by paxos_lease_init(). */
static LIST_HEAD(lease_head);
/* Local node id, fetched once from the lease operations; -1 until then. */
static int myid = -1;
/* Callback table handed to the paxos space; allocated on first lease init. */
static struct paxos_operations *px_op = NULL;
/* Upper-layer callbacks given to paxos_lease_init(). */
const struct paxos_lease_operations *p_l_op;
/* Handle of the shared paxos space; 0 until the space has been created. */
ps_handle_t ps_handle = 0;
/*
 * end_paxos_request - completion callback passed to paxos_round_request().
 *
 * Maps the paxos instance handle back to its lease and, if the finished
 * round is the lease's current proposer round, forwards the result to the
 * lease's end_lease callback.
 */
static void end_paxos_request(pi_handle_t handle, int round, int result)
{
struct paxos_lease *pl;
int found = 0;
list_for_each_entry(pl, &lease_head, list) {
if (pl->pih == handle) {
found = 1;
break;
}
}
if (!found) {
- log_error("cound not found the handle for paxos lease: %d",
+ log_error("cound not found the handle for paxos lease: %ld",
handle);
return;
}
/* results of rounds other than the current proposer round are dropped */
if (round != pl->proposer.round) {
log_error("current paxos round is not the proposer round, "
"current round: %d, proposer round: %d",
round, pl->proposer.round);
return;
}
/* end_lease is optional; it receives the lease handle, not the pi handle */
if (pl->end_lease)
pl->end_lease((pl_handle_t)pl, result);
return;
}
static void lease_expires(unsigned long data)
{
struct paxos_lease *pl = (struct paxos_lease *)data;
pl_handle_t plh = (pl_handle_t)pl;
struct paxos_lease_result plr;
if (pl->owner != myid) {
log_debug("lease owner is not myself");
pl->owner = -1;
strcpy(plr.name, pl->name);
plr.owner = -1;
plr.expires = 0;
p_l_op->notify(plh, &plr);
if (pl->proposer.timer)
del_timer(pl->proposer.timer);
if (pl->acceptor.timer)
del_timer(pl->acceptor.timer);
if (pl->failover)
paxos_lease_acquire(plh, 1, NULL);
} else if (pl->owner == myid && pl->relet && !pl->release) {
struct paxos_lease_value value;
log_debug("lease owner is myself, need renew");
strncpy(value.name, pl->name, PAXOS_NAME_LEN + 1);
value.owner = myid;
value.expiry = pl->expiry;
paxos_propose(pl->pih, &value, pl->proposer.round);
} else {
log_debug("lease owner is myself");
pl->owner = -1;
strcpy(plr.name, pl->name);
plr.owner = -1;
plr.expires = 0;
p_l_op->notify(plh, &plr);
if (pl->proposer.timer)
del_timer(pl->proposer.timer);
if (pl->acceptor.timer)
del_timer(pl->acceptor.timer);
}
}
static void lease_retry(unsigned long data)
{
struct paxos_lease *pl = (struct paxos_lease *)data;
struct paxos_lease_value value;
int round;
if (pl->timer)
del_timer(pl->timer);
if (pl->owner == myid) {
log_debug("already got the lease, no need to retry");
return;
}
strncpy(value.name, pl->name, PAXOS_NAME_LEN + 1);
value.owner = myid;
value.expiry = pl->expiry;
round = paxos_round_request(pl->pih, &value, &pl->acceptor.round,
end_paxos_request);
if (round > 0)
pl->proposer.round = round;
}
/*
 * paxos_lease_acquire - try to obtain a lease for the local node.
 *
 * @handle:      lease handle from paxos_lease_init()
 * @relet:       non-zero to keep renewing the lease once it is held
 * @end_acquire: optional completion callback, stored in the lease
 *
 * Requests a paxos round proposing the local node as owner and arms the
 * retry timer (a tenth of the expiry) whether or not the request worked.
 *
 * Returns 0 when the round was started, -1 otherwise.
 */
int paxos_lease_acquire(pl_handle_t handle,
			int relet,
			void (*end_acquire) (pl_handle_t handle, int result))
{
	struct paxos_lease *pl = (struct paxos_lease *)handle;
	/*
	 * The paxos layer copies PLEASE_VALUE_LEN bytes from the value
	 * pointer, so the value lives in a zero-filled buffer of that size.
	 */
	union {
		struct paxos_lease_value v;
		char raw[PLEASE_VALUE_LEN];
	} value;
	int round;

	memset(&value, 0, sizeof(value));
	strncpy(value.v.name, pl->name, PAXOS_NAME_LEN + 1);
	value.v.owner = myid;
	value.v.expiry = pl->expiry;

	pl->relet = relet;
	pl->end_lease = end_acquire;
	pl->release = 0;

	round = paxos_round_request(pl->pih, &value, &pl->acceptor.round,
				    end_paxos_request);
	pl->timer = add_timer(1 * pl->expiry / 10, (unsigned long)pl,
			      lease_retry);
	if (round <= 0)
		return -1;

	pl->proposer.round = round;
	return 0;
}
/*
 * paxos_lease_release - stop renewing a lease.
 *
 * Only sets the release flag; lease_expires() checks it before renewing.
 * Always returns 0.
 */
int paxos_lease_release(pl_handle_t handle)
{
	((struct paxos_lease *)handle)->release = 1;

	return 0;
}
/*
 * lease_catchup - paxos catchup callback: restore lease state by name.
 *
 * Asks the upper layer (p_l_op->catchup) for the stored owner and expiry
 * time of the lease. A lease that has already expired is reset and
 * reported as unowned; a lease that is still valid gets its timers
 * re-armed and is reported with its stored owner.
 *
 * Returns 0, or -1 when no lease with this name is registered.
 */
static int lease_catchup(const void *name)
{
	struct paxos_lease *cur;
	struct paxos_lease *pl = NULL;
	struct paxos_lease_result plr;

	list_for_each_entry(cur, &lease_head, list) {
		if (strcmp(cur->name, name) == 0) {
			pl = cur;
			break;
		}
	}
	if (pl == NULL) {
		log_error("could not found the lease name (%s) "
			  "in registered list", (char *)name);
		return -1;
	}

	p_l_op->catchup(name, &pl->owner, &pl->expires);
	log_debug("catchup result: name: %s, owner: %d, expires: %llu",
		  (char *)name, pl->owner, pl->expires);

	/* nobody holds the lease: nothing to restore */
	if (pl->owner == -1)
		return 0;

	/* stored lease is already over: reset it and report it as unowned */
	if (current_time() > pl->expires) {
		pl->owner = -1;
		plr.owner = -1;
		pl->expires = 0;
		plr.expires = 0;
		strcpy(plr.name, pl->name);
		p_l_op->notify((pl_handle_t)pl, &plr);
		return 0;
	}

	/* every node watches the remaining lease time as acceptor ... */
	pl->acceptor.timer = add_timer(pl->expires - current_time(),
				       (unsigned long)pl,
				       lease_expires);
	/* ... and the owner also fires a fifth of the expiry earlier */
	if (pl->owner == myid
	    && current_time() < pl->expires - 1 * pl->expiry / 5)
		pl->proposer.timer = add_timer(pl->expires
					       - 1 * pl->expiry / 5
					       - current_time(),
					       (unsigned long)pl,
					       lease_expires);

	plr.owner = pl->owner;
	plr.expires = pl->expires;
	strcpy(plr.name, pl->name);
	p_l_op->notify((pl_handle_t)pl, &plr);

	return 0;
}
/*
 * lease_prepared - paxos prepare callback.
 *
 * Reads the `leased` flag from the lease message header.
 * Returns 1 when the flag is clear (not leased), 0 when it is set.
 */
static int lease_prepared(pi_handle_t handle __attribute__((unused)),
			  void *header)
{
	const struct paxos_lease_msghdr *hdr = header;
	int vacant = !hdr->leased;

	if (vacant)
		log_debug("not leased");
	else
		log_debug("already leased");

	return vacant;
}
/*
 * handle_lease_request - paxos promise callback.
 *
 * Looks up the lease that belongs to the paxos instance and writes into
 * the message header whether the lease currently has an owner
 * (leased = 1) or not (leased = 0).
 *
 * Returns 0, or -1 when no lease uses this instance handle.
 */
static int handle_lease_request(pi_handle_t handle, void *header)
{
struct paxos_lease_msghdr *hdr;
struct paxos_lease *pl;
int found = 0;
hdr = header;
list_for_each_entry(pl, &lease_head, list) {
if (pl->pih == handle) {
found = 1;
break;
}
}
if (!found) {
- log_error("could not find the lease handle: %d", handle);
+ log_error("could not find the lease handle: %ld", handle);
return -1;
}
/* owner == -1 means nobody holds the lease */
if (pl->owner == -1) {
log_debug("has not been leased");
hdr->leased = 0;
} else {
log_debug("has been leased");
hdr->leased = 1;
}
return 0;
}
/*
 * lease_propose - paxos propose callback (proposer side).
 *
 * Stores a copy of the proposed lease value in pl->proposer.plv and
 * restarts the proposer timer: 4/5 of the expiry when the lease is to be
 * renewed (relet), the full expiry otherwise.
 *
 * Returns 0, -1 for an unknown handle or a round mismatch, -ENOMEM when
 * the value copy cannot be allocated.
 */
static int lease_propose(pi_handle_t handle,
void *extra __attribute__((unused)),
int round, void *value)
{
struct paxos_lease *pl;
int found = 0;
list_for_each_entry(pl, &lease_head, list) {
if (pl->pih == handle) {
found = 1;
break;
}
}
if (!found) {
- log_error("could not find the lease handle: %d", handle);
+ log_error("could not find the lease handle: %ld", handle);
return -1;
}
if (round != pl->proposer.round) {
log_error("current round is not the proposer round, "
"current round: %d, proposer round: %d",
round, pl->proposer.round);
return -1;
}
/* the value buffer is allocated once and reused for later proposals */
if (!pl->proposer.plv) {
pl->proposer.plv = malloc(sizeof(struct paxos_lease_value));
if (!pl->proposer.plv) {
log_error("could not alloc mem for propsoer plv");
return -ENOMEM;
}
}
memcpy(pl->proposer.plv, value, sizeof(struct paxos_lease_value));
/* replace a timer that is still registered for the proposer role */
if (pl->proposer.timer)
del_timer(pl->proposer.timer);
if (pl->relet) {
/* renewing owner: fire after 4/5 of the expiry */
pl->proposer.timer = add_timer(4 * pl->expiry / 5,
(unsigned long)pl,
lease_expires);
pl->proposer.expires = current_time() + 4 * pl->expiry / 5;
} else {
pl->proposer.timer = add_timer(pl->expiry, (unsigned long)pl,
lease_expires);
pl->proposer.expires = current_time() + pl->expiry;
}
return 0;
}
/*
 * lease_accepted - paxos accepted callback (acceptor side).
 *
 * Records the accepted round, stores a copy of the accepted lease value
 * in pl->acceptor.plv and restarts the acceptor timer with the full
 * expiry.
 *
 * Returns 0, -1 for an unknown handle, -ENOMEM when the value copy cannot
 * be allocated.
 */
static int lease_accepted(pi_handle_t handle,
void *extra __attribute__((unused)),
int round, void *value)
{
struct paxos_lease *pl;
int found = 0;
list_for_each_entry(pl, &lease_head, list) {
if (pl->pih == handle) {
found = 1;
break;
}
}
if (!found) {
- log_error("could not find the lease handle: %d", handle);
+ log_error("could not find the lease handle: %ld", handle);
return -1;
}
pl->acceptor.round = round;
/* the value buffer is allocated once and reused afterwards */
if (!pl->acceptor.plv) {
pl->acceptor.plv = malloc(sizeof(struct paxos_lease_value));
if (!pl->acceptor.plv) {
log_error("could not alloc mem for acceptor plv");
return -ENOMEM;
}
}
memcpy(pl->acceptor.plv, value, sizeof(struct paxos_lease_value));
/* replace a timer that is still registered for the acceptor role */
if (pl->acceptor.timer)
del_timer(pl->acceptor.timer);
pl->acceptor.timer = add_timer(pl->expiry, (unsigned long)pl,
lease_expires);
pl->acceptor.expires = current_time() + pl->expiry;
return 0;
}
/*
 * lease_commit - paxos commit callback (proposer side).
 *
 * Takes owner and expiry from the value stored by lease_propose() and
 * reports the result to the upper layer through p_l_op->notify.
 *
 * Returns 0, or -1 for an unknown handle or a round mismatch.
 */
static int lease_commit(pi_handle_t handle,
void *extra __attribute__((unused)),
int round)
{
struct paxos_lease *pl;
struct paxos_lease_result plr;
int found = 0;
list_for_each_entry(pl, &lease_head, list) {
if (pl->pih == handle) {
found = 1;
break;
}
}
if (!found) {
- log_error("could not find the lease handle: %d", handle);
+ log_error("could not find the lease handle: %ld", handle);
return -1;
}
if (round != pl->proposer.round) {
log_error("current round is not the proposer round, "
"current round: %d, proposer round: %d",
round, pl->proposer.round);
return -1;
}
/*
 * NOTE(review): proposer.plv is only allocated in lease_propose(); it is
 * dereferenced here without a NULL check.
 */
pl->owner = pl->proposer.plv->owner;
pl->expiry = pl->proposer.plv->expiry;
strcpy(plr.name, pl->proposer.plv->name);
plr.owner = pl->proposer.plv->owner;
/* the reported expiry time is computed from the local clock */
plr.expires = current_time() + pl->proposer.plv->expiry;
p_l_op->notify((pl_handle_t)pl, &plr);
return 0;
}
/*
 * lease_learned - paxos learned callback (learner side).
 *
 * Takes owner and expiry from the value stored by lease_accepted() and
 * reports the result to the upper layer through p_l_op->notify.
 *
 * Returns 0, or -1 for an unknown handle or a round mismatch.
 */
static int lease_learned(pi_handle_t handle,
void *extra __attribute__((unused)),
int round)
{
struct paxos_lease *pl;
struct paxos_lease_result plr;
int found = 0;
list_for_each_entry(pl, &lease_head, list) {
if (pl->pih == handle) {
found = 1;
break;
}
}
if (!found) {
- log_error("could not find the lease handle: %d", handle);
+ log_error("could not find the lease handle: %ld", handle);
return -1;
}
/*
 * NOTE(review): the comparison uses acceptor.round, but the message
 * talks about the proposer round and prints proposer.round.
 */
if (round != pl->acceptor.round) {
log_error("current round is not the proposer round, "
"current round: %d, proposer round: %d",
round, pl->proposer.round);
return -1;
}
/*
 * NOTE(review): acceptor.plv is only allocated in lease_accepted(); it is
 * dereferenced here without a NULL check.
 */
pl->owner = pl->acceptor.plv->owner;
pl->expiry = pl->acceptor.plv->expiry;
strcpy(plr.name, pl->acceptor.plv->name);
plr.owner = pl->acceptor.plv->owner;
/* the reported expiry time is computed from the local clock */
plr.expires = current_time() + pl->acceptor.plv->expiry;
p_l_op->notify((pl_handle_t)pl, &plr);
return 0;
}
/*
 * paxos_lease_init - create a lease and the paxos instance backing it.
 *
 * @name:     lease name
 * @namelen:  caller-supplied name length, must not exceed PAXOS_NAME_LEN
 * @expiry:   lease duration
 * @number:   number of participants, passed to paxos_space_init()
 * @failover: non-zero to try acquiring the lease after another owner's
 *            lease expired
 * @role:     per-participant role array, passed to paxos_space_init()
 * @prio:     optional priorities, passed to paxos_instance_init()
 * @pl_op:    upper-layer callbacks
 *
 * On the first call the shared paxos space is created together with the
 * paxos callback table that bridges into this file.
 *
 * Returns the lease as a handle, or a value <= 0 on failure.
 */
pl_handle_t paxos_lease_init(const void *name,
unsigned int namelen,
int expiry,
int number,
int failover,
unsigned char *role,
int *prio,
const struct paxos_lease_operations *pl_op)
{
ps_handle_t psh;
pi_handle_t pih;
struct paxos_lease *lease;
if (namelen > PAXOS_NAME_LEN) {
log_error("length of paxos name is too long (%u)", namelen);
return -EINVAL;
}
/* the local id is fetched once and cached for the whole file */
if (myid == -1)
myid = pl_op->get_myid();
/* first lease: set up the shared paxos space and its callback table */
if (!ps_handle) {
px_op = malloc(sizeof(struct paxos_operations));
if (!px_op) {
log_error("could not alloc for paxos operations");
return -ENOMEM;
}
memset(px_op, 0, sizeof(struct paxos_operations));
px_op->get_myid = pl_op->get_myid;
px_op->send = pl_op->send;
px_op->broadcast = pl_op->broadcast;
px_op->catchup = lease_catchup;
px_op->prepare = lease_prepared;
px_op->promise = handle_lease_request;
px_op->propose = lease_propose;
px_op->accepted = lease_accepted;
px_op->commit = lease_commit;
px_op->learned = lease_learned;
p_l_op = pl_op;
psh = paxos_space_init(PAXOS_LEASE_SPACE,
number,
sizeof(struct paxos_lease_msghdr),
PLEASE_VALUE_LEN,
role,
px_op);
if (psh <= 0) {
- log_error("failed to initialize paxos space: %d", psh);
+ log_error("failed to initialize paxos space: %ld", psh);
free(px_op);
px_op = NULL;
return psh;
}
ps_handle = psh;
}
lease = malloc(sizeof(struct paxos_lease));
if (!lease) {
log_error("cound not alloc for paxos lease");
return -ENOMEM;
}
memset(lease, 0, sizeof(struct paxos_lease));
strncpy(lease->name, name, PAXOS_NAME_LEN + 1);
lease->owner = -1;
lease->expiry = expiry;
lease->failover = failover;
/*
 * The lease is linked before the instance is created; lease_catchup()
 * looks leases up by name in lease_head.
 */
list_add_tail(&lease->list, &lease_head);
pih = paxos_instance_init(ps_handle, name, prio);
if (pih <= 0) {
- log_error("failed to initialize paxos instance: %d", pih);
+ log_error("failed to initialize paxos instance: %ld", pih);
/*
 * NOTE(review): the lease is freed here while it is still linked in
 * lease_head; no list_del() precedes the free().
 */
free(lease);
return pih;
}
lease->pih = pih;
return (pl_handle_t)lease;
}
/*
 * paxos_lease_on_receive - entry point for received lease messages.
 *
 * Passes the message to paxos_recvmsg() and returns its result.
 */
int paxos_lease_on_receive(void *msg, int msglen)
{
	int rc = paxos_recvmsg(msg, msglen);

	return rc;
}
/*
 * paxos_lease_exit - release the resources attached to a lease.
 *
 * Frees the shared paxos callback table and the lease's stored values, and
 * deletes the proposer/acceptor timers. Every released pointer is cleared
 * so that a second call (for this or another lease) cannot free or delete
 * the same object twice.
 *
 * Always returns 0.
 *
 * NOTE(review): px_op is shared by all leases and is also the p_op of the
 * paxos space; freeing it while other leases are still active looks
 * unsafe - confirm the intended teardown order with the callers.
 */
int paxos_lease_exit(pl_handle_t handle)
{
	struct paxos_lease *pl = (struct paxos_lease *)handle;

	/* free(NULL) is a no-op, so no guard is needed */
	free(px_op);
	px_op = NULL;

	free(pl->proposer.plv);
	pl->proposer.plv = NULL;
	if (pl->proposer.timer) {
		del_timer(pl->proposer.timer);
		pl->proposer.timer = NULL;
	}

	free(pl->acceptor.plv);
	pl->acceptor.plv = NULL;
	if (pl->acceptor.timer) {
		del_timer(pl->acceptor.timer);
		pl->acceptor.timer = NULL;
	}

	return 0;
}
diff --git a/src/paxos_lease.h b/src/paxos_lease.h
index 498070c..8a03d97 100644
--- a/src/paxos_lease.h
+++ b/src/paxos_lease.h
@@ -1,66 +1,66 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _PAXOS_LEASE_H
#define _PAXOS_LEASE_H
/* Maximum length of a lease name, excluding the NUL. */
#define PLEASE_NAME_LEN 63
/*
 * Opaque lease handle. Values <= 0 are used by callers as failure
 * indicators.
 * NOTE(review): the implementation stores a struct pointer in this type;
 * `long` is not guaranteed by ISO C to hold a pointer (intptr_t is).
 */
-typedef int pl_handle_t;
+typedef long pl_handle_t;
/* State of a lease as reported to the upper layer through notify(). */
struct paxos_lease_result {
char name[PLEASE_NAME_LEN+1];
/* owner id, -1 when nobody holds the lease */
int owner;
unsigned long long expires;
};
/* Callbacks supplied by the user of the lease layer. */
struct paxos_lease_operations {
/* returns the id of the local participant */
int (*get_myid) (void);
/* sends len bytes to participant id */
int (*send) (unsigned long id, void *value, int len);
/* sends len bytes to all participants */
int (*broadcast) (void *value, int len);
/* loads the stored owner and expiry time of the named lease */
int (*catchup) (const void *name, int *owner,
unsigned long long *expires);
/* reports a change of lease owner/expiry */
int (*notify) (pl_handle_t handle, struct paxos_lease_result *result);
};
/* Creates a lease; returns its handle or a value <= 0 on failure. */
pl_handle_t paxos_lease_init(const void *name,
unsigned int namelen,
int expiry,
int number,
int failover,
unsigned char *role,
int *prio,
const struct paxos_lease_operations *pl_op);
/* Feeds a received message into the paxos layer. */
int paxos_lease_on_receive(void *msg, int msglen);
/* Starts acquiring the lease; end_acquire is the completion callback. */
int paxos_lease_acquire(pl_handle_t handle,
int relet,
void (*end_acquire) (pl_handle_t handle, int result));
/*
int paxos_lease_owner_get(const void *name);
int paxos_lease_epoch_get(const void *name);
int paxos_lease_timeout(const void *name);
*/
/* Stops renewing the lease. */
int paxos_lease_release(pl_handle_t handle);
/* Releases the resources attached to the lease. */
int paxos_lease_exit(pl_handle_t handle);
#endif /* _PAXOS_LEASE_H */
diff --git a/src/ticket.c b/src/ticket.c
index 26e501b..fa09fed 100644
--- a/src/ticket.c
+++ b/src/ticket.c
@@ -1,397 +1,397 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include <stdio.h>
#include "ticket.h"
#include "config.h"
#include "pacemaker.h"
#include "list.h"
#include "log.h"
#include "paxos_lease.h"
#include "paxos.h"
/* Magic number placed in every booth message header. */
#define PAXOS_MAGIC 0xDB12
/* Size of one fixed-length record produced by list_ticket(). */
#define TK_LINE 256
/* Header prepended to every paxos message; multi-byte fields are sent in
 * network byte order. */
struct booth_msghdr {
uint16_t magic;
/* not written or checked by the code in this file */
uint16_t checksum;
/* total message length, header included */
uint32_t len;
} __attribute__((packed));
/* One configured ticket; linked into ticket_list. */
struct ticket {
char id[BOOTH_NAME_LEN+1];
/* lease backing this ticket */
pl_handle_t handle;
/* current owner id, -1 when nobody holds the ticket */
int owner;
int expiry;
unsigned long long expires;
struct list_head list;
};
/* All tickets created by setup_ticket(). */
static LIST_HEAD(ticket_list);
/* Per-node paxos role bitmasks, built by setup_ticket(). */
static unsigned char *role;
/*
 * check_ticket - tell whether a ticket name appears in the configuration.
 *
 * Returns 1 when booth_conf lists a ticket with this name, 0 otherwise
 * (including when no configuration is loaded).
 */
int check_ticket(char *ticket)
{
	int idx = 0;

	if (booth_conf == NULL)
		return 0;

	while (idx < booth_conf->ticket_count) {
		if (strcmp(booth_conf->ticket[idx].name, ticket) == 0)
			return 1;
		idx++;
	}

	return 0;
}
/*
 * check_site - look up a site by address in the configuration.
 *
 * @site:  address to look for
 * @local: out: the matching node's `local` flag; written only on a match
 *
 * Returns 1 when a node of type SITE has this address, 0 otherwise
 * (including when no configuration is loaded).
 */
int check_site(char *site, int *local)
{
	int idx = 0;

	if (booth_conf == NULL)
		return 0;

	while (idx < booth_conf->node_count) {
		if (booth_conf->node[idx].type == SITE
		    && strcmp(booth_conf->node[idx].addr, site) == 0) {
			*local = booth_conf->node[idx].local;
			return 1;
		}
		idx++;
	}

	return 0;
}
/*
 * ticket_priority - return the weight array of ticket @i, if usable.
 *
 * Returns NULL as soon as one of the first node_count weights is 0,
 * otherwise the ticket's weight array.
 */
static int * ticket_priority(int i)
{
	int *weights = booth_conf->ticket[i].weight;
	int node = 0;

	/* TODO: need more precise check */
	while (node < booth_conf->node_count) {
		if (weights[node] == 0)
			return NULL;
		node++;
	}

	return weights;
}
/* Return the local node id as reported by the configured transport. */
static int ticket_get_myid(void)
{
	int self = booth_transport[booth_conf->proto].get_myid();

	return self;
}
/*
 * end_acquire - completion callback handed to paxos_lease_acquire().
 *
 * On success (result == 0) marks the ticket that belongs to the lease
 * handle as owned by the local node and logs the grant. Other results are
 * ignored.
 */
static void end_acquire(pl_handle_t handle, int result)
{
struct ticket *tk;
int found = 0;
if (result == 0) {
list_for_each_entry(tk, &ticket_list, list) {
if (tk->handle == handle) {
tk->owner = ticket_get_myid();
found = 1;
break;
}
}
/*
 * NOTE(review): when the handle is not found, execution continues to the
 * log_info() calls below, which read tk->id although `tk` does not refer
 * to a ticket at that point.
 */
if (!found)
- log_error("BUG: ticket handle %d does not exist",
+ log_error("BUG: ticket handle %ld does not exist",
handle);
log_info("ticket %s acquired", tk->id);
log_info("ticket %s granted to local (id %d)", tk->id,
ticket_get_myid());
}
}
/*
 * ticket_send - lease `send` callback: deliver a paxos message to a node.
 *
 * @id:    node id of the receiver
 * @value: payload
 * @len:   payload length in bytes
 *
 * Copies the payload behind a booth header into a temporary buffer and
 * hands that buffer to the configured transport.
 *
 * Returns the transport's result, -1 when no node has this id, or -ENOMEM.
 */
static int ticket_send(unsigned long id, void *value, int len)
{
	size_t total = sizeof(struct booth_msghdr) + len;
	struct booth_node *dest = NULL;
	struct booth_msghdr *hdr;
	void *frame;
	int idx, rc;

	/* no early exit: the last node with a matching id is used */
	for (idx = 0; idx < booth_conf->node_count; idx++) {
		if (booth_conf->node[idx].nodeid == id)
			dest = &booth_conf->node[idx];
	}
	if (dest == NULL)
		return -1;

	frame = malloc(total);
	if (frame == NULL)
		return -ENOMEM;
	memset(frame, 0, total);

	hdr = frame;
	hdr->magic = htons(PAXOS_MAGIC);
	hdr->len = htonl(total);
	memcpy((char *)frame + sizeof(struct booth_msghdr), value, len);

	/* the transport receives the node descriptor, not the numeric id */
	rc = booth_transport[booth_conf->proto].send(
		(unsigned long)dest, frame, total);

	free(frame);
	return rc;
}
/*
 * ticket_broadcast - lease `broadcast` callback.
 *
 * Copies the payload behind a booth header into a temporary buffer and
 * hands that buffer to the configured transport's broadcast function.
 *
 * Returns the transport's result, or -ENOMEM.
 */
static int ticket_broadcast(void *value, int len)
{
	size_t total = sizeof(struct booth_msghdr) + len;
	struct booth_msghdr *hdr;
	void *frame;
	int rc;

	frame = malloc(total);
	if (frame == NULL)
		return -ENOMEM;
	memset(frame, 0, total);

	hdr = frame;
	hdr->magic = htons(PAXOS_MAGIC);
	hdr->len = htonl(total);
	memcpy((char *)frame + sizeof(struct booth_msghdr), value, len);

	rc = booth_transport[booth_conf->proto].broadcast(frame, total);

	free(frame);
	return rc;
}
/*
 * ticket_read - lease `catchup` callback: load stored ticket state.
 *
 * @name:    ticket id
 * @owner:   out: owner loaded through pcmk_handler.load_ticket
 * @expires: out: expiry time loaded through pcmk_handler.load_ticket
 *
 * Returns 0, or -1 when no ticket with this id exists (the outputs are
 * left untouched in that case).
 */
static int ticket_read(const void *name, int *owner,
		       unsigned long long *expires)
{
	struct ticket *cur;
	struct ticket *tk = NULL;

	list_for_each_entry(cur, &ticket_list, list) {
		if (strcmp(cur->id, name) == 0) {
			tk = cur;
			break;
		}
	}
	if (tk == NULL) {
		log_error("BUG: ticket_read failed (ticket %s does not exist)",
			  (char *)name);
		return -1;
	}

	/* refresh the cached state, then hand it to the caller */
	pcmk_handler.load_ticket(tk->id, &tk->owner, &tk->expires);
	*owner = tk->owner;
	*expires = tk->expires;

	return 0;
}
/*
 * ticket_write - lease `notify` callback: persist a lease result.
 *
 * Copies owner and expiry time from the lease result into the ticket,
 * stores them through pcmk_handler.store_ticket and additionally grants
 * the ticket when the local node is the owner, or revokes it when nobody
 * is.
 *
 * Returns 0, or -1 when no ticket uses this lease handle.
 */
static int ticket_write(pl_handle_t handle, struct paxos_lease_result *result)
{
struct ticket *tk;
int found = 0;
list_for_each_entry(tk, &ticket_list, list) {
if (tk->handle == handle) {
found = 1;
break;
}
}
if (!found) {
log_error("BUG: ticket_write failed "
- "(ticket handle %d does not exist)", handle);
+ "(ticket handle %ld does not exist)", handle);
return -1;
}
tk->owner = result->owner;
tk->expires = result->expires;
/* the state is stored in all three cases; only grant/revoke differs */
if (tk->owner == ticket_get_myid()) {
pcmk_handler.store_ticket(tk->id, tk->owner, tk->expires);
pcmk_handler.grant_ticket(tk->id);
} else if (tk->owner == -1) {
pcmk_handler.store_ticket(tk->id, tk->owner, tk->expires);
pcmk_handler.revoke_ticket(tk->id);
} else
pcmk_handler.store_ticket(tk->id, tk->owner, tk->expires);
return 0;
}
/*
 * ticket_recv - validate a received booth message and pass it on.
 *
 * @msg:    message starting with a struct booth_msghdr
 * @msglen: number of bytes received
 *
 * The message must be at least one header long, carry PAXOS_MAGIC and
 * state a length equal to @msglen. The payload behind the header is then
 * handed to paxos_lease_on_receive().
 *
 * Returns the result of paxos_lease_on_receive(), or -1 for a rejected
 * message.
 */
int ticket_recv(void *msg, int msglen)
{
	struct booth_msghdr *hdr = msg;
	char *data;

	/*
	 * The header fields are read below and its size is subtracted from
	 * msglen, so the buffer must cover the whole header first.
	 */
	if (!msg || msglen < (int)sizeof(struct booth_msghdr)) {
		log_error("message received error");
		return -1;
	}

	if (ntohs(hdr->magic) != PAXOS_MAGIC ||
	    ntohl(hdr->len) != (uint32_t)msglen) {
		log_error("message received error");
		return -1;
	}

	data = (char *)msg + sizeof(struct booth_msghdr);

	return paxos_lease_on_receive(data,
				      msglen - sizeof(struct booth_msghdr));
}
/*
 * grant_ticket - grant a ticket to the local node.
 *
 * @ticket: ticket id
 * @force:  non-zero to store and grant the ticket right away, without
 *          going through the lease
 *
 * Returns BOOTHC_RLT_SYNC_SUCC when forced or when the local node already
 * owns the ticket, BOOTHC_RLT_SYNC_FAIL for an unknown ticket, and
 * BOOTHC_RLT_ASYNC after a lease acquisition has been started.
 */
int grant_ticket(char *ticket, int force)
{
	struct ticket *cur;
	struct ticket *tk = NULL;

	if (force) {
		pcmk_handler.store_ticket(ticket, ticket_get_myid(), -1);
		pcmk_handler.grant_ticket(ticket);
		return BOOTHC_RLT_SYNC_SUCC;
	}

	list_for_each_entry(cur, &ticket_list, list) {
		if (strcmp(cur->id, ticket) == 0) {
			tk = cur;
			break;
		}
	}
	if (tk == NULL) {
		log_error("ticket %s does not exist", ticket);
		return BOOTHC_RLT_SYNC_FAIL;
	}

	if (tk->owner == ticket_get_myid())
		return BOOTHC_RLT_SYNC_SUCC;

	/* the outcome is reported later through end_acquire() */
	paxos_lease_acquire(tk->handle, 1, end_acquire);
	return BOOTHC_RLT_ASYNC;
}
/*
 * revoke_ticket - revoke a ticket.
 *
 * @ticket: ticket id
 * @force:  non-zero to store the ticket as unowned and revoke it right
 *          away, before the owner check below
 *
 * Returns BOOTHC_RLT_SYNC_FAIL for an unknown ticket,
 * BOOTHC_RLT_SYNC_SUCC when the ticket has no owner, and
 * BOOTHC_RLT_ASYNC after the lease has been flagged for release.
 */
int revoke_ticket(char *ticket, int force)
{
	struct ticket *cur;
	struct ticket *tk = NULL;

	list_for_each_entry(cur, &ticket_list, list) {
		if (strcmp(cur->id, ticket) == 0) {
			tk = cur;
			break;
		}
	}
	if (tk == NULL) {
		log_error("ticket %s does not exist", ticket);
		return BOOTHC_RLT_SYNC_FAIL;
	}

	if (force) {
		pcmk_handler.store_ticket(tk->id, -1, 0);
		pcmk_handler.revoke_ticket(tk->id);
	}

	if (tk->owner == -1)
		return BOOTHC_RLT_SYNC_SUCC;

	paxos_lease_release(tk->handle);
	return BOOTHC_RLT_ASYNC;
}
/*
 * list_ticket - render all tickets into a newly allocated buffer.
 *
 * @pdata: out: buffer holding one zero-padded TK_LINE-byte record per
 *         ticket, or NULL when there are no tickets; the caller frees it
 * @len:   out: size of the buffer in bytes
 *
 * Returns 0 on success. On allocation failure returns -ENOMEM with
 * *pdata == NULL and *len == 0; the partially built buffer is released.
 */
int list_ticket(char **pdata, unsigned int *len)
{
	struct ticket *tk;
	char line[TK_LINE];
	char *grown;

	*pdata = NULL;
	*len = 0;

	list_for_each_entry(tk, &ticket_list, list) {
		/* records are fixed size, so pad the unused tail with zeros */
		memset(line, 0, TK_LINE);
		snprintf(line, TK_LINE, "ticket: %s, owner: %d, expires: %llu\n",
			 tk->id, tk->owner, tk->expires);

		/*
		 * realloc() leaves the old block allocated when it fails, so
		 * keep the old pointer until the new one is known to be good.
		 */
		grown = realloc(*pdata, *len + TK_LINE);
		if (!grown) {
			free(*pdata);
			*pdata = NULL;
			*len = 0;
			return -ENOMEM;
		}
		*pdata = grown;

		memcpy(*pdata + *len, line, TK_LINE);
		*len += TK_LINE;
	}

	return 0;
}
/* Callbacks that setup_ticket() hands to paxos_lease_init(). */
const struct paxos_lease_operations ticket_operations = {
	.get_myid	= ticket_get_myid,
	.send		= ticket_send,
	.broadcast	= ticket_broadcast,
	.catchup	= ticket_read,
	.notify		= ticket_write,
};
/*
 * setup_ticket - create the role table and one lease per configured ticket.
 *
 * Sites get the PROPOSER, ACCEPTOR and LEARNER roles, arbitrators get
 * ACCEPTOR and LEARNER. Each ticket from booth_conf is added to
 * ticket_list and backed by a lease created with paxos_lease_init().
 *
 * Returns 0 on success or a negative value on failure. On failure every
 * ticket created so far is unlinked and freed, and the role table is
 * released.
 */
int setup_ticket(void)
{
	struct ticket *tk, *tmp;
	int i, rv;
	pl_handle_t plh;

	role = calloc(booth_conf->node_count, sizeof(unsigned char));
	if (!role)
		return -ENOMEM;

	for (i = 0; i < booth_conf->node_count; i++) {
		if (booth_conf->node[i].type == SITE)
			role[i] = PROPOSER | ACCEPTOR | LEARNER;
		else if (booth_conf->node[i].type == ARBITRATOR)
			role[i] = ACCEPTOR | LEARNER;
	}

	for (i = 0; i < booth_conf->ticket_count; i++) {
		tk = calloc(1, sizeof(*tk));
		if (!tk) {
			rv = -ENOMEM;
			goto out;
		}
		/*
		 * tk is zero-filled and id[] holds BOOTH_NAME_LEN + 1 bytes,
		 * so a bounded copy always leaves the id NUL-terminated.
		 */
		strncpy(tk->id, booth_conf->ticket[i].name, BOOTH_NAME_LEN);
		tk->owner = -1;
		tk->expiry = booth_conf->ticket[i].expiry;
		if (!tk->expiry)
			tk->expiry = DEFAULT_TICKET_EXPIRY;

		/* linked first: ticket_read() finds the ticket by id in the list */
		list_add_tail(&tk->list, &ticket_list);

		plh = paxos_lease_init(tk->id,
				       BOOTH_NAME_LEN,
				       tk->expiry,
				       booth_conf->node_count,
				       1,
				       role,
				       ticket_priority(i),
				       &ticket_operations);
		if (plh <= 0) {
			log_error("paxos lease initialization failed");
			/* a zero handle is a failure too; never report success */
			rv = plh ? (int)plh : -EINVAL;
			goto out;
		}
		tk->handle = plh;
	}

	return 0;

out:
	/* unlink and release every ticket, including the one that failed */
	list_for_each_entry_safe(tk, tmp, &ticket_list, list) {
		list_del(&tk->list);
		free(tk);
	}
	/*
	 * NOTE(review): a paxos space created by an earlier, successful
	 * paxos_lease_init() call was given this role pointer - confirm that
	 * nothing uses it after a failed setup.
	 */
	free(role);
	role = NULL;
	return rv;
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Thu, Jul 10, 1:55 AM (1 d, 9 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2009607
Default Alt Text
(72 KB)
Attached To
Mode
rB Booth
Attached
Detach File
Event Timeline
Log In to Comment