Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F3152738
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
43 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/src/booth.h b/src/booth.h
index 0a5ca98..9190a6a 100644
--- a/src/booth.h
+++ b/src/booth.h
@@ -1,85 +1,82 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _BOOTH_H
#define _BOOTH_H
#include <stdint.h>
#include <sys/types.h>
#include <sys/socket.h>
/*
 * Build-time defaults and client-protocol constants shared by the daemon
 * and the command-line client.
 */
#define BOOTH_LOG_DUMP_SIZE (1024*1024)
#define BOOTH_RUN_DIR "/var/run"
#define BOOTH_LOG_DIR "/var/log"
#define BOOTH_LOGFILE_NAME "booth.log"
/* Default pid/lock file; overridable with -l (see read_arguments()). */
#define BOOTH_DEFAULT_LOCKFILE BOOTH_RUN_DIR "/booth.pid"
/* Default configuration file; overridable with -c. */
#define BOOTH_DEFAULT_CONF "/etc/booth/booth.conf"
#define DAEMON_NAME "booth"
/* Sizes of the site/ticket and config/lock path buffers in struct command_line. */
#define BOOTH_NAME_LEN 63
#define BOOTH_PATH_LEN 127
/* Name of the local UNIX socket used between the client and the daemon. */
#define BOOTHC_SOCK_PATH "boothc_lock"
#define BOOTH_PROTO_FAMILY AF_INET
#define BOOTH_CMD_PORT 22075
/* Values expected in boothc_header.magic and boothc_header.version. */
#define BOOTHC_MAGIC 0x5F1BA08C
#define BOOTHC_VERSION 0x00010000
-#define BOOTHC_OPT_FORCE 0x00000001
-
/*
 * Fixed-size header that starts every boothc request and reply.
 * init_header() fills magic, version, cmd, len and result;
 * process_connection() rejects a header whose magic or version does not
 * match and treats len as the byte count of the payload that follows.
 */
struct boothc_header {
uint32_t magic; /* BOOTHC_MAGIC */
uint32_t version; /* BOOTHC_VERSION */
uint32_t cmd; /* one of the BOOTHC_CMD_* values */
- uint32_t option;
uint32_t expiry;
uint32_t len; /* payload bytes following this header */
uint32_t result; /* one of the BOOTHC_RLT_* values in a reply */
};
/*
 * Command codes carried in boothc_header.cmd and dispatched by
 * process_connection(). The numeric values travel between processes,
 * so the order must not change.
 */
typedef enum {
BOOTHC_CMD_LIST = 1, /* handled by list_ticket() */
BOOTHC_CMD_GRANT, /* handled by grant_ticket() */
BOOTHC_CMD_REVOKE, /* handled by revoke_ticket() */
BOOTHC_CMD_CATCHUP, /* handled by catchup_ticket() */
} cmd_request_t;
/*
 * Result codes returned in boothc_header.result. The numeric values
 * travel between processes, so the order must not change.
 */
typedef enum {
BOOTHC_RLT_ASYNC = 1, /* operation started; outcome is reported later */
BOOTHC_RLT_SYNC_SUCC, /* operation completed successfully */
BOOTHC_RLT_SYNC_FAIL, /* operation failed */
BOOTHC_RLT_INVALID_ARG, /* ticket or site rejected by check_ticket()/check_site() */
BOOTHC_RLT_REMOTE_OP, /* the named site is not the local one */
BOOTHC_RLT_OVERGRANT, /* grant requested for a ticket that already has an owner */
} cmd_result_t;
/*
 * One slot of the poll table maintained by client_add().
 * A slot with fd == -1 is free.
 */
struct client {
int fd;
void *workfn; /* called by loop() with the slot index when POLLIN is set */
void *deadfn; /* called with the slot index on POLLERR/POLLHUP/POLLNVAL */
};
/* Register fd in the poll table and return its slot index; a NULL deadfn selects the default handler. */
int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci));
/* Read exactly count bytes; 0 on success, -1 on error or end of file. */
int do_read(int fd, void *buf, size_t count);
/* Write exactly count bytes; 0 on success, negative on error. */
int do_write(int fd, void *buf, size_t count);
/* Handle one request arriving on the connection in slot ci. */
void process_connection(int ci);
/* Copy value into dest (capacity buflen); prints an error naming description and exits if it does not fit. */
void safe_copy(char *dest, char *value, size_t buflen, const char *description);
#endif /* _BOOTH_H */
diff --git a/src/main.c b/src/main.c
index 4b48c83..4a8ac29 100644
--- a/src/main.c
+++ b/src/main.c
@@ -1,1143 +1,1126 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sched.h>
#include <errno.h>
#include <limits.h>
#include <sys/file.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/poll.h>
#include <fcntl.h>
#include <string.h>
#include <assert.h>
#include <error.h>
#include <sys/ioctl.h>
#include <termios.h>
#include <signal.h>
#include "log.h"
#include "booth.h"
#include "config.h"
#include "transport.h"
#include "timer.h"
#include "pacemaker.h"
#include "ticket.h"
/* Version string printed by "booth version". */
#define RELEASE_VERSION "1.0"
/* Number of slots by which client_alloc() grows the client/pollfd arrays. */
#define CLIENT_NALLOC 32
/* Per-destination log priorities; -D raises the logfile and syslog ones to LOG_DEBUG. */
int log_logfile_priority = LOG_INFO;
int log_syslog_priority = LOG_ERR;
int log_stderr_priority = LOG_ERR;
/* Set to 1 by -D; do_server() calls daemon() only while this is 0. */
int daemonize = 0;
/* Highest slot index handed out by client_add(); bounds the poll() call in loop(). */
static int client_maxi;
/* Number of slots currently allocated in client[] and pollfd[]. */
static int client_size = 0;
/* Parallel arrays indexed by slot number. */
struct client *client = NULL;
struct pollfd *pollfd = NULL;
/* Timeout passed to poll() in loop(). */
int poll_timeout = -1;
/* Role selected by the first command-line word; stored in command_line.type. */
typedef enum {
ACT_ARBITRATOR = 1,
ACT_SITE,
ACT_CLIENT,
} booth_role_t;
/* Client operation selected on the command line; stored in command_line.op. */
typedef enum {
OP_LIST = 1,
OP_GRANT,
OP_REVOKE,
} operation_t;
/*
 * Parsed command line. main() zeroes the single instance `cl` and fills
 * in the default config and lock file paths before read_arguments() runs.
 */
struct command_line {
int type; /* ACT_ */
int op; /* OP_ */
- int force;
char configfile[BOOTH_PATH_LEN];
char lockfile[BOOTH_PATH_LEN];
char site[BOOTH_NAME_LEN];
char ticket[BOOTH_NAME_LEN];
};
static struct command_line cl;
/*
 * Read exactly count bytes from fd into buf, retrying after EINTR and
 * after short reads.
 *
 * Returns 0 once count bytes have been read, or -1 on a read error or
 * when end of file is reached first (errno is not set for end of file).
 */
int do_read(int fd, void *buf, size_t count)
{
	size_t off = 0;
	ssize_t rv;

	while (off < count) {
		rv = read(fd, (char *)buf + off, count - off);
		if (rv == 0)
			return -1;	/* peer closed before sending everything */
		if (rv < 0) {
			if (errno == EINTR)
				continue;
			return -1;
		}
		off += (size_t)rv;
	}
	return 0;
}
/*
 * Write exactly count bytes from buf to fd, retrying after EINTR and
 * after short writes. write() is called at least once, even when count
 * is zero.
 *
 * Returns 0 on success, or -1 after logging the error if write() fails.
 */
int do_write(int fd, void *buf, size_t count)
{
	const char *p = buf;
	size_t off = 0;
	ssize_t rv;

	for (;;) {
		rv = write(fd, p + off, count - off);
		if (rv < 0) {
			if (errno == EINTR)
				continue;
			log_error("write failed: %s (%d)", strerror(errno), errno);
			return -1;
		}
		off += (size_t)rv;
		if (off >= count)
			return 0;
	}
}
/*
 * Connect to the daemon's local stream socket named sock_path.
 *
 * The name is stored starting at sun_path[1], leaving sun_path[0] as a
 * NUL byte, and the address length covers that leading NUL plus the name
 * (the same layout setup_listener() binds to).
 *
 * Returns the connected descriptor, or a negative value after logging
 * the reason.
 */
static int do_connect(const char *sock_path)
{
	struct sockaddr_un addr;
	socklen_t addrlen;
	size_t namelen = strlen(sock_path);
	int fd, rv;

	/* Room is needed for the leading NUL, the name and a terminator. */
	if (namelen + 2 > sizeof(addr.sun_path)) {
		log_error("socket path too long: %s", sock_path);
		return -1;
	}

	fd = socket(PF_UNIX, SOCK_STREAM, 0);
	if (fd < 0) {
		log_error("failed to create socket: %s (%d)", strerror(errno), errno);
		return fd;
	}

	memset(&addr, 0, sizeof(addr));
	addr.sun_family = AF_UNIX;
	memcpy(&addr.sun_path[1], sock_path, namelen);
	addrlen = sizeof(sa_family_t) + namelen + 1;

	rv = connect(fd, (struct sockaddr *) &addr, addrlen);
	if (rv < 0) {
		if (errno == ECONNREFUSED)
			log_error("Connection to boothd was refused; "
				  "please ensure that you are on a "
				  "machine which has boothd running.");
		else
			log_error("failed to connect: %s (%d)", strerror(errno), errno);
		close(fd);
		fd = rv;
	}
	return fd;
}
/*
 * Zero *h and fill in the protocol magic and version together with the
 * given command, result code and payload length. Fields not listed here
 * are left at zero by the memset.
 */
-static void init_header(struct boothc_header *h, int cmd, int option,
+static void init_header(struct boothc_header *h, int cmd,
int result, int data_len)
{
memset(h, 0, sizeof(struct boothc_header));
h->magic = BOOTHC_MAGIC;
h->version = BOOTHC_VERSION;
h->len = data_len;
h->cmd = cmd;
- h->option = option;
h->result = result;
}
/*
 * Grow the parallel client[] and pollfd[] arrays by CLIENT_NALLOC slots
 * (allocating them on first use) and mark the new slots as free.
 *
 * The daemon cannot track connections without these arrays, so an
 * allocation failure is logged and the process exits instead of
 * dereferencing a NULL array.
 */
static void client_alloc(void)
{
	size_t new_size = (size_t)client_size + CLIENT_NALLOC;
	struct client *new_client;
	struct pollfd *new_pollfd;
	int i;

	/* realloc(NULL, n) behaves like malloc(n), so one path covers both cases. */
	new_client = realloc(client, new_size * sizeof(*new_client));
	if (!new_client) {
		log_error("can't alloc for client array");
		exit(EXIT_FAILURE);
	}
	client = new_client;

	new_pollfd = realloc(pollfd, new_size * sizeof(*new_pollfd));
	if (!new_pollfd) {
		log_error("can't alloc for pollfd");
		exit(EXIT_FAILURE);
	}
	pollfd = new_pollfd;

	for (i = client_size; i < client_size + CLIENT_NALLOC; i++) {
		client[i].workfn = NULL;
		client[i].deadfn = NULL;
		client[i].fd = -1;
		pollfd[i].fd = -1;
		pollfd[i].revents = 0;
	}
	client_size += CLIENT_NALLOC;
}
static void client_dead(int ci)
{
close(client[ci].fd);
client[ci].workfn = NULL;
client[ci].fd = -1;
pollfd[ci].fd = -1;
}
/*
 * Store fd in the first free slot of the poll table, growing the table
 * when every slot is taken.
 *
 * workfn is recorded as the slot's input handler; deadfn as its
 * hang-up handler, with client_dead used when deadfn is NULL.
 * Returns the index of the slot that was used.
 */
int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci))
{
	int slot;

	if (!client)
		client_alloc();

	for (;;) {
		for (slot = 0; slot < client_size; slot++) {
			if (client[slot].fd != -1)
				continue;

			client[slot].workfn = workfn;
			client[slot].deadfn = deadfn ? deadfn : client_dead;
			client[slot].fd = fd;
			pollfd[slot].fd = fd;
			pollfd[slot].events = POLLIN;
			if (slot > client_maxi)
				client_maxi = slot;
			return slot;
		}
		/* No free slot: enlarge the table and scan again. */
		client_alloc();
	}
}
/*
 * Create the local stream socket the daemon accepts client connections
 * on, bind it to sock_path and start listening with a backlog of 5.
 *
 * The name is stored starting at sun_path[1], leaving sun_path[0] as a
 * NUL byte, and the address length covers that leading NUL plus the name
 * (the same layout do_connect() uses).
 *
 * Returns the listening descriptor, or a negative value after logging
 * the reason.
 */
static int setup_listener(const char *sock_path)
{
	struct sockaddr_un addr;
	socklen_t addrlen;
	size_t namelen = strlen(sock_path);
	int rv, s;

	/* Room is needed for the leading NUL, the name and a terminator. */
	if (namelen + 2 > sizeof(addr.sun_path)) {
		log_error("socket path too long: %s", sock_path);
		return -1;
	}

	s = socket(AF_LOCAL, SOCK_STREAM, 0);
	if (s < 0) {
		log_error("socket error %d: %s (%d)", s, strerror(errno), errno);
		return s;
	}

	memset(&addr, 0, sizeof(addr));
	addr.sun_family = AF_LOCAL;
	memcpy(&addr.sun_path[1], sock_path, namelen);
	addrlen = sizeof(sa_family_t) + namelen + 1;

	rv = bind(s, (struct sockaddr *) &addr, addrlen);
	if (rv < 0) {
		log_error("bind error %d: %s (%d)", rv, strerror(errno), errno);
		close(s);
		return rv;
	}

	rv = listen(s, 5);
	if (rv < 0) {
		log_error("listen error %d: %s (%d)", rv, strerror(errno), errno);
		close(s);
		return rv;
	}
	return s;
}
/*
 * Serve one request on the client connection in slot ci.
 *
 * Reads a boothc_header, validates magic and version, reads h.len bytes
 * of payload if any, dispatches on h.cmd, then writes the header back
 * (with h.result set) followed by h.len bytes of data. For GRANT and
 * REVOKE the payload is the site name at offset 0 and the ticket name at
 * offset BOOTH_NAME_LEN, and h.len is cleared so only the header is
 * returned.
 *
 * NOTE(review): h.len comes from the peer and is passed to malloc()
 * without an upper bound.
 * NOTE(review): GRANT/REVOKE use `data` without checking that a payload
 * was received (data stays NULL when h.len is 0) or that it is long
 * enough and NUL-terminated.
 * NOTE(review): a magic or version mismatch returns without replying
 * and without invoking the slot's dead handler.
 */
void process_connection(int ci)
{
struct boothc_header h;
char *data = NULL;
char *site, *ticket;
int ticket_owner;
int local, rv;
void (*deadfn) (int ci);
rv = do_read(client[ci].fd, &h, sizeof(h));
if (rv < 0) {
/* do_read() also fails on end of file, so this path covers a normal disconnect */
if (errno == ECONNRESET)
log_debug("client %d connection reset for fd %d",
ci, client[ci].fd);
deadfn = client[ci].deadfn;
if(deadfn) {
deadfn(ci);
}
return;
}
if (h.magic != BOOTHC_MAGIC) {
log_error("connection %d magic error %x", ci, h.magic);
return;
}
if (h.version != BOOTHC_VERSION) {
log_error("connection %d version error %x", ci, h.version);
return;
}
if (h.len) {
data = malloc(h.len);
if (!data) {
log_error("process_connection no mem %u", h.len);
return;
}
memset(data, 0, h.len);
rv = do_read(client[ci].fd, data, h.len);
if (rv < 0) {
log_error("connection %d read data error %d", ci, rv);
goto out;
}
}
switch (h.cmd) {
case BOOTHC_CMD_LIST:
/* LIST takes no payload; list_ticket() supplies the reply data and length */
assert(!data);
h.result = list_ticket(&data, &h.len);
break;
case BOOTHC_CMD_GRANT:
h.len = 0;
site = data;
ticket = data + BOOTH_NAME_LEN;
if (!check_ticket(ticket)) {
h.result = BOOTHC_RLT_INVALID_ARG;
goto reply;
}
/* refuse to grant a ticket that already has an owner */
if (get_ticket_info(ticket, &ticket_owner, NULL) == 0) {
if (ticket_owner > -1) {
log_error("client want to get an granted "
"ticket %s", ticket);
h.result = BOOTHC_RLT_OVERGRANT;
goto reply;
}
} else {
log_error("can not get ticket %s's info", ticket);
h.result = BOOTHC_RLT_INVALID_ARG;
goto reply;
}
if (!check_site(site, &local)) {
h.result = BOOTHC_RLT_INVALID_ARG;
goto reply;
}
/* only the named site itself performs the grant; otherwise tell the caller to go remote */
if (local)
- h.result = grant_ticket(ticket, h.option);
+ h.result = grant_ticket(ticket);
else
h.result = BOOTHC_RLT_REMOTE_OP;
break;
case BOOTHC_CMD_REVOKE:
h.len = 0;
site = data;
ticket = data + BOOTH_NAME_LEN;
if (!check_ticket(ticket)) {
h.result = BOOTHC_RLT_INVALID_ARG;
goto reply;
}
if (!check_site(site, &local)) {
h.result = BOOTHC_RLT_INVALID_ARG;
goto reply;
}
if (local)
- h.result = revoke_ticket(ticket, h.option);
+ h.result = revoke_ticket(ticket);
else
h.result = BOOTHC_RLT_REMOTE_OP;
break;
case BOOTHC_CMD_CATCHUP:
h.result = catchup_ticket(&data, h.len);
break;
default:
/* unknown command: the header is echoed back with result and len as received */
log_error("connection %d cmd %x unknown", ci, h.cmd);
break;
}
reply:
rv = do_write(client[ci].fd, &h, sizeof(h));
if (rv < 0)
log_error("connection %d write error %d", ci, rv);
if (h.len) {
rv = do_write(client[ci].fd, data, h.len);
if (rv < 0)
log_error("connection %d write error %d", ci, rv);
}
out:
free(data);
}
/*
 * Input handler for the listening socket in slot ci: accept one pending
 * connection and register it in the poll table with
 * process_connection() as its input handler and the default dead
 * handler.
 */
static void process_listener(int ci)
{
	int fd, i;

	fd = accept(client[ci].fd, NULL, NULL);
	if (fd < 0) {
		/* Report the listening descriptor; fd only holds the failed result. */
		log_error("process_listener: accept error for fd %d: %s (%d)",
			  client[ci].fd, strerror(errno), errno);
		return;
	}

	i = client_add(fd, process_connection, NULL);
	log_debug("add client connection %d fd %d", i, fd);
}
/*
 * Load the configuration file named on the command line and validate
 * it for the given node type.
 *
 * Returns the negative result of read_config() if loading fails,
 * otherwise whatever check_config() returns.
 */
static int setup_config(int type)
{
	int rc = read_config(cl.configfile);

	if (rc < 0)
		return rc;

	return check_config(type);
}
/*
 * Initialise the transport selected in the configuration, with
 * ticket_recv as its receive callback, and then the TCP transport.
 *
 * Returns the first negative init result, otherwise the result of the
 * TCP init.
 */
static int setup_transport(void)
{
	transport_layer_t proto = booth_conf->proto;
	int rc;

	rc = booth_transport[proto].init(ticket_recv);
	if (rc < 0) {
		log_error("failed to init booth_transport[%d]", proto);
		return rc;
	}

	rc = booth_transport[TCP].init(NULL);
	if (rc < 0)
		log_error("failed to init booth_transport[TCP]");

	return rc;
}
/* Initialise the timer list; returns whatever timerlist_init() returns. */
static int setup_timer(void)
{
return timerlist_init();
}
/*
 * Bring up everything the daemon needs, in order: configuration,
 * timers, transports, tickets and finally the local client listener,
 * which is registered in the poll table.
 *
 * Returns 0 on success, or -1 as soon as any step reports a negative
 * result (later steps are not attempted).
 */
static int setup(int type)
{
	int listener;

	/* || short-circuits, so the steps run in order and stop at the first failure. */
	if (setup_config(type) < 0 ||
	    setup_timer() < 0 ||
	    setup_transport() < 0 ||
	    setup_ticket() < 0)
		return -1;

	listener = setup_listener(BOOTHC_SOCK_PATH);
	if (listener < 0)
		return -1;

	client_add(listener, process_listener, NULL);
	return 0;
}
/*
 * Main event loop: poll every registered descriptor, call the slot's
 * work handler when input is ready and its dead handler on
 * error/hang-up, then run the timer list. poll() interrupted by a
 * signal is simply retried.
 *
 * Only returns (with -1) when poll() fails for another reason.
 */
static int loop(void)
{
	void (*on_input)(int ci);
	void (*on_hangup)(int ci);
	int nready, slot;

	for (;;) {
		nready = poll(pollfd, client_maxi + 1, poll_timeout);
		if (nready == -1 && errno == EINTR)
			continue;
		if (nready < 0) {
			log_error("poll failed: %s (%d)", strerror(errno), errno);
			return -1;
		}

		for (slot = 0; slot <= client_maxi; slot++) {
			if (client[slot].fd < 0)
				continue;

			if (pollfd[slot].revents & POLLIN) {
				on_input = client[slot].workfn;
				if (on_input)
					on_input(slot);
			}

			/* Looked up after the work handler ran, since it may have grown the tables. */
			if (pollfd[slot].revents & (POLLERR | POLLHUP | POLLNVAL)) {
				on_hangup = client[slot].deadfn;
				if (on_hangup)
					on_hangup(slot);
			}
		}

		process_timerlist();
	}
}
/*
 * Client side of "list": send a BOOTHC_CMD_LIST header to the local
 * daemon, read the reply header, then read the rh->len bytes of payload
 * that follow it and copy them to standard output.
 *
 * Returns 0 on success and a negative value on failure.
 *
 * NOTE(review): `reply = realloc(reply, ...)` overwrites the only
 * pointer to the header buffer, so that buffer leaks if realloc fails.
 * NOTE(review): the result of the final do_write() to stdout is ignored.
 */
static int do_list(void)
{
struct boothc_header h, *rh;
char *reply = NULL, *data;
int data_len;
int fd, rv;
- init_header(&h, BOOTHC_CMD_LIST, 0, 0, 0);
+ init_header(&h, BOOTHC_CMD_LIST, 0, 0);
fd = do_connect(BOOTHC_SOCK_PATH);
if (fd < 0) {
rv = fd;
goto out;
}
rv = do_write(fd, &h, sizeof(h));
if (rv < 0)
goto out_close;
reply = malloc(sizeof(struct boothc_header));
if (!reply) {
rv = -ENOMEM;
goto out_close;
}
rv = do_read(fd, reply, sizeof(struct boothc_header));
if (rv < 0)
goto out_free;
rh = (struct boothc_header *)reply;
data_len = rh->len;
/* grow the buffer so the payload can be read directly behind the header */
reply = realloc(reply, sizeof(struct boothc_header) + data_len);
if (!reply) {
rv = -ENOMEM;
goto out_free;
}
data = reply + sizeof(struct boothc_header);
rv = do_read(fd, data, data_len);
if (rv < 0)
goto out_free;
do_write(STDOUT_FILENO, data, data_len);
rv = 0;
out_free:
free(reply);
out_close:
close(fd);
out:
return rv;
}
/*
 * Draw a one-line text progress bar on stdout and return the cursor to
 * the start of the line.
 *
 * x: current step; n: total number of steps; r: how many times the bar
 * should be redrawn over the whole run; w: bar width in characters.
 *
 * Nothing is drawn when n is not positive. When r is not positive or is
 * larger than n, the bar is redrawn on every call, which avoids the
 * division by zero the interval computation would otherwise hit.
 */
static inline void load_bar(int x, int n, int r, int w)
{
	int i;
	int c;
	int step;
	float ratio;

	if (n <= 0)
		return;

	/* Only update r times. */
	step = (r > 0) ? n / r : 0;
	if (step < 1)
		step = 1;
	if (x % step != 0)
		return;

	/* Calculate the ratio of complete-to-incomplete. */
	ratio = x / (float)n;
	c = ratio * w;

	/* Show the percentage complete. */
	printf("%3d%% [", (int)(ratio * 100));

	/* Show the load bar. */
	for (i = 0; i < c; i++)
		printf("=");
	for (i = c; i < w; i++)
		printf(" ");
	printf("]");
	printf("\r");
	fflush(stdout);
}
/*
 * Block for total_time seconds (plus one final tick) while drawing a
 * progress bar half as wide as the terminal. SIGTERM, SIGINT and SIGHUP
 * are set to be ignored before the wait starts and are not restored.
 *
 * If the terminal size cannot be queried (for example stdin is not a
 * terminal), a bar width of 40 columns is used instead of reading an
 * uninitialised winsize. The bar is not drawn when total_time is not
 * positive, because there is nothing to measure progress against.
 */
static void counting_down(int total_time)
{
	struct winsize size;
	int screen_width;
	int i;

	if (ioctl(STDIN_FILENO, TIOCGWINSZ, (char *)&size) < 0 ||
	    size.ws_col == 0)
		screen_width = 40;	/* half of a conventional 80-column line */
	else
		screen_width = size.ws_col / 2;

	/* ignore signals */
	signal(SIGTERM, SIG_IGN);
	signal(SIGINT, SIG_IGN);
	signal(SIGHUP, SIG_IGN);

	i = 0;
	while (i <= total_time) {
		if (total_time > 0)
			load_bar(i, total_time, total_time, screen_width);
		sleep(1);
		i++;
	}
	log_info("\nCounting Down Over...\n");
}
/*
 * Client side of "grant" and "revoke".
 *
 * Builds a request consisting of a boothc_header followed by the site
 * name (sizeof(cl.site) bytes) and the ticket name (sizeof(cl.ticket)
 * bytes), sends it to the local daemon and interprets the result code in
 * the reply. If the local daemon answers BOOTHC_RLT_REMOTE_OP, the same
 * request is re-sent over the TCP transport to the address in cl.site
 * and that reply is interpreted instead. After an asynchronous revoke
 * the configuration is re-read and the function waits for the ticket's
 * configured expiry time.
 *
 * Returns 0 or a negative value; see the notes below for paths where
 * the value does not reflect the failure.
 *
 * NOTE(review): buf comes from malloc() and only the header is zeroed;
 * strcpy() writes just the strings, so the bytes after each terminating
 * NUL are uninitialised when the whole buffer is sent.
 * NOTE(review): when booth_transport[TCP].open() fails, rv still holds
 * the 0 from the preceding do_read(), so the function reports success.
 * NOTE(review): in the revoke wait loop `rv = -1` follows `break` and is
 * unreachable, and `rv = 0` at the end of the ASYNC branch overrides any
 * earlier value anyway.
 * NOTE(review): the return value of read_config() is not checked before
 * booth_conf is dereferenced.
 */
static int do_command(cmd_request_t cmd)
{
char *buf;
struct boothc_header *h, reply;
int buflen;
- uint32_t force = 0;
int fd, rv;
int expire_time;
int i;
buflen = sizeof(struct boothc_header) +
sizeof(cl.site) + sizeof(cl.ticket);
buf = malloc(buflen);
if (!buf) {
rv = -ENOMEM;
goto out;
}
h = (struct boothc_header *)buf;
- if (cl.force)
- force = BOOTHC_OPT_FORCE;
- init_header(h, cmd, force, 0,
- sizeof(cl.site) + sizeof(cl.ticket));
+ init_header(h, cmd, 0, sizeof(cl.site) + sizeof(cl.ticket));
/* payload layout: site at offset 0, ticket at offset sizeof(cl.site) */
strcpy(buf + sizeof(struct boothc_header), cl.site);
strcpy(buf + sizeof(struct boothc_header) + sizeof(cl.site), cl.ticket);
fd = do_connect(BOOTHC_SOCK_PATH);
if (fd < 0) {
rv = fd;
goto out_free;
}
rv = do_write(fd, buf, buflen);
if (rv < 0)
goto out_close;
rv = do_read(fd, &reply, sizeof(struct boothc_header));
if (rv < 0)
goto out_close;
if (reply.result == BOOTHC_RLT_INVALID_ARG) {
log_info("invalid argument!");
rv = -1;
goto out_close;
}
if (reply.result == BOOTHC_RLT_OVERGRANT) {
log_info("You're granting a granted ticket"
"If you wanted to migrate a ticket,"
"use revoke first, then use grant");
rv = -1;
goto out_close;
}
/* the named site is not this machine: repeat the request over TCP */
if (reply.result == BOOTHC_RLT_REMOTE_OP) {
struct booth_node to;
int s;
memset(&to, 0, sizeof(struct booth_node));
to.family = BOOTH_PROTO_FAMILY;
strcpy(to.addr, cl.site);
s = booth_transport[TCP].open(&to);
if (s < 0)
goto out_close;
rv = booth_transport[TCP].send(s, buf, buflen);
if (rv < 0) {
booth_transport[TCP].close(s);
goto out_close;
}
rv = booth_transport[TCP].recv(s, &reply,
sizeof(struct boothc_header));
if (rv < 0) {
booth_transport[TCP].close(s);
goto out_close;
}
booth_transport[TCP].close(s);
}
if (reply.result == BOOTHC_RLT_ASYNC) {
if (cmd == BOOTHC_CMD_GRANT)
log_info("grant command sent, result will be returned "
"asynchronously, you can get the result from "
"the log files");
else if (cmd == BOOTHC_CMD_REVOKE) {
log_info("revoke command sent, result will be returned "
"asynchronously, you can get the result from "
"the log files after the ticket expiry time.");
i = 0;
/* FIXME: if we access the server then get the actual
* remaining time the waiting will be shorter, for now,
* client is just waiting the expiry time.
*/
read_config(cl.configfile);
while (i < booth_conf->ticket_count) {
if (!strncmp(booth_conf->ticket[i].name, cl.ticket,
BOOTH_NAME_LEN)) {
expire_time = booth_conf->ticket[i].expiry;
log_info("You have to wait %d seconds to "
"ensure all timer has expired!",
expire_time);
counting_down(expire_time);
rv = 0;
break;
}
i++;
/* no ticket found in conf file */
if( i == booth_conf->ticket_count ) {
log_error("check your config file, "
"ticket %s not found", cl.ticket);
log_error("your booth's config file may "
"not be the same!");
break;
rv = -1;
}
}
}
else
log_error("internal error when reading reply result!");
rv = 0;
} else if (reply.result == BOOTHC_RLT_SYNC_SUCC) {
if (cmd == BOOTHC_CMD_GRANT)
log_info("grant succeeded!");
else if (cmd == BOOTHC_CMD_REVOKE)
log_info("revoke succeeded!");
rv = 0;
} else if (reply.result == BOOTHC_RLT_SYNC_FAIL) {
if (cmd == BOOTHC_CMD_GRANT)
log_info("grant failed!");
else if (cmd == BOOTHC_CMD_REVOKE)
log_info("revoke failed!");
rv = 0;
} else {
log_error("internal error!");
rv = -1;
}
out_close:
close(fd);
out_free:
free(buf);
out:
return rv;
}
/* Run the client "grant" operation; returns the result of do_command(). */
static int do_grant(void)
{
return do_command(BOOTHC_CMD_GRANT);
}
/* Run the client "revoke" operation; returns the result of do_command(). */
static int do_revoke(void)
{
return do_command(BOOTHC_CMD_REVOKE);
}
static int lockfile(void)
{
char buf[16];
struct flock lock;
int fd, rv;
fd = open(cl.lockfile, O_CREAT|O_WRONLY, 0666);
if (fd < 0) {
log_error("lockfile open error %s: %s",
cl.lockfile, strerror(errno));
return -1;
}
lock.l_type = F_WRLCK;
lock.l_start = 0;
lock.l_whence = SEEK_SET;
lock.l_len = 0;
rv = fcntl(fd, F_SETLK, &lock);
if (rv < 0) {
log_error("lockfile setlk error %s: %s",
cl.lockfile, strerror(errno));
goto fail;
}
rv = ftruncate(fd, 0);
if (rv < 0) {
log_error("lockfile truncate error %s: %s",
cl.lockfile, strerror(errno));
goto fail;
}
memset(buf, 0, sizeof(buf));
snprintf(buf, sizeof(buf), "%d\n", getpid());
rv = write(fd, buf, strlen(buf));
if (rv <= 0) {
log_error("lockfile write error %s: %s",
cl.lockfile, strerror(errno));
goto fail;
}
return fd;
fail:
close(fd);
return -1;
}
/* Remove the file named by cl.lockfile, then close fd (the descriptor returned by lockfile()). */
static void unlink_lockfile(int fd)
{
unlink(cl.lockfile);
close(fd);
}
/*
 * Print the command-line synopsis (types, client operations and
 * options) to standard output.
 */
static void print_usage(void)
{
printf("Usage:\n");
printf("booth <type> <operation> [options]\n");
printf("\n");
printf("Types:\n");
printf(" arbitrator: daemon running on arbitrator\n");
printf(" site: daemon running on cluster site\n");
printf(" client: command running from client\n");
printf("\n");
printf("Operations:\n");
printf("Please note that operations are valid iff type is client!\n");
printf(" list: List all the tickets\n");
printf(" grant: Grant ticket T(-t T) to site S(-s S)\n");
printf(" revoke: Revoke ticket T(-t T) from site S(-s S)\n");
printf("\n");
printf("Options:\n");
printf(" -c FILE Specify config file [default " BOOTH_DEFAULT_CONF "]\n");
printf(" -l LOCKFILE Specify lock file [default " BOOTH_DEFAULT_LOCKFILE "]\n");
printf(" -D Enable debugging to stderr and don't fork\n");
printf(" -t ticket name\n");
printf(" -s site name\n");
- printf(" -f ticket attribute: force, only valid when "
- "granting\n");
printf(" -h Print this help, then exit\n");
}
/* getopt() option string used by read_arguments(); a ':' after a letter means the option takes an argument. */
-#define OPTION_STRING "c:Dl:t:s:fh"
+#define OPTION_STRING "c:Dl:t:s:h"
/* Name handed to cl_log_set_entity() by main() for the daemon roles; set in read_arguments(). */
static char *logging_entity = NULL;
/*
 * Copy the string value into dest, a buffer of buflen bytes.
 *
 * If value (including its terminator) does not fit, an error naming
 * description is printed to stderr and the process exits with
 * EXIT_FAILURE. Otherwise the string is copied and the remainder of
 * dest is zero-filled, so dest is always NUL-terminated even when it
 * was not cleared beforehand.
 */
void safe_copy(char *dest, char *value, size_t buflen, const char *description) {
	size_t len = strlen(value);

	if (len >= buflen) {
		fprintf(stderr, "'%s' exceeds maximum %s length of %ld\n",
			value, description, (long)(buflen - 1));
		exit(EXIT_FAILURE);
	}

	memcpy(dest, value, len);
	/* len < buflen here, so at least the terminating NUL is written. */
	memset(dest + len, 0, buflen - len);
}
/*
 * Parse the command line into the global `cl`.
 *
 * The first word selects help, version, or the role ("arbitrator",
 * "site", "client"). Any other first word is taken to be a client
 * operation, so "booth list" behaves like "booth client list". For the
 * client role the operation word must be "list", "grant" or "revoke".
 * The remaining arguments are parsed with getopt() using OPTION_STRING.
 *
 * Help, version and every usage error terminate the process from here;
 * when the function returns, it returns 0.
 *
 * NOTE(review): -D sets `daemonize` to 1, yet do_server() calls daemon()
 * only while `daemonize` is 0, so the flag's name is the opposite of its
 * effect.
 * NOTE(review): the -s case passes sizeof(cl.ticket) as the size of
 * cl.site; both arrays are BOOTH_NAME_LEN bytes, so the value is the
 * same today.
 */
static int read_arguments(int argc, char **argv)
{
int optchar;
char *arg1 = argv[1];
char *op = NULL;
if (argc < 2 || !strcmp(arg1, "help") || !strcmp(arg1, "--help") ||
!strcmp(arg1, "-h")) {
print_usage();
exit(EXIT_SUCCESS);
}
if (!strcmp(arg1, "version") || !strcmp(arg1, "--version") ||
!strcmp(arg1, "-V")) {
printf("%s %s (built %s %s)\n",
argv[0], RELEASE_VERSION, __DATE__, __TIME__);
exit(EXIT_SUCCESS);
}
/* optind is set so that getopt() starts after the role/operation words */
if (!strcmp(arg1, "arbitrator")) {
cl.type = ACT_ARBITRATOR;
logging_entity = (char *) DAEMON_NAME "-arbitrator";
optind = 2;
} else if (!strcmp(arg1, "site")) {
cl.type = ACT_SITE;
logging_entity = (char *) DAEMON_NAME "-site";
optind = 2;
} else if (!strcmp(arg1, "client")) {
cl.type = ACT_CLIENT;
if (argc < 3) {
print_usage();
exit(EXIT_FAILURE);
}
op = argv[2];
optind = 3;
} else {
cl.type = ACT_CLIENT;
op = argv[1];
optind = 2;
}
switch (cl.type) {
case ACT_ARBITRATOR:
break;
case ACT_SITE:
break;
case ACT_CLIENT:
if (!strcmp(op, "list"))
cl.op = OP_LIST;
else if (!strcmp(op, "grant"))
cl.op = OP_GRANT;
else if (!strcmp(op, "revoke"))
cl.op = OP_REVOKE;
else {
fprintf(stderr, "client operation \"%s\" is unknown\n",
op);
exit(EXIT_FAILURE);
}
break;
}
while (optind < argc) {
optchar = getopt(argc, argv, OPTION_STRING);
switch (optchar) {
case 'c':
safe_copy(cl.configfile, optarg, sizeof(cl.configfile), "config file");
break;
case 'D':
daemonize = 1;
debug_level = 1;
log_logfile_priority = LOG_DEBUG;
log_syslog_priority = LOG_DEBUG;
break;
case 'l':
safe_copy(cl.lockfile, optarg, sizeof(cl.lockfile), "lock file");
break;
case 't':
/* -t and -s are accepted only for grant and revoke */
if (cl.op == OP_GRANT || cl.op == OP_REVOKE) {
safe_copy(cl.ticket, optarg, sizeof(cl.ticket), "ticket name");
} else {
print_usage();
exit(EXIT_FAILURE);
}
break;
case 's':
if (cl.op == OP_GRANT || cl.op == OP_REVOKE) {
safe_copy(cl.site, optarg, sizeof(cl.ticket), "site name");
} else {
print_usage();
exit(EXIT_FAILURE);
}
break;
- case 'f':
- if (cl.op == OP_GRANT)
- cl.force = 1;
- else {
- print_usage();
- exit(EXIT_FAILURE);
- }
- break;
-
case 'h':
print_usage();
exit(EXIT_SUCCESS);
break;
case ':':
case '?':
fprintf(stderr, "Please use '-h' for usage.\n");
exit(EXIT_FAILURE);
break;
default:
/* also reached when getopt() returns -1 while arguments remain */
fprintf(stderr, "unknown option: %s\n", argv[optind]);
exit(EXIT_FAILURE);
break;
};
}
return 0;
}
/*
 * Raise the locked-memory limit to unlimited, lock current and future
 * pages in memory and switch the process to SCHED_RR at the maximum
 * priority. Failures of mlockall() and of the scheduler calls are
 * logged; none of them is fatal.
 */
static void set_scheduler(void)
{
	struct rlimit unlimited;
	struct sched_param param;
	int prio;

	unlimited.rlim_cur = RLIM_INFINITY;
	unlimited.rlim_max = RLIM_INFINITY;
	setrlimit(RLIMIT_MEMLOCK, &unlimited);

	if (mlockall(MCL_CURRENT | MCL_FUTURE) < 0) {
		log_error("mlockall failed");
	}

	prio = sched_get_priority_max(SCHED_RR);
	if (prio == -1) {
		log_error("could not get maximum scheduler priority err %d",
			  errno);
		return;
	}

	param.sched_priority = prio;
	if (sched_setscheduler(0, SCHED_RR, &param) == -1)
		log_error("could not set SCHED_RR priority %d: %s (%d)",
			  param.sched_priority,
			  strerror(errno), errno);
}
/*
 * Write val to /proc/self/oom_adj. Does nothing if the file cannot be
 * opened for writing.
 */
static void set_oom_adj(int val)
{
	FILE *adj = fopen("/proc/self/oom_adj", "w");

	if (adj != NULL) {
		fprintf(adj, "%i", val);
		fclose(adj);
	}
}
/*
 * Run the daemon for the given node type: set everything up, detach
 * with daemon() unless -D was given, take the lock file, adjust
 * scheduling and OOM settings and enter the event loop.
 *
 * Returns a negative value if setup or the lock file fails, otherwise
 * the result of loop(); the lock file is removed once loop() returns.
 * Exits the process if daemon() fails.
 */
static int do_server(int type)
{
	int lock_fd;
	int rc;

	rc = setup(type);
	if (rc < 0)
		return rc;

	if (!daemonize && daemon(0, 0) < 0) {
		perror("daemon error");
		exit(EXIT_FAILURE);
	}

	/*
	 * The lock is taken only after daemon(): taken earlier, the lock
	 * file would record the parent's pid instead of the daemon's.
	 */
	lock_fd = lockfile();
	if (lock_fd < 0)
		return lock_fd;

	if (type == ARBITRATOR)
		log_info("BOOTH arbitrator daemon started");
	else if (type == SITE)
		log_info("BOOTH cluster site daemon started");

	set_scheduler();
	set_oom_adj(-16);

	rc = loop();

	unlink_lockfile(lock_fd);
	return rc;
}
/*
 * Run the client operation chosen on the command line.
 * Returns that operation's result, or -1 if cl.op is not a known
 * operation.
 */
static int do_client(void)
{
	if (cl.op == OP_LIST)
		return do_list();
	if (cl.op == OP_GRANT)
		return do_grant();
	if (cl.op == OP_REVOKE)
		return do_revoke();

	return -1;
}
/*
 * Program entry point: install the default config and lock file paths,
 * parse the command line, configure logging for the selected role and
 * hand over to the daemon or client code.
 *
 * Exits with EXIT_SUCCESS when the selected role returns 0 and with
 * EXIT_FAILURE otherwise.
 */
int main(int argc, char *argv[])
{
	int rv;

	memset(&cl, 0, sizeof(cl));
	strncpy(cl.lockfile, BOOTH_DEFAULT_LOCKFILE, BOOTH_PATH_LEN - 1);
	strncpy(cl.configfile, BOOTH_DEFAULT_CONF, BOOTH_PATH_LEN - 1);

	rv = read_arguments(argc, argv);
	if (rv < 0)
		return EXIT_FAILURE;

	if (cl.type != ACT_CLIENT) {
		/* Daemon roles log under their own entity name. */
		cl_log_set_entity(logging_entity);
		cl_log_enable_stderr(debug_level ? TRUE : FALSE);
		cl_log_set_facility(HA_LOG_FACILITY);
	} else {
		cl_log_enable_stderr(TRUE);
		cl_log_set_facility(0);
	}
	cl_inherit_logging_environment(0);

	switch (cl.type) {
	case ACT_SITE:
		rv = do_server(SITE);
		break;
	case ACT_ARBITRATOR:
		rv = do_server(ARBITRATOR);
		break;
	case ACT_CLIENT:
		rv = do_client();
		break;
	}

	return rv ? EXIT_FAILURE : EXIT_SUCCESS;
}
diff --git a/src/ticket.c b/src/ticket.c
index 61451e3..1ead7b8 100644
--- a/src/ticket.c
+++ b/src/ticket.c
@@ -1,591 +1,580 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <assert.h>
#include <time.h>
#include "ticket.h"
#include "config.h"
#include "pacemaker.h"
#include "list.h"
#include "log.h"
#include "booth.h"
#include "timer.h"
#include "paxos_lease.h"
#include "paxos.h"
/* Value carried (in network byte order) in booth_msghdr.magic. */
#define PAXOS_MAGIC 0xDB12
#define TK_LINE 256
/*
 * Header prepended by ticket_send()/ticket_broadcast() to every message
 * handed to the configured transport, and validated by ticket_recv().
 * Packed because it is sent on the wire as-is.
 */
struct booth_msghdr {
uint16_t magic; /* htons(PAXOS_MAGIC) */
uint16_t checksum; /* left at zero by the senders in this file */
uint32_t len; /* htonl(header size + payload size) */
} __attribute__((packed));
/*
 * Payload exchanged after a boothc_header during catch-up (see
 * ticket_catchup() and ticket_parse()). Packed because it is sent on
 * the wire as-is.
 */
struct ticket_msg {
char id[BOOTH_NAME_LEN+1]; /* ticket name */
uint32_t owner;
uint32_t expiry; /* added to current_time() by ticket_parse() */
uint32_t ballot;
uint32_t result; /* zero makes ticket_parse() ignore the message */
} __attribute__((packed));
/* In-memory state of one ticket; instances are linked into ticket_list. */
struct ticket {
char id[BOOTH_NAME_LEN+1]; /* ticket name */
pl_handle_t handle; /* handle passed to the paxos_lease_* calls */
int owner; /* owning node id; -1 means no owner */
int expiry;
int ballot;
unsigned long long expires; /* compared against current_time() */
struct list_head list;
};
/* All tickets known to this process, linked through struct ticket.list. */
static LIST_HEAD(ticket_list);
static unsigned char *role;
/*
 * Tell whether a ticket with this exact name is configured.
 * Returns 1 if found, 0 if not found or if no configuration is loaded.
 */
int check_ticket(char *ticket)
{
	int idx = 0;

	if (booth_conf == NULL)
		return 0;

	while (idx < booth_conf->ticket_count) {
		if (strcmp(booth_conf->ticket[idx].name, ticket) == 0)
			return 1;
		idx++;
	}

	return 0;
}
/*
 * Look for a configured node of type SITE whose address equals site.
 *
 * On a match, stores that node's `local` flag in *local and returns 1.
 * Returns 0 (leaving *local untouched) if there is no match or no
 * configuration is loaded.
 */
int check_site(char *site, int *local)
{
	int idx;

	if (booth_conf == NULL)
		return 0;

	for (idx = 0; idx < booth_conf->node_count; idx++) {
		if (booth_conf->node[idx].type != SITE)
			continue;
		if (strcmp(booth_conf->node[idx].addr, site) != 0)
			continue;

		*local = booth_conf->node[idx].local;
		return 1;
	}

	return 0;
}
/*
 * Return the weight table of configured ticket i, or NULL if any of the
 * first node_count entries is zero.
 */
static int * ticket_priority(int i)
{
	int *weights = booth_conf->ticket[i].weight;
	int node;

	/* TODO: need more precise check */
	for (node = 0; node < booth_conf->node_count; node++)
		if (weights[node] == 0)
			return NULL;

	return weights;
}
/* Return this node's id as reported by the configured transport's get_myid(). */
static int ticket_get_myid(void)
{
return booth_transport[booth_conf->proto].get_myid();
}
/*
 * Completion callback passed to paxos_lease_acquire().
 *
 * When result is 0, logs that the ticket owning `handle` was processed
 * successfully. If no ticket has that handle, only the BUG message is
 * logged; the list cursor is not dereferenced in that case because it
 * does not point at a real ticket. Nothing is logged for a non-zero
 * result.
 */
static void end_acquire(pl_handle_t handle, int result)
{
	struct ticket *tk;

	if (result != 0)
		return;

	list_for_each_entry(tk, &ticket_list, list) {
		if (tk->handle == handle) {
			log_info("ticket %s was granted/reovked successfully (site %d)",
				 tk->id, ticket_get_myid());
			return;
		}
	}

	log_error("BUG: ticket handle %ld does not exist",
		  handle);
}
/*
 * Send `len` bytes of `value`, prefixed with a booth_msghdr, to the
 * configured node whose nodeid equals id, using the configured
 * transport. If several nodes share the id, the last one in the
 * configuration is used.
 *
 * Returns the transport's send() result, -1 if no node has that id, or
 * -ENOMEM if the message buffer cannot be allocated.
 */
static int ticket_send(unsigned long id, void *value, int len)
{
	struct booth_node *dest = NULL;
	struct booth_msghdr *hdr;
	size_t total = sizeof(struct booth_msghdr) + len;
	void *msg;
	int idx, rc;

	for (idx = 0; idx < booth_conf->node_count; idx++) {
		if (booth_conf->node[idx].nodeid == id)
			dest = &booth_conf->node[idx];
	}
	if (dest == NULL)
		return -1;

	msg = malloc(total);
	if (msg == NULL)
		return -ENOMEM;
	memset(msg, 0, total);

	/* Header fields go out in network byte order. */
	hdr = msg;
	hdr->magic = htons(PAXOS_MAGIC);
	hdr->len = htonl(total);
	memcpy((char *)msg + sizeof(struct booth_msghdr), value, len);

	rc = booth_transport[booth_conf->proto].send(
		(unsigned long)dest, msg, total);

	free(msg);
	return rc;
}
/*
 * Broadcast `len` bytes of `value`, prefixed with a booth_msghdr,
 * through the configured transport.
 *
 * Returns the transport's broadcast() result, or -ENOMEM if the message
 * buffer cannot be allocated.
 */
static int ticket_broadcast(void *value, int len)
{
	struct booth_msghdr *hdr;
	size_t total = sizeof(struct booth_msghdr) + len;
	void *msg;
	int rc;

	msg = malloc(total);
	if (msg == NULL)
		return -ENOMEM;
	memset(msg, 0, total);

	/* Header fields go out in network byte order. */
	hdr = msg;
	hdr->magic = htons(PAXOS_MAGIC);
	hdr->len = htonl(total);
	memcpy((char *)msg + sizeof(struct booth_msghdr), value, len);

	rc = booth_transport[booth_conf->proto].broadcast(msg, total);

	free(msg);
	return rc;
}
#if 0
/*
 * (Compiled out by the surrounding #if 0.)
 * Reload the named ticket's owner, ballot and expiry through
 * pcmk_handler.load_ticket() and copy them to the output parameters.
 * Returns 0 on success, -1 if no ticket has that name.
 */
static int ticket_read(const void *name, int *owner, int *ballot,
		       unsigned long long *expires)
{
	struct ticket *tk;
	int matched = 0;

	list_for_each_entry(tk, &ticket_list, list) {
		if (strcmp(tk->id, name) == 0) {
			matched = 1;
			break;
		}
	}
	if (matched == 0) {
		log_error("BUG: ticket_read failed (ticket %s does not exist)",
			  (char *)name);
		return -1;
	}

	pcmk_handler.load_ticket(tk->id, &tk->owner, &tk->ballot, &tk->expires);

	*ballot = tk->ballot;
	*expires = tk->expires;
	*owner = tk->owner;
	return 0;
}
#endif
/*
 * Apply a catch-up reply to the local ticket with the same name:
 * owner and ballot are copied, and expires becomes the current time
 * plus the message's expiry.
 *
 * Returns 0 if a ticket was updated, -1 if the message's result field
 * is zero or no local ticket has that name.
 */
static int ticket_parse(struct ticket_msg *tmsg)
{
	struct ticket *tk;

	if (tmsg->result == 0)
		return -1;

	list_for_each_entry(tk, &ticket_list, list) {
		if (strcmp(tk->id, tmsg->id) != 0)
			continue;

		tk->owner = tmsg->owner;
		tk->expires = current_time() + tmsg->expiry;
		tk->ballot = tmsg->ballot;
		return 0;
	}

	return -1;
}
/*
 * Bring the local view of the ticket called `name` up to date.
 *
 * 1. Unless this node is an arbitrator, reload the ticket through
 *    pcmk_handler.load_ticket() and clear owner/expires if it has
 *    already expired.
 * 2. Send a BOOTHC_CMD_CATCHUP request over TCP to every non-local SITE
 *    node and feed each reply to ticket_parse(). Nodes that cannot be
 *    reached are skipped.
 * 3. Unless this node is an arbitrator, store the resulting state and
 *    grant or revoke the ticket locally depending on whether this node
 *    is the owner.
 * The final owner, ballot and expires are copied to the output
 * parameters.
 *
 * Returns -ENOMEM if the request buffer cannot be allocated; otherwise
 * rv, which holds the result of the last TCP send/recv that was
 * attempted (0 if none was).
 *
 * NOTE(review): strncpy() with BOOTH_NAME_LEN + 1 does not terminate
 * tmsg->id if name is at least that long - confirm callers bound it.
 * NOTE(review): booth_conf->node[] is indexed with the value returned by
 * ticket_get_myid(); this assumes node ids are array indexes - confirm.
 */
static int ticket_catchup(const void *name, int *owner, int *ballot,
unsigned long long *expires)
{
struct ticket *tk;
int i, s, buflen, rv = 0;
char *buf = NULL;
struct boothc_header *h;
struct ticket_msg *tmsg;
int myid = ticket_get_myid();
if (booth_conf->node[myid].type != ARBITRATOR) {
list_for_each_entry(tk, &ticket_list, list) {
if (!strcmp(tk->id, name)) {
pcmk_handler.load_ticket(tk->id,
&tk->owner,
&tk->ballot,
&tk->expires);
if (current_time() >= tk->expires) {
tk->owner = -1;
tk->expires = 0;
}
}
}
}
buflen = sizeof(struct boothc_header) + sizeof(struct ticket_msg);
buf = malloc(buflen);
if (!buf)
return -ENOMEM;
memset(buf, 0, buflen);
h = (struct boothc_header *)buf;
h->magic = BOOTHC_MAGIC;
h->version = BOOTHC_VERSION;
h->cmd = BOOTHC_CMD_CATCHUP;
h->len = sizeof(struct ticket_msg);
tmsg = (struct ticket_msg *)(buf + sizeof(struct boothc_header));
for (i = 0; i < booth_conf->node_count; i++) {
if (booth_conf->node[i].type == SITE &&
!(booth_conf->node[i].local)) {
strncpy(tmsg->id, name, BOOTH_NAME_LEN + 1);
log_debug("attempting catchup from %s", booth_conf->node[i].addr);
s = booth_transport[TCP].open(&booth_conf->node[i]);
if (s < 0)
continue;
log_debug("connected to %s", booth_conf->node[i].addr);
rv = booth_transport[TCP].send(s, buf, buflen);
if (rv < 0) {
booth_transport[TCP].close(s);
continue;
}
log_debug("sent catchup command to %s", booth_conf->node[i].addr);
/* clear the request payload before the reply is received into the same buffer */
memset(tmsg, 0, sizeof(struct ticket_msg));
rv = booth_transport[TCP].recv(s, buf, buflen);
if (rv < 0) {
booth_transport[TCP].close(s);
continue;
}
booth_transport[TCP].close(s);
ticket_parse(tmsg);
memset(tmsg, 0, sizeof(struct ticket_msg));
}
}
list_for_each_entry(tk, &ticket_list, list) {
if (!strcmp(tk->id, name)) {
if (booth_conf->node[myid].type != ARBITRATOR) {
if (current_time() >= tk->expires) {
tk->owner = -1;
tk->expires = 0;
}
pcmk_handler.store_ticket(tk->id,
tk->owner,
tk->ballot,
tk->expires);
if (tk->owner == myid)
pcmk_handler.grant_ticket(tk->id);
else
pcmk_handler.revoke_ticket(tk->id);
}
*owner = tk->owner;
*expires = tk->expires;
*ballot = tk->ballot;
}
}
free(buf);
return rv;
}
/*
 * Record a lease result for the ticket identified by handle.
 *
 * The ticket's owner, expires and ballot are taken from *result and
 * stored through pcmk_handler.store_ticket(). The ticket is then
 * granted locally if this node is the new owner, or revoked locally if
 * the owner is -1.
 *
 * Returns 0 on success, -1 if no ticket has that handle.
 */
static int ticket_write(pl_handle_t handle, struct paxos_lease_result *result)
{
	struct ticket *tk;
	int matched = 0;
	int myid;

	list_for_each_entry(tk, &ticket_list, list) {
		if (tk->handle == handle) {
			matched = 1;
			break;
		}
	}
	if (matched == 0) {
		log_error("BUG: ticket_write failed "
			  "(ticket handle %ld does not exist)", handle);
		return -1;
	}

	tk->owner = result->owner;
	tk->expires = result->expires;
	tk->ballot = result->ballot;

	/* The state is stored in every case; only the follow-up differs. */
	myid = ticket_get_myid();
	pcmk_handler.store_ticket(tk->id, tk->owner, tk->ballot, tk->expires);
	if (tk->owner == myid)
		pcmk_handler.grant_ticket(tk->id);
	else if (tk->owner == -1)
		pcmk_handler.revoke_ticket(tk->id);

	return 0;
}
/*
 * Thin wrapper that forwards a ticket's lease handle to
 * paxos_lease_status_recovery().
 */
static void ticket_status_recovery(pl_handle_t handle)
{
paxos_lease_status_recovery(handle);
}
/*
 * Validate a received message and hand its payload to the paxos lease layer.
 *
 * msg    - buffer that starts with a struct booth_msghdr
 * msglen - number of bytes available in msg
 *
 * Returns -1 (after logging) when the buffer is missing, too short to hold
 * the header, carries the wrong magic, or when the length recorded in the
 * header does not equal msglen.  Otherwise returns whatever
 * paxos_lease_on_receive() returns for the bytes following the header.
 */
int ticket_recv(void *msg, int msglen)
{
	struct booth_msghdr *hdr = msg;
	char *data;

	/*
	 * The header fields must not be read unless a complete header is
	 * present; this also keeps the payload length below non-negative.
	 */
	if (!msg || msglen < (int)sizeof(struct booth_msghdr)) {
		log_error("message received error");
		return -1;
	}

	if (ntohs(hdr->magic) != PAXOS_MAGIC ||
	    ntohl(hdr->len) != msglen) {
		log_error("message received error");
		return -1;
	}

	data = (char *)msg + sizeof(struct booth_msghdr);
	return paxos_lease_on_receive(data,
				      msglen - sizeof(struct booth_msghdr));
}
/*
 * grant_ticket - request that the named ticket be granted to this node.
 *
 * Diff note: lines prefixed with '-' are removed by this patch (the `force`
 * parameter and its shortcut, which stored and granted the ticket through
 * pcmk_handler without looking it up); the '+' line is the new signature.
 *
 * Returns BOOTHC_RLT_SYNC_FAIL when no ticket in ticket_list has this id,
 * BOOTHC_RLT_SYNC_SUCC when this node already owns the ticket, and
 * BOOTHC_RLT_ASYNC after starting paxos_lease_acquire() otherwise.
 */
-int grant_ticket(char *ticket, int force)
+int grant_ticket(char *ticket)
{
struct ticket *tk;
int found = 0;
- if (force) {
- pcmk_handler.store_ticket(ticket, ticket_get_myid(), 0, -1);
- pcmk_handler.grant_ticket(ticket);
- return BOOTHC_RLT_SYNC_SUCC;
- }
-
/* Look the ticket up by id; `found` tells whether the loop hit a match. */
list_for_each_entry(tk, &ticket_list, list) {
if (!strcmp(tk->id, ticket)) {
found = 1;
break;
}
}
if (!found) {
log_error("ticket %s does not exist", ticket);
return BOOTHC_RLT_SYNC_FAIL;
}
/* Already the owner: nothing to acquire. */
if (tk->owner == ticket_get_myid())
return BOOTHC_RLT_SYNC_SUCC;
else {
/* The outcome is reported later through end_acquire. */
paxos_lease_acquire(tk->handle, 1, end_acquire);
return BOOTHC_RLT_ASYNC;
}
}
/*
 * revoke_ticket - request that the named ticket be released.
 *
 * Diff note: lines prefixed with '-' are removed by this patch (the `force`
 * parameter and the block that stored an ownerless ticket and revoked it
 * through pcmk_handler before continuing); the '+' line is the new signature.
 *
 * Returns BOOTHC_RLT_SYNC_FAIL when no ticket in ticket_list has this id,
 * BOOTHC_RLT_SYNC_SUCC when the ticket currently has no owner (-1), and
 * BOOTHC_RLT_ASYNC after calling paxos_lease_release() otherwise.
 */
-int revoke_ticket(char *ticket, int force)
+int revoke_ticket(char *ticket)
{
struct ticket *tk;
int found = 0;
/* Look the ticket up by id; `found` tells whether the loop hit a match. */
list_for_each_entry(tk, &ticket_list, list) {
if (!strcmp(tk->id, ticket)) {
found = 1;
break;
}
}
if (!found) {
log_error("ticket %s does not exist", ticket);
return BOOTHC_RLT_SYNC_FAIL;
}
- if (force) {
- pcmk_handler.store_ticket(tk->id, -1, 0, 0);
- pcmk_handler.revoke_ticket(tk->id);
- }
-
/* No owner recorded: there is nothing to release. */
if (tk->owner == -1)
return BOOTHC_RLT_SYNC_SUCC;
else {
paxos_lease_release(tk->handle);
return BOOTHC_RLT_ASYNC;
}
}
/*
 * Look up a ticket by name and report its current owner and expiry time.
 *
 * name    - ticket id, compared over at most BOOTH_NAME_LEN + 1 bytes
 * owner   - optional out parameter; skipped when NULL
 * expires - optional out parameter; skipped when NULL
 *
 * Returns 0 when the ticket was found, -1 otherwise.
 */
int get_ticket_info(char *name, int *owner, int *expires)
{
	struct ticket *entry;

	list_for_each_entry(entry, &ticket_list, list) {
		if (strncmp(entry->id, name, BOOTH_NAME_LEN + 1) != 0)
			continue;

		/* First match wins; fill in only what the caller asked for. */
		if (owner)
			*owner = entry->owner;
		if (expires)
			*expires = entry->expires;
		return 0;
	}

	return -1;
}
/*
 * Build a textual listing of all tickets.
 *
 * For every ticket in ticket_list one zero-padded record of exactly TK_LINE
 * bytes is appended to a heap buffer.  Each record holds a line of the form
 * "ticket: <id>, owner: <addr|None>, expires: <time|INF>\n".
 *
 * pdata - out: heap buffer holding the records (NULL when there are no
 *         tickets or on failure); the buffer comes from realloc()
 * len   - out: total size of the buffer in bytes
 *
 * Returns 0 on success or -ENOMEM when the buffer cannot be grown; in the
 * failure case the partial buffer is freed and *pdata / *len are reset.
 */
int list_ticket(char **pdata, unsigned int *len)
{
	struct ticket *tk;
	char timeout_str[100];
	/* One extra byte so a maximum-length address is still terminated. */
	char node_name[BOOTH_NAME_LEN + 1];
	char tmp[TK_LINE];
	char *grown;
	struct tm *tm;
	time_t expires;

	*pdata = NULL;
	*len = 0;

	list_for_each_entry(tk, &ticket_list, list) {
		memset(tmp, 0, TK_LINE);
		snprintf(timeout_str, sizeof(timeout_str), "%s", "INF");
		snprintf(node_name, sizeof(node_name), "%s", "None");

		/*
		 * Read at most BOOTH_NAME_LEN bytes of the address (the same
		 * bound the previous strncpy used) and always terminate.
		 */
		if (tk->owner < MAX_NODES && tk->owner > -1)
			snprintf(node_name, sizeof(node_name), "%.*s",
				 BOOTH_NAME_LEN,
				 booth_conf->node[tk->owner].addr);

		if (tk->expires != 0) {
			/*
			 * Copy into a real time_t instead of casting the
			 * field's address: the field may not have the same
			 * size as time_t.
			 */
			expires = tk->expires;
			tm = localtime(&expires);
			/* Keep the fallback text if the time is unrepresentable. */
			if (tm)
				strftime(timeout_str, sizeof(timeout_str),
					 "%Y/%m/%d %H:%M:%S", tm);
		}

		snprintf(tmp, TK_LINE, "ticket: %s, owner: %s, expires: %s\n",
			 tk->id, node_name, timeout_str);

		/* Grow through a temporary so the old buffer is not leaked. */
		grown = realloc(*pdata, *len + TK_LINE);
		if (grown == NULL) {
			free(*pdata);
			*pdata = NULL;
			*len = 0;
			return -ENOMEM;
		}
		*pdata = grown;

		/* tmp was zeroed above, so the record is already zero-padded. */
		memcpy(*pdata + *len, tmp, TK_LINE);
		*len += TK_LINE;
	}

	return 0;
}
/*
 * Answer a catch-up request in place.
 *
 * *pdata points at a struct ticket_msg whose id names the ticket being asked
 * about; len must equal sizeof(struct ticket_msg) (asserted).  When this node
 * owns that ticket and it has not expired yet, the message is filled in with
 * the owner, the remaining time until expiry, the ballot and result = 1.
 * Otherwise the whole message is zeroed.
 *
 * Always returns 0.
 */
int catchup_ticket(char **pdata, unsigned int len)
{
	struct ticket_msg *tmsg;
	struct ticket *tk;
	int found = 0;

	assert(len == sizeof(struct ticket_msg));
	tmsg = (struct ticket_msg *)(*pdata);

	list_for_each_entry(tk, &ticket_list, list) {
		/*
		 * Bounded compare: the id in the request comes from a peer
		 * and is not guaranteed to be NUL-terminated.
		 */
		if (!strncmp(tk->id, tmsg->id, BOOTH_NAME_LEN + 1)
		    && tk->owner == ticket_get_myid()
		    && current_time() < tk->expires) {
			tmsg->owner = tk->owner;
			tmsg->expiry = tk->expires - current_time();
			tmsg->ballot = tk->ballot;
			tmsg->result = 1;
			found = 1;
			break;
		}
	}

	/*
	 * Decide on what this function found, not on the result field the
	 * requester sent: a non-zero incoming value must not be echoed back
	 * as a valid answer.
	 */
	if (!found)
		memset(*pdata, 0, len);

	return 0;
}
/*
 * Operations table passed to paxos_lease_init() for every ticket created in
 * setup_ticket(); it binds the lease layer's operations to the ticket_*
 * functions.
 */
const struct paxos_lease_operations ticket_operations = {
.get_myid = ticket_get_myid,
.send = ticket_send,
.broadcast = ticket_broadcast,
.catchup = ticket_catchup,
/* lease results are recorded by ticket_write() */
.notify = ticket_write,
};
/*
 * Create the in-memory ticket list from the configuration and initialise a
 * paxos lease for every ticket.
 *
 * Steps:
 *  1. allocate the per-node `role` table (sites get PROPOSER | ACCEPTOR |
 *     LEARNER, arbitrators get ACCEPTOR | LEARNER);
 *  2. for each configured ticket allocate a struct ticket, append it to
 *     ticket_list and call paxos_lease_init();
 *  3. if the local node is an acceptor, run status recovery on every ticket.
 *
 * Returns 0 on success.  On failure returns a negative value (-ENOMEM,
 * -EINVAL for an over-long ticket name, the negative value from
 * paxos_lease_init(), or -1 when it reported 0), after freeing every ticket
 * created so far and the role table.
 */
int setup_ticket(void)
{
	struct ticket *tk, *tmp;
	int i, rv;
	pl_handle_t plh;
	int myid;

	role = calloc(booth_conf->node_count, sizeof(unsigned char));
	if (!role)
		return -ENOMEM;

	for (i = 0; i < booth_conf->node_count; i++) {
		if (booth_conf->node[i].type == SITE)
			role[i] = PROPOSER | ACCEPTOR | LEARNER;
		else if (booth_conf->node[i].type == ARBITRATOR)
			role[i] = ACCEPTOR | LEARNER;
	}

	for (i = 0; i < booth_conf->ticket_count; i++) {
		tk = calloc(1, sizeof(*tk));
		if (!tk) {
			rv = -ENOMEM;
			goto out;
		}

		/* Refuse names that would overflow the id buffer. */
		if (strlen(booth_conf->ticket[i].name) >= sizeof(tk->id)) {
			log_error("ticket name %s is too long",
				  booth_conf->ticket[i].name);
			free(tk);	/* not on the list yet */
			rv = -EINVAL;
			goto out;
		}
		strcpy(tk->id, booth_conf->ticket[i].name);

		tk->owner = -1;
		tk->expiry = booth_conf->ticket[i].expiry;
		if (!tk->expiry)
			tk->expiry = DEFAULT_TICKET_EXPIRY;

		/* Linked before init so the error path below frees it too. */
		list_add_tail(&tk->list, &ticket_list);

		plh = paxos_lease_init(tk->id,
				       BOOTH_NAME_LEN,
				       tk->expiry,
				       booth_conf->node_count,
				       1,
				       role,
				       ticket_priority(i),
				       &ticket_operations);
		if (plh <= 0) {
			log_error("paxos lease initialization failed");
			/*
			 * A handle of 0 is a failure too; returning it
			 * unchanged would look like success to the caller.
			 */
			rv = plh < 0 ? (int)plh : -1;
			goto out;
		}
		tk->handle = plh;
	}

	myid = ticket_get_myid();
	assert(myid < booth_conf->node_count);
	if (role[myid] & ACCEPTOR) {
		list_for_each_entry(tk, &ticket_list, list) {
			ticket_status_recovery(tk->handle);
		}
	}

	return 0;

out:
	/* Unlink and release every ticket created so far. */
	list_for_each_entry_safe(tk, tmp, &ticket_list, list) {
		list_del(&tk->list);
		free(tk);
	}
	free(role);
	role = NULL;	/* do not leave a dangling pointer behind */
	return rv;
}
diff --git a/src/ticket.h b/src/ticket.h
index 92928ab..407af19 100644
--- a/src/ticket.h
+++ b/src/ticket.h
@@ -1,34 +1,34 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _TICKET_H
#define _TICKET_H
/* Expiry used by setup_ticket() when a configured ticket's expiry is 0. */
#define DEFAULT_TICKET_EXPIRY 600
int check_ticket(char *ticket);
int check_site(char *site, int *local);
/* This patch drops the `force` argument from grant_ticket()/revoke_ticket(). */
-int grant_ticket(char *ticket, int force);
-int revoke_ticket(char *ticket, int force);
+int grant_ticket(char *ticket);
+int revoke_ticket(char *ticket);
/* Builds a heap buffer of TK_LINE-sized records, one per ticket; 0 or -ENOMEM. */
int list_ticket(char **pdata, unsigned int *len);
/* Fills in (or zeroes) the struct ticket_msg at *pdata in place; returns 0. */
int catchup_ticket(char **pdata, unsigned int len);
/* Checks the message header, then passes the payload to the paxos lease layer. */
int ticket_recv(void *msg, int msglen);
/* Creates the ticket list and a paxos lease per configured ticket. */
int setup_ticket(void);
/* Looks a ticket up by name; owner/expires may be NULL. 0 if found, else -1. */
int get_ticket_info(char *name, int *owner, int *expires);
#endif /* _TICKET_H */
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Tue, Feb 25, 2:54 AM (1 d, 4 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1464625
Default Alt Text
(43 KB)
Attached To
Mode
rB Booth
Attached
Detach File
Event Timeline
Log In to Comment