Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F3155038
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
37 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/src/main.c b/src/main.c
index e8d762b..8e51007 100644
--- a/src/main.c
+++ b/src/main.c
@@ -1,1045 +1,1045 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sched.h>
#include <errno.h>
#include <limits.h>
#include <sys/file.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/poll.h>
#include <fcntl.h>
#include <string.h>
#include <assert.h>
#include <error.h>
#include <sys/ioctl.h>
#include <termios.h>
#include <signal.h>
#include "log.h"
#include "booth.h"
#include "config.h"
#include "transport.h"
#include "timer.h"
#include "pacemaker.h"
#include "ticket.h"
/* Version string printed by the "version" command-line argument. */
#define RELEASE_VERSION "1.0"
/* Number of slots by which the client/pollfd arrays grow per allocation. */
#define CLIENT_NALLOC 32
/*
 * Log priority thresholds. The -D option raises the logfile and syslog
 * thresholds to LOG_DEBUG; the stderr threshold is not modified in this file.
 */
int log_logfile_priority = LOG_INFO;
int log_syslog_priority = LOG_ERR;
int log_stderr_priority = LOG_ERR;
/*
 * Set to 1 by -D. NOTE: despite the name, do_server() calls daemon() only
 * when this is 0, i.e. a non-zero value keeps the process in the foreground.
 */
int daemonize = 0;
/* Highest slot index handed out by client_add(); bounds the poll() set. */
static int client_maxi;
/* Number of slots currently allocated in client[] and pollfd[]. */
static int client_size = 0;
/* Parallel arrays: client[i] holds the callbacks for descriptor pollfd[i]. */
struct client *client = NULL;
struct pollfd *pollfd = NULL;
/* Timeout handed to poll() by loop(); -1 makes poll() block indefinitely. */
int poll_timeout = -1;
/*
 * Role selected by the first command-line argument; stored in
 * command_line.type. Values start at 1, so a zeroed command_line has no role.
 */
typedef enum {
ACT_ARBITRATOR = 1,
ACT_SITE,
ACT_CLIENT,
} booth_role_t;
/*
 * Client operation selected on the command line; stored in command_line.op.
 * Values start at 1, so a zeroed command_line has no operation selected.
 */
typedef enum {
OP_LIST = 1,
OP_GRANT,
OP_REVOKE,
} operation_t;
/* Parsed command-line state, filled in by main() and read_arguments(). */
struct command_line {
int type; /* role, one of the ACT_ values */
int op; /* client operation, one of the OP_ values */
char configfile[BOOTH_PATH_LEN]; /* -c, defaults to BOOTH_DEFAULT_CONF */
char lockfile[BOOTH_PATH_LEN]; /* -l, defaults to BOOTH_DEFAULT_LOCKFILE */
char site[BOOTH_NAME_LEN]; /* -s, only accepted for grant/revoke */
char ticket[BOOTH_NAME_LEN]; /* -t, only accepted for grant/revoke */
};
/* The single instance; zeroed at the start of main(). */
static struct command_line cl;
/*
 * Read exactly count bytes from fd into buf.
 *
 * Short reads are continued and reads interrupted by a signal (EINTR) are
 * retried until the whole request has been satisfied.
 *
 * Returns 0 on success. Returns -1 on a read error, or when end-of-file is
 * reached before count bytes arrived; in the end-of-file case errno is left
 * untouched, so callers must not assume errno describes that failure.
 */
int do_read(int fd, void *buf, size_t count)
{
	/* size_t/ssize_t avoid the signed/unsigned comparison and the
	 * truncation an int offset would suffer for large requests. */
	size_t off = 0;

	while (off < count) {
		ssize_t rv = read(fd, (char *)buf + off, count - off);

		if (rv == 0)
			return -1;
		if (rv == -1) {
			if (errno == EINTR)
				continue;
			return -1;
		}
		off += (size_t)rv;
	}
	return 0;
}
/*
 * Write count bytes from buf to fd, continuing after partial writes and
 * retrying writes interrupted by a signal.
 *
 * Returns 0 once everything has been written, or the negative value
 * returned by write() after logging the error.
 */
int do_write(int fd, void *buf, size_t count)
{
	int written, pos = 0;

	for (;;) {
		written = write(fd, (char *)buf + pos, count);
		if (written == -1 && errno == EINTR)
			continue;
		if (written < 0) {
			log_error("write failed: %s (%d)", strerror(errno), errno);
			return written;
		}
		if (written == count)
			return 0;
		/* Partial write: advance past what was accepted and go again. */
		count -= written;
		pos += written;
	}
}
/*
 * Connect to the local daemon's UNIX stream socket.
 *
 * sock_path is placed after a leading NUL byte in sun_path (the Linux
 * abstract socket namespace), and the address length passed to connect()
 * covers the family, that NUL byte and the name.
 *
 * Returns the connected descriptor, or a negative value on failure (the
 * error has already been logged).
 */
static int do_connect(const char *sock_path)
{
	struct sockaddr_un sun;
	socklen_t addrlen;
	size_t pathlen;
	int rv, fd;

	/* Leading NUL + name + trailing NUL must fit in sun_path; the
	 * original strcpy() would overflow the address on a long name. */
	pathlen = strlen(sock_path);
	if (pathlen + 2 > sizeof(sun.sun_path)) {
		log_error("socket path '%s' is too long", sock_path);
		return -1;
	}

	fd = socket(PF_UNIX, SOCK_STREAM, 0);
	if (fd < 0) {
		log_error("failed to create socket: %s (%d)", strerror(errno), errno);
		return fd;
	}

	memset(&sun, 0, sizeof(sun));
	sun.sun_family = AF_UNIX;
	memcpy(&sun.sun_path[1], sock_path, pathlen);
	addrlen = sizeof(sa_family_t) + pathlen + 1;

	rv = connect(fd, (struct sockaddr *) &sun, addrlen);
	if (rv < 0) {
		if (errno == ECONNREFUSED)
			log_error("Connection to boothd was refused; "
				  "please ensure that you are on a "
				  "machine which has boothd running.");
		else
			log_error("failed to connect: %s (%d)", strerror(errno), errno);
		close(fd);
		fd = rv;
	}
	return fd;
}
/*
 * Prepare a client protocol header: zero the whole structure, then stamp
 * the protocol magic and version and record the command, result code and
 * length of the payload that follows the header.
 */
static void init_header(struct boothc_header *h, int cmd,
			int result, int data_len)
{
	memset(h, 0, sizeof(*h));

	h->cmd = cmd;
	h->result = result;
	h->len = data_len;

	h->magic = BOOTHC_MAGIC;
	h->version = BOOTHC_VERSION;
}
/*
 * Grow client[] and pollfd[] by CLIENT_NALLOC slots and mark the new slots
 * as unused (fd == -1).
 *
 * On allocation failure the error is logged, the existing arrays remain
 * valid and client_size is left unchanged; callers detect failure by
 * checking whether client_size grew.
 */
static void client_alloc(void)
{
	size_t nslots = (size_t)client_size + CLIENT_NALLOC;
	struct client *new_client;
	struct pollfd *new_pollfd;
	int i;

	/* realloc(NULL, n) acts like malloc(n), so one path covers both the
	 * first allocation and later growth. Results go through temporaries
	 * so that a failure does not lose the existing array. */
	new_client = realloc(client, nslots * sizeof(*client));
	if (!new_client) {
		log_error("can't alloc for client array");
		return;
	}
	client = new_client;

	new_pollfd = realloc(pollfd, nslots * sizeof(*pollfd));
	if (!new_pollfd) {
		log_error("can't alloc for pollfd");
		return;
	}
	pollfd = new_pollfd;

	for (i = client_size; i < client_size + CLIENT_NALLOC; i++) {
		client[i].workfn = NULL;
		client[i].deadfn = NULL;
		client[i].fd = -1;
		pollfd[i].fd = -1;
		pollfd[i].events = 0;
		pollfd[i].revents = 0;
	}
	client_size += CLIENT_NALLOC;
}

/*
 * Default dead-connection handler: close the descriptor in slot ci and
 * mark the slot free in both arrays.
 */
static void client_dead(int ci)
{
	close(client[ci].fd);
	client[ci].workfn = NULL;
	client[ci].fd = -1;
	pollfd[ci].fd = -1;
}

/*
 * Register fd in the first free slot so that loop() polls it for POLLIN.
 *
 * workfn is called when the descriptor is readable; deadfn is called on
 * error/hangup and defaults to client_dead() when NULL.
 *
 * Returns the slot index, or -1 if no slot is free and the arrays could
 * not be grown.
 */
int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci))
{
	int i, size_before;

	for (;;) {
		for (i = 0; i < client_size; i++) {
			if (client[i].fd != -1)
				continue;
			client[i].workfn = workfn;
			client[i].deadfn = deadfn ? deadfn : client_dead;
			client[i].fd = fd;
			pollfd[i].fd = fd;
			pollfd[i].events = POLLIN;
			if (i > client_maxi)
				client_maxi = i;
			return i;
		}

		/* No free slot (or nothing allocated yet): grow and rescan.
		 * Give up instead of spinning if growth failed. */
		size_before = client_size;
		client_alloc();
		if (client_size == size_before)
			return -1;
	}
}
/*
 * Create the local UNIX stream socket on which client commands arrive.
 *
 * sock_path is placed after a leading NUL byte in sun_path (the Linux
 * abstract socket namespace); the socket is bound and put into listening
 * state with a backlog of 5.
 *
 * Returns the listening descriptor, or a negative value on failure (the
 * error has already been logged and the socket closed).
 */
static int setup_listener(const char *sock_path)
{
	struct sockaddr_un addr;
	socklen_t addrlen;
	size_t pathlen;
	int rv, s;

	/* Leading NUL + name + trailing NUL must fit in sun_path; the
	 * original strcpy() would overflow the address on a long name. */
	pathlen = strlen(sock_path);
	if (pathlen + 2 > sizeof(addr.sun_path)) {
		log_error("socket path '%s' is too long", sock_path);
		return -1;
	}

	s = socket(AF_LOCAL, SOCK_STREAM, 0);
	if (s < 0) {
		log_error("socket error %d: %s (%d)", s, strerror(errno), errno);
		return s;
	}

	memset(&addr, 0, sizeof(addr));
	addr.sun_family = AF_LOCAL;
	memcpy(&addr.sun_path[1], sock_path, pathlen);
	addrlen = sizeof(sa_family_t) + pathlen + 1;

	rv = bind(s, (struct sockaddr *) &addr, addrlen);
	if (rv < 0) {
		log_error("bind error %d: %s (%d)", rv, strerror(errno), errno);
		close(s);
		return rv;
	}

	rv = listen(s, 5);
	if (rv < 0) {
		log_error("listen error %d: %s (%d)", rv, strerror(errno), errno);
		close(s);
		return rv;
	}
	return s;
}
/*
 * Split a GRANT/REVOKE payload into its two fixed-width name fields.
 *
 * The payload must hold at least two BOOTH_NAME_LEN fields (site, then
 * ticket). Each field is forcibly NUL-terminated so that later string
 * handling cannot run past it. Returns 0 on success, -1 if the payload is
 * missing or too short.
 */
static int split_site_ticket(char *data, size_t len,
			     char **site, char **ticket)
{
	if (!data || len < 2 * (size_t)BOOTH_NAME_LEN)
		return -1;
	data[BOOTH_NAME_LEN - 1] = '\0';
	data[2 * BOOTH_NAME_LEN - 1] = '\0';
	*site = data;
	*ticket = data + BOOTH_NAME_LEN;
	return 0;
}

/*
 * Work function for an accepted command connection in slot ci.
 *
 * Reads one request (header plus optional payload), dispatches on the
 * command and writes back the header with the result code, followed by a
 * payload when the reply length is non-zero.
 *
 * If the header cannot be read the slot's dead handler is invoked. A
 * header with the wrong magic or version is logged and ignored.
 */
void process_connection(int ci)
{
	struct boothc_header h;
	char *data = NULL;
	char *site, *ticket;
	int ticket_owner;
	int local, rv;
	void (*deadfn) (int ci);

	rv = do_read(client[ci].fd, &h, sizeof(h));
	if (rv < 0) {
		if (errno == ECONNRESET)
			log_debug("client %d connection reset for fd %d",
				  ci, client[ci].fd);
		deadfn = client[ci].deadfn;
		if (deadfn)
			deadfn(ci);
		return;
	}
	if (h.magic != BOOTHC_MAGIC) {
		log_error("connection %d magic error %x", ci, h.magic);
		return;
	}
	if (h.version != BOOTHC_VERSION) {
		log_error("connection %d version error %x", ci, h.version);
		return;
	}

	/* NOTE(review): h.len comes from the peer and is not capped here. */
	if (h.len) {
		data = calloc(1, h.len);
		if (!data) {
			log_error("process_connection no mem %u", h.len);
			return;
		}
		rv = do_read(client[ci].fd, data, h.len);
		if (rv < 0) {
			log_error("connection %d read data error %d", ci, rv);
			goto out;
		}
	}

	switch (h.cmd) {
	case BOOTHC_CMD_LIST:
		/* A list request carries no payload. Reject one instead of
		 * asserting, so a peer cannot abort the daemon. */
		if (data) {
			log_error("connection %d list request with payload", ci);
			h.len = 0;
			h.result = BOOTHC_RLT_INVALID_ARG;
			goto reply;
		}
		h.result = list_ticket(&data, &h.len);
		break;

	case BOOTHC_CMD_GRANT:
		rv = split_site_ticket(data, h.len, &site, &ticket);
		h.len = 0;
		if (rv < 0) {
			h.result = BOOTHC_RLT_INVALID_ARG;
			goto reply;
		}
		if (!check_ticket(ticket)) {
			h.result = BOOTHC_RLT_INVALID_ARG;
			goto reply;
		}
		if (get_ticket_info(ticket, &ticket_owner, NULL) == 0) {
			if (ticket_owner > -1) {
				log_error("client want to get an granted "
					  "ticket %s", ticket);
				h.result = BOOTHC_RLT_OVERGRANT;
				goto reply;
			}
		} else {
			log_error("can not get ticket %s's info", ticket);
			h.result = BOOTHC_RLT_INVALID_ARG;
			goto reply;
		}
		if (!check_site(site, &local)) {
			h.result = BOOTHC_RLT_INVALID_ARG;
			goto reply;
		}
		if (local)
			h.result = grant_ticket(ticket);
		else
			h.result = BOOTHC_RLT_REMOTE_OP;
		break;

	case BOOTHC_CMD_REVOKE:
		rv = split_site_ticket(data, h.len, &site, &ticket);
		h.len = 0;
		if (rv < 0) {
			h.result = BOOTHC_RLT_INVALID_ARG;
			goto reply;
		}
		if (!check_ticket(ticket)) {
			h.result = BOOTHC_RLT_INVALID_ARG;
			goto reply;
		}
		if (!check_site(site, &local)) {
			h.result = BOOTHC_RLT_INVALID_ARG;
			goto reply;
		}
		if (local)
			h.result = revoke_ticket(ticket);
		else
			h.result = BOOTHC_RLT_REMOTE_OP;
		break;

	case BOOTHC_CMD_CATCHUP:
		h.result = catchup_ticket(&data, h.len);
		break;

	default:
		log_error("connection %d cmd %x unknown", ci, h.cmd);
		/* Answer with an explicit error rather than echoing the
		 * request's own result field and payload back. */
		h.len = 0;
		h.result = BOOTHC_RLT_INVALID_ARG;
		break;
	}

reply:
	rv = do_write(client[ci].fd, &h, sizeof(h));
	if (rv < 0)
		log_error("connection %d write error %d", ci, rv);
	if (h.len) {
		rv = do_write(client[ci].fd, data, h.len);
		if (rv < 0)
			log_error("connection %d write error %d", ci, rv);
	}
out:
	free(data);
}
/*
 * Work function for the local listening socket in slot ci: accept one
 * pending connection and register it so that process_connection() handles
 * its requests. Accept failures are logged and otherwise ignored.
 */
static void process_listener(int ci)
{
	int slot;
	int conn_fd = accept(client[ci].fd, NULL, NULL);

	if (conn_fd >= 0) {
		slot = client_add(conn_fd, process_connection, NULL);
		log_debug("add client connection %d fd %d", slot, conn_fd);
		return;
	}

	log_error("process_listener: accept error for fd %d: %s (%d)",
		  conn_fd, strerror(errno), errno);
}
/*
 * Load the configuration file named on the command line and validate it
 * for the given role. Returns the first negative result from read_config()
 * or check_config(), otherwise check_config()'s result.
 */
static int setup_config(int type)
{
	int rv = read_config(cl.configfile);

	if (rv >= 0)
		rv = check_config(type);
	return rv;
}
/*
 * Initialise the transport selected in the configuration, passing it
 * ticket_recv, and then the TCP transport. Returns the negative result of
 * the first initialiser that fails, otherwise the TCP initialiser's result.
 */
static int setup_transport(void)
{
	transport_layer_t proto = booth_conf->proto;
	int rv = booth_transport[proto].init(ticket_recv);

	if (rv < 0) {
		log_error("failed to init booth_transport[%d]", proto);
		return rv;
	}

	rv = booth_transport[TCP].init(NULL);
	if (rv < 0)
		log_error("failed to init booth_transport[TCP]");
	return rv;
}
/* Initialise the timer list; returns whatever timerlist_init() returns. */
static int setup_timer(void)
{
	int rv = timerlist_init();

	return rv;
}
/*
 * Bring up the daemon: configuration, timers, transports, tickets and
 * finally the local command socket, in that order. Stops at the first step
 * that reports a negative result.
 *
 * Returns 0 on success, -1 if any step failed.
 */
static int setup(int type)
{
	int listener;

	if (setup_config(type) < 0)
		return -1;
	if (setup_timer() < 0)
		return -1;
	if (setup_transport() < 0)
		return -1;
	if (setup_ticket() < 0)
		return -1;

	listener = setup_listener(BOOTHC_SOCK_PATH);
	if (listener < 0)
		return -1;

	client_add(listener, process_listener, NULL);
	return 0;
}
/*
 * Main event loop: poll every registered descriptor, run the work function
 * of readable slots and the dead function of slots reporting an error,
 * hangup or invalid descriptor, then process the timer list.
 *
 * A poll() interrupted by a signal is restarted without processing timers.
 * Only returns (-1) when poll() fails for another reason.
 */
static int loop(void)
{
	void (*workfn) (int ci);
	void (*deadfn) (int ci);
	int rv, i;

	while (1) {
		rv = poll(pollfd, client_maxi + 1, poll_timeout);
		if (rv == -1 && errno == EINTR)
			continue;
		if (rv < 0) {
			log_error("poll failed: %s (%d)", strerror(errno), errno);
			return -1;
		}

		for (i = 0; i <= client_maxi; i++) {
			if (client[i].fd < 0)
				continue;
			if (pollfd[i].revents & POLLIN) {
				workfn = client[i].workfn;
				if (workfn)
					workfn(i);
			}
			/* The work function may already have torn the slot
			 * down (e.g. on a failed read); do not run the dead
			 * handler a second time on a freed slot. */
			if (client[i].fd < 0)
				continue;
			if (pollfd[i].revents &
			    (POLLERR | POLLHUP | POLLNVAL)) {
				deadfn = client[i].deadfn;
				if (deadfn)
					deadfn(i);
			}
		}

		process_timerlist();
	}
}
/*
 * Client "list" operation: send a LIST request to the local daemon, read
 * the reply header and the payload it announces, and copy the payload to
 * standard output.
 *
 * Returns 0 on success and a negative value on failure.
 */
static int do_list(void)
{
	struct boothc_header h, *rh;
	char *reply = NULL, *data, *grown;
	int data_len;
	int fd, rv;

	init_header(&h, BOOTHC_CMD_LIST, 0, 0);

	fd = do_connect(BOOTHC_SOCK_PATH);
	if (fd < 0) {
		rv = fd;
		goto out;
	}

	rv = do_write(fd, &h, sizeof(h));
	if (rv < 0)
		goto out_close;

	reply = malloc(sizeof(struct boothc_header));
	if (!reply) {
		rv = -ENOMEM;
		goto out_close;
	}

	rv = do_read(fd, reply, sizeof(struct boothc_header));
	if (rv < 0)
		goto out_free;

	rh = (struct boothc_header *)reply;
	data_len = rh->len;
	/* A negative length would wrap when converted to a size. */
	if (data_len < 0) {
		log_error("invalid reply length %d", data_len);
		rv = -1;
		goto out_free;
	}

	/* Grow through a temporary so the header buffer is still freed
	 * below if realloc() fails. */
	grown = realloc(reply, sizeof(struct boothc_header) + data_len);
	if (!grown) {
		rv = -ENOMEM;
		goto out_free;
	}
	reply = grown;

	data = reply + sizeof(struct boothc_header);
	rv = do_read(fd, data, data_len);
	if (rv < 0)
		goto out_free;

	/* do_write() returns 0 on success, so rv doubles as the result. */
	rv = do_write(STDOUT_FILENO, data, data_len);

out_free:
	free(reply);
out_close:
	close(fd);
out:
	return rv;
}
/*
 * Client "grant"/"revoke" operation.
 *
 * Builds a request made of the protocol header followed by the fixed-width
 * site and ticket fields, sends it to the local daemon and interprets the
 * result code. When the local daemon answers BOOTHC_RLT_REMOTE_OP the same
 * request is forwarded over the TCP transport to the site named on the
 * command line and that reply is interpreted instead.
 *
 * Returns 0 when the command succeeded or was accepted for asynchronous
 * processing, and a negative value otherwise.
 */
static int do_command(cmd_request_t cmd)
{
	char *buf;
	struct boothc_header *h, reply;
	int buflen;
	int fd, rv;

	buflen = sizeof(struct boothc_header) +
		 sizeof(cl.site) + sizeof(cl.ticket);
	/* Zero-filled so the bytes after each name are not uninitialised
	 * heap contents when the whole buffer goes out on the wire. */
	buf = calloc(1, buflen);
	if (!buf) {
		rv = -ENOMEM;
		goto out;
	}

	h = (struct boothc_header *)buf;
	init_header(h, cmd, 0, sizeof(cl.site) + sizeof(cl.ticket));
	strcpy(buf + sizeof(struct boothc_header), cl.site);
	strcpy(buf + sizeof(struct boothc_header) + sizeof(cl.site), cl.ticket);

	fd = do_connect(BOOTHC_SOCK_PATH);
	if (fd < 0) {
		rv = fd;
		goto out_free;
	}

	rv = do_write(fd, buf, buflen);
	if (rv < 0)
		goto out_close;

	rv = do_read(fd, &reply, sizeof(struct boothc_header));
	if (rv < 0)
		goto out_close;

	if (reply.result == BOOTHC_RLT_INVALID_ARG) {
		log_info("invalid argument!");
		rv = -1;
		goto out_close;
	}

	if (reply.result == BOOTHC_RLT_OVERGRANT) {
		log_info("You're granting a granted ticket. "
			 "If you wanted to migrate a ticket, "
			 "use revoke first, then use grant");
		rv = -1;
		goto out_close;
	}

	if (reply.result == BOOTHC_RLT_REMOTE_OP) {
		struct booth_node to;
		int s;

		memset(&to, 0, sizeof(struct booth_node));
		to.family = BOOTH_PROTO_FAMILY;
		strcpy(to.addr, cl.site);

		s = booth_transport[TCP].open(&to);
		if (s < 0) {
			/* rv is still 0 from the read above; report the
			 * failure instead of returning success. */
			rv = s;
			goto out_close;
		}

		rv = booth_transport[TCP].send(s, buf, buflen);
		if (rv < 0) {
			booth_transport[TCP].close(s);
			goto out_close;
		}
		rv = booth_transport[TCP].recv(s, &reply,
					       sizeof(struct boothc_header));
		if (rv < 0) {
			booth_transport[TCP].close(s);
			goto out_close;
		}
		booth_transport[TCP].close(s);
	}

	if (reply.result == BOOTHC_RLT_ASYNC) {
		if (cmd == BOOTHC_CMD_GRANT)
			log_info("grant command sent, result will be returned "
				 "asynchronously, you can get the result from "
				 "the log files");
		else if (cmd == BOOTHC_CMD_REVOKE)
			log_info("revoke command sent, result will be returned "
				 "asynchronously, you can get the result from "
				 "the log files.");
		else
			log_error("internal error reading reply result!");
		rv = 0;
	} else if (reply.result == BOOTHC_RLT_SYNC_SUCC) {
		if (cmd == BOOTHC_CMD_GRANT)
			log_info("grant succeeded!");
		else if (cmd == BOOTHC_CMD_REVOKE)
			log_info("revoke succeeded!");
		rv = 0;
	} else if (reply.result == BOOTHC_RLT_SYNC_FAIL) {
		if (cmd == BOOTHC_CMD_GRANT)
			log_info("grant failed!");
		else if (cmd == BOOTHC_CMD_REVOKE)
			log_info("revoke failed!");
		rv = -1;
	} else {
		log_error("internal error!");
		rv = -1;
	}

out_close:
	close(fd);
out_free:
	free(buf);
out:
	return rv;
}
/* Client "grant" operation: run do_command() with the GRANT request. */
static int do_grant(void)
{
	int rv = do_command(BOOTHC_CMD_GRANT);

	return rv;
}
/* Client "revoke" operation: run do_command() with the REVOKE request. */
static int do_revoke(void)
{
	int rv = do_command(BOOTHC_CMD_REVOKE);

	return rv;
}
static int lockfile(void)
{
char buf[16];
struct flock lock;
int fd, rv;
fd = open(cl.lockfile, O_CREAT|O_WRONLY, 0666);
if (fd < 0) {
log_error("lockfile open error %s: %s",
cl.lockfile, strerror(errno));
return -1;
}
lock.l_type = F_WRLCK;
lock.l_start = 0;
lock.l_whence = SEEK_SET;
lock.l_len = 0;
rv = fcntl(fd, F_SETLK, &lock);
if (rv < 0) {
log_error("lockfile setlk error %s: %s",
cl.lockfile, strerror(errno));
goto fail;
}
rv = ftruncate(fd, 0);
if (rv < 0) {
log_error("lockfile truncate error %s: %s",
cl.lockfile, strerror(errno));
goto fail;
}
memset(buf, 0, sizeof(buf));
snprintf(buf, sizeof(buf), "%d\n", getpid());
rv = write(fd, buf, strlen(buf));
if (rv <= 0) {
log_error("lockfile write error %s: %s",
cl.lockfile, strerror(errno));
goto fail;
}
return fd;
fail:
close(fd);
return -1;
}
/*
 * Remove the lock file from the filesystem and then close its descriptor,
 * which also drops the lock taken by lockfile().
 */
static void unlink_lockfile(int fd)
{
	const char *path = cl.lockfile;

	unlink(path);
	close(fd);
}
/*
 * Print the command-line help text to standard output. The text is emitted
 * section by section; adjacent string literals are concatenated by the
 * compiler, so the output is the same as printing each line separately.
 */
static void print_usage(void)
{
	printf("Usage:\n"
	       "booth <type> <operation> [options]\n"
	       "\n");

	printf("Types:\n"
	       " arbitrator: daemon running on arbitrator\n"
	       " site: daemon running on cluster site\n"
	       " client: command running from client\n"
	       "\n");

	printf("Operations:\n"
	       "Please note that operations are valid iff type is client!\n"
	       " list: List all the tickets\n"
	       " grant: Grant ticket T(-t T) to site S(-s S)\n"
	       " revoke: Revoke ticket T(-t T) from site S(-s S)\n"
	       "\n");

	printf("Options:\n"
	       " -c FILE Specify config file [default " BOOTH_DEFAULT_CONF "]\n"
	       " -l LOCKFILE Specify lock file [default " BOOTH_DEFAULT_LOCKFILE "]\n"
	       " -D Enable debugging to stderr and don't fork\n"
	       " -t ticket name\n"
	       " -s site name\n"
	       " -h Print this help, then exit\n");
}
/* getopt() option string: -c, -l, -t and -s take an argument; -D and -h do not. */
#define OPTION_STRING "c:Dl:t:s:h"
/*
 * Name handed to cl_log_set_entity() for the daemon roles; set by
 * read_arguments() and left NULL for the client role.
 */
static char *logging_entity = NULL;
/*
 * Copy a command-line value into a fixed-size buffer.
 *
 * dest        destination buffer of buflen bytes
 * value       NUL-terminated string to copy
 * buflen      total size of dest, including room for the terminator
 * description human-readable name of the field, used in the error message
 *
 * If value (plus terminator) does not fit, an error is printed to stderr
 * and the process exits with EXIT_FAILURE. Otherwise dest receives the
 * string, is zero-padded up to buflen - 1 bytes, and is always
 * NUL-terminated.
 */
void safe_copy(char *dest, char *value, size_t buflen, const char *description) {
	if (strlen(value) >= buflen) {
		fprintf(stderr, "'%s' exceeds maximum %s length of %ld\n",
			value, description, (long)(buflen - 1));
		exit(EXIT_FAILURE);
	}
	/* strncpy() pads with NULs but never writes dest[buflen - 1]; set it
	 * explicitly so the result is terminated even if dest held other
	 * data beforehand. */
	strncpy(dest, value, buflen - 1);
	dest[buflen - 1] = '\0';
}
/*
 * Parse the command line into the global cl structure.
 *
 * The first argument selects the role ("arbitrator", "site" or "client");
 * any other word is treated as a client operation. "help"/"--help"/"-h"
 * and "version"/"--version"/"-V" print their text and exit. For the client
 * role the operation must be "list", "grant" or "revoke". The remaining
 * arguments are parsed with getopt() using OPTION_STRING; -t and -s are
 * only accepted together with grant or revoke.
 *
 * Returns 0 on success. Every error path prints a message and exits the
 * process instead of returning.
 */
static int read_arguments(int argc, char **argv)
{
	int optchar;
	char *arg1 = argv[1];
	char *op = NULL;

	if (argc < 2 || !strcmp(arg1, "help") || !strcmp(arg1, "--help") ||
	    !strcmp(arg1, "-h")) {
		print_usage();
		exit(EXIT_SUCCESS);
	}

	if (!strcmp(arg1, "version") || !strcmp(arg1, "--version") ||
	    !strcmp(arg1, "-V")) {
		printf("%s %s (built %s %s)\n",
		       argv[0], RELEASE_VERSION, __DATE__, __TIME__);
		exit(EXIT_SUCCESS);
	}

	if (!strcmp(arg1, "arbitrator")) {
		cl.type = ACT_ARBITRATOR;
		logging_entity = (char *) DAEMON_NAME "-arbitrator";
		optind = 2;
	} else if (!strcmp(arg1, "site")) {
		cl.type = ACT_SITE;
		logging_entity = (char *) DAEMON_NAME "-site";
		optind = 2;
	} else if (!strcmp(arg1, "client")) {
		cl.type = ACT_CLIENT;
		if (argc < 3) {
			print_usage();
			exit(EXIT_FAILURE);
		}
		op = argv[2];
		optind = 3;
	} else {
		/* No explicit role: the first word is the client operation. */
		cl.type = ACT_CLIENT;
		op = argv[1];
		optind = 2;
	}

	switch (cl.type) {
	case ACT_ARBITRATOR:
		break;
	case ACT_SITE:
		break;
	case ACT_CLIENT:
		if (!strcmp(op, "list"))
			cl.op = OP_LIST;
		else if (!strcmp(op, "grant"))
			cl.op = OP_GRANT;
		else if (!strcmp(op, "revoke"))
			cl.op = OP_REVOKE;
		else {
			fprintf(stderr, "client operation \"%s\" is unknown\n",
				op);
			exit(EXIT_FAILURE);
		}
		break;
	}

	while (optind < argc) {
		optchar = getopt(argc, argv, OPTION_STRING);

		switch (optchar) {
		case 'c':
			safe_copy(cl.configfile, optarg, sizeof(cl.configfile), "config file");
			break;
		case 'D':
			daemonize = 1;
			debug_level = 1;
			log_logfile_priority = LOG_DEBUG;
			log_syslog_priority = LOG_DEBUG;
			break;
		case 'l':
			safe_copy(cl.lockfile, optarg, sizeof(cl.lockfile), "lock file");
			break;
		case 't':
			if (cl.op == OP_GRANT || cl.op == OP_REVOKE) {
				safe_copy(cl.ticket, optarg, sizeof(cl.ticket), "ticket name");
			} else {
				print_usage();
				exit(EXIT_FAILURE);
			}
			break;
		case 's':
			if (cl.op == OP_GRANT || cl.op == OP_REVOKE) {
				/* Bound by the destination's own size; the
				 * original used sizeof(cl.ticket) here. */
				safe_copy(cl.site, optarg, sizeof(cl.site), "site name");
			} else {
				print_usage();
				exit(EXIT_FAILURE);
			}
			break;
		case 'h':
			print_usage();
			exit(EXIT_SUCCESS);
			break;
		case ':':
		case '?':
			fprintf(stderr, "Please use '-h' for usage.\n");
			exit(EXIT_FAILURE);
			break;
		default:
			/* Reached when getopt() returns -1 on a non-option
			 * argument that is still left on the command line. */
			fprintf(stderr, "unknown option: %s\n", argv[optind]);
			exit(EXIT_FAILURE);
			break;
		};
	}

	return 0;
}
/*
 * Try to make the daemon resistant to scheduling and paging delays: lift
 * the locked-memory limit, lock current and future pages in memory, and
 * switch to the SCHED_RR policy at its maximum priority. Failures are
 * logged and otherwise ignored.
 */
static void set_scheduler(void)
{
	struct sched_param sched_param;
	struct rlimit unlimited;
	int max_prio;

	unlimited.rlim_cur = RLIM_INFINITY;
	unlimited.rlim_max = RLIM_INFINITY;
	setrlimit(RLIMIT_MEMLOCK, &unlimited);

	if (mlockall(MCL_CURRENT | MCL_FUTURE) < 0) {
		log_error("mlockall failed");
	}

	max_prio = sched_get_priority_max(SCHED_RR);
	if (max_prio == -1) {
		log_error("could not get maximum scheduler priority err %d",
			  errno);
		return;
	}

	sched_param.sched_priority = max_prio;
	if (sched_setscheduler(0, SCHED_RR, &sched_param) == -1)
		log_error("could not set SCHED_RR priority %d: %s (%d)",
			  sched_param.sched_priority,
			  strerror(errno), errno);
}
/*
 * Write val to /proc/self/oom_adj. Does nothing if the file cannot be
 * opened for writing.
 */
static void set_oom_adj(int val)
{
	FILE *adj = fopen("/proc/self/oom_adj", "w");

	if (adj) {
		fprintf(adj, "%i", val);
		fclose(adj);
	}
}
/*
 * Run as a daemon of the given type (ARBITRATOR or SITE).
 *
 * Performs setup, detaches with daemon() unless the global daemonize flag
 * is set, takes the lock file, adjusts scheduling and the OOM setting, and
 * then enters the event loop. The lock file is removed once the loop
 * returns.
 *
 * Returns the negative result of setup() or lockfile() if either fails,
 * otherwise the result of loop().
 */
static int do_server(int type)
{
	int lock_fd;
	int rv;

	rv = setup(type);
	if (rv < 0)
		return rv;

	if (!daemonize && daemon(0, 0) < 0) {
		perror("daemon error");
		exit(EXIT_FAILURE);
	}

	/*
	 * The lock cannot be obtained before the call to daemon(), otherwise
	 * the lockfile would contain the pid of the parent, not the daemon.
	 */
	lock_fd = lockfile();
	if (lock_fd < 0)
		return lock_fd;

	if (type == ARBITRATOR)
		log_info("BOOTH arbitrator daemon started");
	else if (type == SITE)
		log_info("BOOTH cluster site daemon started");

	set_scheduler();
	set_oom_adj(-16);

	rv = loop();

	unlink_lockfile(lock_fd);
	return rv;
}
/*
 * Run the client operation chosen on the command line. Returns the result
 * of the operation, or -1 if cl.op is not one of the known operations.
 */
static int do_client(void)
{
	if (cl.op == OP_LIST)
		return do_list();
	if (cl.op == OP_GRANT)
		return do_grant();
	if (cl.op == OP_REVOKE)
		return do_revoke();
	return -1;
}
/*
 * Program entry point: install the default config and lock file paths,
 * parse the command line, configure logging for the chosen role and run
 * either a daemon or a client operation.
 *
 * Exits with EXIT_SUCCESS when the selected action returns 0 and
 * EXIT_FAILURE otherwise.
 */
int main(int argc, char *argv[])
{
	int rv;

	memset(&cl, 0, sizeof(cl));
	strncpy(cl.configfile, BOOTH_DEFAULT_CONF, BOOTH_PATH_LEN - 1);
	strncpy(cl.lockfile, BOOTH_DEFAULT_LOCKFILE, BOOTH_PATH_LEN - 1);

	rv = read_arguments(argc, argv);
	if (rv < 0)
		return EXIT_FAILURE;

	/* Clients log to stderr; daemons log under their entity name and
	 * only use stderr when debugging was requested. */
	if (cl.type != ACT_CLIENT) {
		cl_log_set_entity(logging_entity);
		cl_log_enable_stderr(debug_level ? TRUE : FALSE);
		cl_log_set_facility(HA_LOG_FACILITY);
	} else {
		cl_log_enable_stderr(TRUE);
		cl_log_set_facility(0);
	}
	cl_inherit_logging_environment(0);

	if (cl.type == ACT_ARBITRATOR)
		rv = do_server(ARBITRATOR);
	else if (cl.type == ACT_SITE)
		rv = do_server(SITE);
	else if (cl.type == ACT_CLIENT)
		rv = do_client();

	if (rv)
		return EXIT_FAILURE;
	return EXIT_SUCCESS;
}
diff --git a/src/transport.c b/src/transport.c
index b60bde8..8a9de53 100644
--- a/src/transport.c
+++ b/src/transport.c
@@ -1,655 +1,655 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <string.h>
#include <stdlib.h>
#include <net/if.h>
#include <asm/types.h>
#include <linux/rtnetlink.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <poll.h>
#include "list.h"
#include "booth.h"
#include "log.h"
#include "config.h"
#include "paxos_lease.h"
#include "transport.h"
/* Size in bytes of the largest binary address handled here (an IPv6 address). */
#define BOOTH_IPADDR_LEN (sizeof(struct in6_addr))
/* Size of the buffer used by find_myself() to receive netlink replies. */
#define NETLINK_BUFSIZE 16384
/* Receive buffer size requested (SO_RCVBUF) for the UDP socket. */
#define SOCKET_BUFFER_SIZE 160000
/* Size of the buffer a single UDP datagram is received into. */
#define FRAME_SIZE_MAX 10000
/* Connection table and its poll set; declared here, defined in another file. */
extern struct client *client;
extern struct pollfd *pollfd;
/* Copy of the configuration entry describing this host; set by load_myid(). */
static struct booth_node local;
/* One known TCP connection, linked into the list headed by "tcp". */
struct tcp_conn {
int s; /* socket descriptor of the connection */
struct sockaddr to; /* peer address as stored when the connection was added */
struct list_head list; /* linkage in the "tcp" list */
};
/* List of all tcp_conn entries, for outgoing and accepted connections. */
static LIST_HEAD(tcp);
/* State of the single UDP socket used by the UDP transport. */
struct udp_context {
int s; /* UDP socket descriptor */
struct iovec iov_recv; /* points at iov_buffer; set up in booth_udp_init() */
char iov_buffer[FRAME_SIZE_MAX]; /* storage for one received datagram */
} udp;
/* Callback that process_recv() hands every received datagram to. */
static int (*deliver_fn) (void *msg, int msglen);
/*
 * Build a socket address from a node's textual address and a port.
 *
 * ipaddr  node whose family (AF_INET or AF_INET6) and addr string are used
 * port    port number in host byte order
 * saddr   storage that receives the sockaddr_in / sockaddr_in6
 * addrlen receives the size of the structure written
 *
 * Returns 0 on success. Returns -1 if the family is neither AF_INET nor
 * AF_INET6 (saddr and addrlen untouched), or if the address string cannot
 * be parsed (saddr is filled in with an all-zero address and addrlen is
 * set, so callers that ignore the result never see uninitialised data).
 */
static int ipaddr_to_sockaddr(struct booth_node *ipaddr,
			      uint16_t port,
			      struct sockaddr_storage *saddr,
			      int *addrlen)
{
	if (ipaddr->family == AF_INET) {
		struct sockaddr_in *sin = (struct sockaddr_in *)saddr;

		memset(sin, 0, sizeof(*sin));
		sin->sin_family = ipaddr->family;
		sin->sin_port = htons(port);
		*addrlen = sizeof(*sin);
		/* inet_pton() returns 1 only when the string was parsed;
		 * the original copied an uninitialised address otherwise. */
		if (inet_pton(AF_INET, ipaddr->addr, &sin->sin_addr) != 1)
			return -1;
		return 0;
	}

	if (ipaddr->family == AF_INET6) {
		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)saddr;

		memset(sin6, 0, sizeof(*sin6));
		sin6->sin6_family = ipaddr->family;
		sin6->sin6_port = htons(port);
		/* NOTE(review): the scope id is hard-coded to 2 as in the
		 * original; confirm this is intended. */
		sin6->sin6_scope_id = 2;
		*addrlen = sizeof(*sin6);
		if (inet_pton(AF_INET6, ipaddr->addr, &sin6->sin6_addr) != 1)
			return -1;
		return 0;
	}

	return -1;
}
/*
 * Index a run of routing attributes by type.
 *
 * Walks the attributes starting at rta within len bytes and stores a
 * pointer to each one in tb[type]. Attributes whose type is greater than
 * max are skipped; when a type occurs more than once the last one wins.
 * tb must have room for max + 1 entries and is not cleared here.
 */
static void parse_rtattr(struct rtattr *tb[],
			 int max, struct rtattr *rta, int len)
{
	for (; RTA_OK(rta, len); rta = RTA_NEXT(rta, len)) {
		if (rta->rta_type > max)
			continue;
		tb[rta->rta_type] = rta;
	}
}
/*
 * Check whether the address of a configured node is assigned to one of
 * this host's interfaces.
 *
 * Sends an RTM_GETADDR dump request for the node's address family over a
 * netlink route socket and compares every reported address with the
 * node's address.
 *
 * Returns 1 if a matching address was found, 0 if not or if anything
 * failed (failures are logged).
 */
static int find_myself(struct booth_node *node)
{
	int fd, addrlen, found = 0;
	int rcvbuf_size = NETLINK_BUFSIZE;
	struct sockaddr_nl nladdr;
	unsigned char ndaddr[BOOTH_IPADDR_LEN];
	static char rcvbuf[NETLINK_BUFSIZE];
	struct {
		struct nlmsghdr nlh;
		struct rtgenmsg g;
	} req;

	memset(ndaddr, 0, BOOTH_IPADDR_LEN);
	if (node->family == AF_INET) {
		addrlen = sizeof(struct in_addr);
	} else if (node->family == AF_INET6) {
		addrlen = sizeof(struct in6_addr);
	} else {
		log_error("invalid INET family");
		return 0;
	}
	if (inet_pton(node->family, node->addr, ndaddr) != 1) {
		log_error("invalid address '%s'", node->addr);
		return 0;
	}

	fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE);
	if (fd < 0) {
		log_error("failed to create netlink socket");
		return 0;
	}

	/* SO_RCVBUF takes an int holding the size; the original passed the
	 * receive buffer itself as the option value. */
	setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf_size, sizeof(rcvbuf_size));

	memset(&nladdr, 0, sizeof(nladdr));
	nladdr.nl_family = AF_NETLINK;

	memset(&req, 0, sizeof(req));
	req.nlh.nlmsg_len = sizeof(req);
	req.nlh.nlmsg_type = RTM_GETADDR;
	req.nlh.nlmsg_flags = NLM_F_ROOT|NLM_F_MATCH|NLM_F_REQUEST;
	req.nlh.nlmsg_pid = 0;
	req.nlh.nlmsg_seq = 1;
	/* Ask for the node's own family; a hard-coded AF_INET would never
	 * report the addresses needed to match an IPv6 node. */
	req.g.rtgen_family = node->family;

	if (sendto(fd, (void *)&req, sizeof(req), 0,
		   (struct sockaddr*)&nladdr, sizeof(nladdr)) < 0) {
		close(fd);
		log_error("failed to send data to netlink socket");
		return 0;
	}

	while (1) {
		int status;
		struct nlmsghdr *h;
		struct iovec iov = { rcvbuf, sizeof(rcvbuf) };
		struct msghdr msg = {
			.msg_name = (void *)&nladdr,
			.msg_namelen = sizeof(nladdr),
			.msg_iov = &iov,
			.msg_iovlen = 1,
		};

		status = recvmsg(fd, &msg, 0);
		if (status < 0 && errno == EINTR)
			continue;
		/* Both end-of-file (0) and errors (-1) end the search. */
		if (status <= 0) {
			log_error("failed to recvmsg from netlink socket");
			goto out;
		}

		/* One datagram may carry several messages, including the
		 * terminating DONE, so every message is inspected. */
		for (h = (struct nlmsghdr *)rcvbuf; NLMSG_OK(h, status);
		     h = NLMSG_NEXT(h, status)) {
			struct ifaddrmsg *ifa;
			struct rtattr *tb[IFA_MAX+1];
			int len;

			if (h->nlmsg_type == NLMSG_DONE)
				goto out;
			if (h->nlmsg_type == NLMSG_ERROR) {
				log_error("netlink socket recvmsg error");
				goto out;
			}
			if (h->nlmsg_type != RTM_NEWADDR)
				continue;

			ifa = NLMSG_DATA(h);
			len = h->nlmsg_len - NLMSG_LENGTH(sizeof(*ifa));
			memset(tb, 0, sizeof(tb));
			parse_rtattr(tb, IFA_MAX, IFA_RTA(ifa), len);

			/* Skip entries without an address attribute or with
			 * one too short to hold an address of this family,
			 * instead of reading past the attribute. */
			if (!tb[IFA_ADDRESS] ||
			    (int)RTA_PAYLOAD(tb[IFA_ADDRESS]) < addrlen)
				continue;

			if (!memcmp(RTA_DATA(tb[IFA_ADDRESS]), ndaddr, addrlen)) {
				found = 1;
				goto out;
			}
		}
	}

out:
	close(fd);
	return found;
}
static int load_myid(void)
{
int i;
for (i = 0; i < booth_conf->node_count; i++) {
if (find_myself(&booth_conf->node[i])) {
booth_conf->node[i].local = 1;
if (!local.family)
memcpy(&local, &booth_conf->node[i],
sizeof(struct booth_node));
return booth_conf->node[i].nodeid;
}
}
return -1;
}
/*
 * Return the id of the local node, or -1 when the "local" record has not
 * been marked as local.
 */
static int booth_get_myid(void)
{
	return local.local ? local.nodeid : -1;
}
/*
 * Dead-connection handler for TCP peers: drop the tcp_conn entry that
 * belongs to slot ci's descriptor (if any), close the descriptor and mark
 * the slot free in both the client and pollfd arrays.
 */
static void process_dead(int ci)
{
	struct tcp_conn *entry, *next;
	int fd = client[ci].fd;

	list_for_each_entry_safe(entry, next, &tcp, list) {
		if (entry->s != fd)
			continue;
		list_del(&entry->list);
		free(entry);
		break;
	}

	close(fd);
	client[ci].workfn = NULL;
	client[ci].fd = -1;
	pollfd[ci].fd = -1;
}
/*
 * Work function for the TCP listening socket in slot ci: accept one
 * pending connection, disable Nagle's algorithm on it, record it in the
 * tcp connection list and register it so that process_connection()
 * handles its requests and process_dead() cleans it up.
 */
static void process_tcp_listener(int ci)
{
	int fd, i, one = 1;
	struct sockaddr addr;
	/* accept() treats addrlen as value-result: it must hold the size of
	 * the address buffer on entry. */
	socklen_t addrlen = sizeof(addr);
	struct tcp_conn *conn;

	fd = accept(client[ci].fd, &addr, &addrlen);
	if (fd < 0) {
		log_error("process_tcp_listener: accept error %d %d",
			  fd, errno);
		return;
	}
	setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof(one));

	conn = malloc(sizeof(struct tcp_conn));
	if (!conn) {
		log_error("failed to alloc mem");
		/* Do not leak the accepted connection. */
		close(fd);
		return;
	}
	memset(conn, 0, sizeof(struct tcp_conn));
	conn->s = fd;
	memcpy(&conn->to, &addr, sizeof(struct sockaddr));
	list_add_tail(&conn->list, &tcp);

	i = client_add(fd, process_connection, process_dead);
	log_debug("client connection %d fd %d", i, fd);
}
/*
 * Create the TCP socket on which commands from other hosts arrive, bound
 * to the local node's address and BOOTH_CMD_PORT, with a backlog of 5.
 *
 * Returns the listening descriptor, or -1 on failure (the error has been
 * logged and the socket closed).
 */
static int setup_tcp_listener(void)
{
	struct sockaddr_storage sockaddr;
	int s, addrlen, rv;

	s = socket(local.family, SOCK_STREAM, 0);
	if (s == -1) {
		log_error("failed to create tcp socket %s", strerror(errno));
		return s;
	}

	rv = ipaddr_to_sockaddr(&local, BOOTH_CMD_PORT, &sockaddr, &addrlen);
	if (rv < 0) {
		log_error("failed to build local address");
		close(s);
		return -1;
	}

	rv = bind(s, (struct sockaddr *)&sockaddr, addrlen);
	if (rv == -1) {
		log_error("failed to bind socket %s", strerror(errno));
		/* Close on failure, as the UDP and UNIX listeners do. */
		close(s);
		return rv;
	}

	rv = listen(s, 5);
	if (rv == -1) {
		log_error("failed to listen on socket %s", strerror(errno));
		close(s);
		return rv;
	}

	return s;
}
/*
 * Initialise the TCP transport: requires that the local node has already
 * been identified, then creates the TCP listener and registers it with
 * the event loop. The argument is unused.
 *
 * Returns 0 on success, -1 if the local node is unknown, or the negative
 * result of setup_tcp_listener().
 */
static int booth_tcp_init(void * unused __attribute__((unused)))
{
	int listener;

	if (!local.local)
		return -1;

	listener = setup_tcp_listener();
	if (listener < 0)
		return listener;

	client_add(listener, process_tcp_listener, NULL);
	return 0;
}
/*
 * Connect a socket with a timeout.
 *
 * The socket is switched to non-blocking mode, connect() is started and
 * select() waits up to sec seconds (indefinitely when sec is 0) for it to
 * finish. On success the original file status flags are restored.
 *
 * Returns 0 when connected. Returns -1 with errno set on failure
 * (ETIMEDOUT on timeout); the caller is responsible for closing sockfd.
 */
static int connect_nonb(int sockfd, const struct sockaddr *saptr,
			socklen_t salen, int sec)
{
	int flags, n, error = 0;
	socklen_t len;
	fd_set rset, wset;
	struct timeval tval;

	flags = fcntl(sockfd, F_GETFL, 0);
	fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);

	n = connect(sockfd, saptr, salen);
	if (n < 0 && errno != EINPROGRESS)
		return -1;
	if (n == 0)
		goto done;	/* connect completed immediately */

	do {
		/* select() may modify the sets and the timeout, so they
		 * are rebuilt before every attempt. */
		FD_ZERO(&rset);
		FD_SET(sockfd, &rset);
		wset = rset;
		tval.tv_sec = sec;
		tval.tv_usec = 0;
		n = select(sockfd + 1, &rset, &wset, NULL,
			   sec ? &tval : NULL);
	} while (n < 0 && errno == EINTR);

	if (n < 0) {
		/* The original fell through here and inspected the fd sets
		 * after a failed select(), which could report success. */
		return -1;
	}
	if (n == 0) {
		/* timeout; leave it to the caller to close the socket */
		errno = ETIMEDOUT;
		return -1;
	}

	if (FD_ISSET(sockfd, &rset) || FD_ISSET(sockfd, &wset)) {
		len = sizeof(error);
		if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0)
			return -1;	/* Solaris pending error */
	} else {
		log_error("select error: sockfd not set");
		return -1;
	}

done:
	fcntl(sockfd, F_SETFL, flags);	/* restore file status flags */

	if (error) {
		/* leave it to the caller to close the socket */
		errno = error;
		return -1;
	}

	return 0;
}
static int booth_tcp_open(struct booth_node *to)
{
struct sockaddr_storage sockaddr;
struct tcp_conn *conn;
int addrlen, rv, s, found = 0;
ipaddr_to_sockaddr(to, BOOTH_CMD_PORT, &sockaddr, &addrlen);
list_for_each_entry(conn, &tcp, list) {
if (!memcmp(&conn->to, &sockaddr, sizeof(sockaddr))) {
found = 1;
break;
}
}
if (!found) {
s = socket(BOOTH_PROTO_FAMILY, SOCK_STREAM, 0);
if (s == -1)
return -1;
rv = connect_nonb(s, (struct sockaddr *)&sockaddr, addrlen, 10);
if (rv == -1) {
if( errno == ETIMEDOUT)
log_error("connection to %s timeout", to->addr);
else
log_error("connection to %s error %s", to->addr,
strerror(errno));
close(s);
return rv;
}
conn = malloc(sizeof(struct tcp_conn));
if (!conn) {
log_error("failed to alloc mem");
close(s);
return -ENOMEM;
}
memset(conn, 0, sizeof(struct tcp_conn));
conn->s = s;
memcpy(&conn->to, &sockaddr, sizeof(struct sockaddr));
list_add_tail(&conn->list, &tcp);
}
return conn->s;
}
/*
 * TCP transport send: "to" is the socket descriptor; writes len bytes of
 * buf with do_write() and returns its result.
 */
static int booth_tcp_send(unsigned long to, void *buf, int len)
{
	int rv = do_write(to, buf, len);

	return rv;
}
/*
 * TCP transport receive: "from" is the socket descriptor; reads exactly
 * len bytes into buf with do_read() and returns its result.
 */
static int booth_tcp_recv(unsigned long from, void *buf, int len)
{
	int rv = do_read(from, buf, len);

	return rv;
}
/*
 * TCP transport close: if descriptor s belongs to an entry in the tcp
 * connection list, unlink that entry, close the descriptor and free the
 * entry. A descriptor that is not in the list is left alone.
 *
 * Always returns 0.
 */
static int booth_tcp_close(unsigned long s)
{
	struct tcp_conn *entry;

	list_for_each_entry(entry, &tcp, list) {
		if (entry->s != s)
			continue;
		list_del(&entry->list);
		close(s);
		free(entry);
		/* Stop right away: the entry just visited is gone. */
		break;
	}

	return 0;
}
/* TCP transport shutdown hook: nothing to do, always returns 0. */
static int booth_tcp_exit(void)
{
	int rv = 0;

	return rv;
}
/*
 * Create the UDP socket of the UDP transport: non-blocking, bound to the
 * local node's address and the configured port, with its receive buffer
 * size set to SOCKET_BUFFER_SIZE. The descriptor is stored in udp.s.
 *
 * Returns the descriptor, or -1 on failure (logged, socket closed).
 */
static int setup_udp_server(void)
{
	struct sockaddr_storage bind_addr;
	unsigned int rcvbuf = SOCKET_BUFFER_SIZE;
	int bind_len;

	udp.s = socket(local.family, SOCK_DGRAM, 0);
	if (udp.s == -1) {
		log_error("failed to create udp socket %s", strerror(errno));
		return -1;
	}

	if (fcntl(udp.s, F_SETFL, O_NONBLOCK) == -1) {
		log_error("failed to set non-blocking operation "
			  "on udp socket: %s", strerror(errno));
		goto fail;
	}

	ipaddr_to_sockaddr(&local, booth_conf->port, &bind_addr, &bind_len);
	if (bind(udp.s, (struct sockaddr *)&bind_addr, bind_len) == -1) {
		log_error("failed to bind socket %s", strerror(errno));
		goto fail;
	}

	if (setsockopt(udp.s, SOL_SOCKET, SO_RCVBUF,
		       &rcvbuf, sizeof(rcvbuf)) == -1) {
		log_error("failed to set recvbuf size");
		goto fail;
	}

	return udp.s;

fail:
	close(udp.s);
	return -1;
}
/*
 * Work function for the UDP socket in slot ci: receive one datagram into
 * the shared UDP buffer without blocking and hand it, together with its
 * length, to the registered delivery callback. A failed receive is
 * silently ignored.
 */
static void process_recv(int ci)
{
	struct sockaddr_storage sender;
	struct msghdr hdr;
	int nbytes;

	hdr.msg_name = &sender;
	hdr.msg_namelen = sizeof (struct sockaddr_storage);
	hdr.msg_iov = &udp.iov_recv;
	hdr.msg_iovlen = 1;
	hdr.msg_control = 0;
	hdr.msg_controllen = 0;
	hdr.msg_flags = 0;

	nbytes = recvmsg(client[ci].fd, &hdr, MSG_NOSIGNAL | MSG_DONTWAIT);
	if (nbytes != -1)
		deliver_fn(udp.iov_recv.iov_base, nbytes);
}
/*
 * Initialise the UDP transport: identify the local node among the
 * configured nodes, reset the UDP context, create the UDP socket, remember
 * f as the delivery callback and register the socket with the event loop.
 *
 * Returns 0 on success, -1 if the local node cannot be found or the
 * socket cannot be set up.
 */
static int booth_udp_init(void *f)
{
	memset(&local, 0, sizeof(struct booth_node));
	if (load_myid() < 0) {
		log_error("can't find myself in config file");
		return -1;
	}

	memset(&udp, 0, sizeof(struct udp_context));
	udp.iov_recv.iov_len = FRAME_SIZE_MAX;
	udp.iov_recv.iov_base = udp.iov_buffer;

	udp.s = setup_udp_server();
	if (udp.s == -1)
		return -1;

	deliver_fn = f;
	client_add(udp.s, process_recv, NULL);

	return 0;
}
/*
 * UDP transport send: "to" carries a pointer to the destination
 * booth_node. Sends len bytes of buf as one datagram to that node's
 * address on the configured port.
 *
 * Returns 0 on success or the negative result of sendmsg().
 */
static int booth_udp_send(unsigned long to, void *buf, int len)
{
	struct sockaddr_storage dest;
	struct iovec chunk;
	struct msghdr hdr;
	int dest_len = 0;
	int rv;

	chunk.iov_base = (void *)buf;
	chunk.iov_len = len;

	ipaddr_to_sockaddr((struct booth_node *)to, booth_conf->port,
			   &dest, &dest_len);

	hdr.msg_name = &dest;
	hdr.msg_namelen = dest_len;
	hdr.msg_iov = (void *)&chunk;
	hdr.msg_iovlen = 1;
	hdr.msg_control = 0;
	hdr.msg_controllen = 0;
	hdr.msg_flags = 0;

	rv = sendmsg(udp.s, &hdr, MSG_NOSIGNAL);
	return rv < 0 ? rv : 0;
}
/*
 * UDP transport broadcast: send the same datagram to every configured
 * node in turn. Individual send failures are ignored.
 *
 * Returns 0, or -1 when there is no configuration or it lists no nodes.
 */
static int booth_udp_broadcast(void *buf, int len)
{
	int idx = 0;

	if (!booth_conf || !booth_conf->node_count)
		return -1;

	while (idx < booth_conf->node_count) {
		booth_udp_send((unsigned long)&booth_conf->node[idx], buf, len);
		idx++;
	}

	return 0;
}
/* UDP transport shutdown hook: nothing to do, always returns 0. */
static int booth_udp_exit(void)
{
	int rv = 0;

	return rv;
}
/*
 * The SCTP transport has not been implemented yet; its hooks are
 * placeholders that accept their arguments and report success.
 */

/* SCTP init placeholder: ignores its argument and returns 0. */
static int booth_sctp_init(void *f __attribute__((unused)))
{
	int rv = 0;

	return rv;
}
/* SCTP send placeholder: sends nothing and returns 0. */
static int booth_sctp_send(unsigned long to __attribute__((unused)),
			   void *buf __attribute__((unused)),
			   int len __attribute__((unused)))
{
	int rv = 0;

	return rv;
}
/* SCTP broadcast placeholder: sends nothing and returns 0. */
static int booth_sctp_broadcast(void *buf __attribute__((unused)),
				int len __attribute__((unused)))
{
	int rv = 0;

	return rv;
}
/* SCTP shutdown placeholder: nothing to do, always returns 0. */
static int booth_sctp_exit(void)
{
	int rv = 0;

	return rv;
}
/*
 * Table of available transports. Other code indexes it by transport
 * identifier (e.g. booth_transport[TCP]); NOTE(review): the entry order is
 * presumably required to match that enumeration - confirm against its
 * definition. Hooks a transport does not provide are left NULL.
 */
struct booth_transport booth_transport[] = {
/* TCP: connection-oriented; provides open/send/recv/close, no broadcast. */
{
.name = "TCP",
.init = booth_tcp_init,
.get_myid = booth_get_myid,
.open = booth_tcp_open,
.send = booth_tcp_send,
.recv = booth_tcp_recv,
.close = booth_tcp_close,
.exit = booth_tcp_exit
},
/* UDP: datagram send and broadcast; received data goes to the callback given to init. */
{
.name = "UDP",
.init = booth_udp_init,
.get_myid = booth_get_myid,
.send = booth_udp_send,
.broadcast = booth_udp_broadcast,
.exit = booth_udp_exit
},
/* SCTP: placeholder hooks only; they do nothing and return 0. */
{
.name = "SCTP",
.init = booth_sctp_init,
.get_myid = booth_get_myid,
.send = booth_sctp_send,
.broadcast = booth_sctp_broadcast,
.exit = booth_sctp_exit
}
};
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Wed, Feb 26, 6:57 PM (1 h, 59 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1465660
Default Alt Text
(37 KB)
Attached To
Mode
rB Booth
Attached
Detach File
Event Timeline
Log In to Comment