Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F3152159
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
40 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/src/config.c b/src/config.c
index f262a5e..1080252 100644
--- a/src/config.c
+++ b/src/config.c
@@ -1,346 +1,346 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include "booth.h"
#include "config.h"
#include "ticket.h"
#include "log.h"
/* Number of struct ticket_config slots currently allocated behind booth_conf. */
static int ticket_size = 0;

/*
 * Grow the trailing ticket array of booth_conf by TICKET_ALLOC entries.
 *
 * booth_conf is one allocation: a struct booth_config followed by
 * ticket_size struct ticket_config slots. The newly added slots are zeroed.
 *
 * Returns 0 on success. On failure returns -ENOMEM and leaves booth_conf
 * (and ticket_size) unchanged, so the caller can still free it.
 */
static int ticket_realloc(void)
{
	void *p;
	void *grown;

	/* Never assign realloc()'s result straight to booth_conf: on failure
	 * that would drop the only reference to the old block and leak it. */
	grown = realloc(booth_conf, sizeof(struct booth_config)
			+ (ticket_size + TICKET_ALLOC)
			* sizeof(struct ticket_config));
	if (!grown) {
		log_error("can't alloc more booth config");
		return -ENOMEM;
	}
	booth_conf = grown;

	/* Zero only the freshly added tail of the ticket array. */
	p = (char *) booth_conf + sizeof(struct booth_config)
		+ ticket_size * sizeof(struct ticket_config);
	memset(p, 0, TICKET_ALLOC * sizeof(struct ticket_config));
	ticket_size += TICKET_ALLOC;
	return 0;
}
/* Forward declaration (its parameter name differs from the definition). */
int add_node(char *address, int type);
/*
 * Append a node of the given @type (SITE or ARBITRATOR) with textual
 * address @addr_string to booth_conf->node[], and parse that address into
 * node->in4 / node->in6 according to BOOTH_PROTO_FAMILY.
 *
 * Returns 0 on success, 1 on failure: too many nodes, address string too
 * long, inet_pton() error, or unsupported address family.
 *
 * NOTE(review): node_count is incremented before the address is parsed, so
 * a failed parse still leaves the half-initialized entry counted.
 * read_config() aborts on failure; confirm no other caller relies on
 * node_count afterwards.
 */
int add_node(char *addr_string, int type)
{
int rv;
struct booth_node *node;
rv = 1;
if (booth_conf->node_count == MAX_NODES) {
log_error("too many nodes");
goto out;
}
/* The string must fit, including '\0'. NOTE(review): ">=" also rejects a
 * string of exactly sizeof-1 chars, which would still fit. */
- if (strlen(addr_string)+1 >= sizeof(booth_conf->node[0].addr)) {
+ if (strlen(addr_string)+1 >= sizeof(booth_conf->node[0].addr_string)) {
log_error("node address \"%s\" too long", addr_string);
goto out;
}
node = booth_conf->node+booth_conf->node_count;
/* Every node uses the same, compile-time address family. */
node->family = BOOTH_PROTO_FAMILY;
node->type = type;
/* A node's id is its index in booth_conf->node[]. */
node->nodeid = booth_conf->node_count;
- strcpy(node->addr, addr_string);
+ strcpy(node->addr_string, addr_string);
node->tcp_fd = -1;
booth_conf->node_count++;
memset(&node->in6, 0, sizeof(node->in6));
/* NOTE(review): inet_pton() returns 0 for a malformed address string and -1
 * only for an unsupported family, so the "< 0" tests below do not catch
 * malformed addresses. TODO: confirm whether "<= 0" was intended. */
if (node->family == AF_INET) {
- if (inet_pton(AF_INET, node->addr, &node->in4) < 0) {
+ if (inet_pton(AF_INET, node->addr_string, &node->in4) < 0) {
addr_bad:
- log_error("Address string \"%s\" is bad", node->addr);
+ log_error("Address string \"%s\" is bad", node->addr_string);
goto out;
}
node->addrlen = sizeof(struct in_addr);
} else if (node->family == AF_INET6) {
/* Shares the error report via the addr_bad label in the AF_INET branch. */
- if (inet_pton(AF_INET6, node->addr, &node->in6) < 0)
+ if (inet_pton(AF_INET6, node->addr_string, &node->in6) < 0)
goto addr_bad;
node->addrlen = sizeof(struct in6_addr);
} else {
log_error("invalid INET family");
goto out;
}
rv = 0;
out:
return rv;
}
/*
 * Parse the booth configuration file at @path into a newly allocated
 * booth_conf.
 *
 * Each non-blank, non-comment line has the form  key=value  or
 * key="value". Recognised keys:
 *   transport   UDP or SCTP (mandatory)
 *   port        listening port
 *   site        address of a site node
 *   arbitrator  address of an arbitrator node
 *   ticket      "name[;expiry[;w1,w2,...]]"
 * Other keys are accepted and ignored.
 *
 * Returns 0 on success and -1 on an open or parse error. Returns -ENOMEM if
 * the initial allocation fails. If parsing fails after booth_conf was
 * allocated, it is freed and reset to NULL. The file is always closed.
 */
int read_config(const char *path)
{
	char line[1024];
	FILE *fp;
	char *s, *key, *val, *expiry, *weight, *c;
	int in_quotes, got_equals, got_quotes, i;
	int lineno = 0;
	int got_transport = 0;

	fp = fopen(path, "r");
	if (!fp) {
		log_error("failed to open %s: %s", path, strerror(errno));
		return -1;
	}

	booth_conf = malloc(sizeof(struct booth_config)
			+ TICKET_ALLOC * sizeof(struct ticket_config));
	if (!booth_conf) {
		log_error("failed to alloc memory for booth config");
		fclose(fp);
		return -ENOMEM;
	}
	memset(booth_conf, 0, sizeof(struct booth_config)
			+ TICKET_ALLOC * sizeof(struct ticket_config));
	ticket_size = TICKET_ALLOC;

	while (fgets(line, sizeof(line), fp)) {
		lineno++;
		s = line;
		while (*s == ' ')
			s++;
		if (*s == '#' || *s == '\n')
			continue;
		if (*s == '-' || *s == '.' || *s =='/'
				|| *s == '+' || *s == '(' || *s == ')'
				|| *s == ':' || *s == ',' || *s == '@'
				|| *s == '=' || *s == '"') {
			log_error("invalid key name in config file "
				"('%c', line %d char %ld)", *s, lineno, (long)(s - line));
			goto out;
		}

		key = s;	/* will point to the key on the left hand side */
		val = NULL;	/* will point to the value on the right hand side */
		in_quotes = 0;	/* true iff we're inside a double-quoted string */
		got_equals = 0;	/* true iff we're on the RHS of the = assignment */
		got_quotes = 0;	/* true iff the RHS is quoted */

		while (*s != '\n' && *s != '\0') {
			if (!(*s >='a' && *s <= 'z')
					&& !(*s >= 'A' && *s <= 'Z')
					&& !(*s >= '0' && *s <= '9')
					&& !(*s == '_')
					&& !(*s == '-')
					&& !(*s == '.')
					&& !(*s == '/')
					&& !(*s == ' ')
					&& !(*s == '+')
					&& !(*s == '(')
					&& !(*s == ')')
					&& !(*s == ':')
					&& !(*s == ';')
					&& !(*s == ',')
					&& !(*s == '@')
					&& !(*s == '=')
					&& !(*s == '"')) {
				log_error("invalid character ('%c', line %d char %ld)"
						" in config file", *s, lineno, (long)(s - line));
				goto out;
			}

			if (*s == '=' && !got_equals) {
				/* Split the line: key is NUL-terminated here. */
				got_equals = 1;
				*s = '\0';
				val = s + 1;
			} else if ((*s == '=' || *s == '_' || *s == '-' || *s == '.')
					&& got_equals && !in_quotes) {
				log_error("invalid config file format: unquoted '%c' "
						"(line %d char %ld)", *s, lineno, (long)(s - line));
				goto out;
			} else if ((*s == '/' || *s == '+'
						|| *s == '(' || *s == ')' || *s == ':'
						|| *s == ',' || *s == '@') && !in_quotes) {
				log_error("invalid config file format: unquoted '%c' "
						"(line %d char %ld)", *s, lineno, (long)(s - line));
				goto out;
			} else if ((*s == ' ')
					&& !in_quotes && !got_quotes) {
				log_error("invalid config file format: unquoted whitespace "
						"(line %d char %ld)", lineno, (long)(s - line));
				goto out;
			} else if (*s == '"' && !got_equals) {
				log_error("invalid config file format: unexpected quotes "
						"(line %d char %ld)", lineno, (long)(s - line));
				goto out;
			} else if (*s == '"' && !in_quotes) {
				/* Opening quote: the value starts after it. */
				in_quotes = 1;
				if (val) {
					val++;
					got_quotes = 1;
				}
			} else if (*s == '"' && in_quotes) {
				/* Closing quote terminates the value. */
				in_quotes = 0;
				*s = '\0';
			}
			s++;
		}

		if (!got_equals) {
			log_error("invalid config file format: missing '=' (lineno %d)",
					lineno);
			goto out;
		}
		if (in_quotes) {
			log_error("invalid config file format: unterminated quotes (lineno %d)",
					lineno);
			goto out;
		}
		if (!got_quotes)
			*s = '\0';

		if (strlen(key) > BOOTH_NAME_LEN
				|| strlen(val) > BOOTH_NAME_LEN) {
			log_error("key/value too long");
			goto out;
		}

		if (!strcmp(key, "transport")) {
			if (!strcmp(val, "UDP"))
				booth_conf->proto = UDP;
			else if (!strcmp(val, "SCTP"))
				booth_conf->proto = SCTP;
			else {
				log_error("invalid transport protocol");
				goto out;
			}
			got_transport = 1;
		}

		if (!strcmp(key, "port"))
			booth_conf->port = atoi(val);

		if (!strcmp(key, "site")) {
			if (add_node(val, SITE))
				goto out;
		}

		if (!strcmp(key, "arbitrator")) {
			if (add_node(val, ARBITRATOR))
				goto out;
		}

		if (!strcmp(key, "ticket")) {
			int count = booth_conf->ticket_count;
			if (booth_conf->ticket_count == ticket_size) {
				if (ticket_realloc() < 0)
					goto out;
			}

			/* Format: name[;expiry[;w1,w2,...]] */
			expiry = strchr(val, ';');
			weight = strrchr(val, ';');
			if (!expiry) {
				strcpy(booth_conf->ticket[count].name, val);
				booth_conf->ticket[count].expiry = DEFAULT_TICKET_EXPIRY;
				log_info("expire is not set in %s."
						" Set the default value %ds.",
						booth_conf->ticket[count].name,
						DEFAULT_TICKET_EXPIRY);
			}
			else if (expiry && expiry == weight) {
				*expiry++ = '\0';
				while (*expiry == ' ')
					expiry++;
				strcpy(booth_conf->ticket[count].name, val);
				booth_conf->ticket[count].expiry = atoi(expiry);
			} else {
				*expiry++ = '\0';
				*weight++ = '\0';
				while (*expiry == ' ')
					expiry++;
				while (*weight == ' ')
					weight++;
				strcpy(booth_conf->ticket[count].name, val);
				booth_conf->ticket[count].expiry = atoi(expiry);
				i = 0;
				while ((c = strchr(weight, ','))) {
					*c++ = '\0';
					booth_conf->ticket[count].weight[i++]
						= atoi(weight);
					while (*c == ' ')
						c++;
					weight = c;
					if (i == MAX_NODES) {
						log_error("too many weights");
						break;
					}
				}
				/* The loop only stores weights that are followed
				 * by ','. Store the final one as well; otherwise
				 * "1,2,3" yields two weights and a zero, which
				 * makes ticket_priority() discard the vector. */
				if (i < MAX_NODES && *weight != '\0')
					booth_conf->ticket[count].weight[i] = atoi(weight);
			}
			booth_conf->ticket_count++;
		}
	}

	if (!got_transport) {
		log_error("config file was missing transport line");
		goto out;
	}

	fclose(fp);
	return 0;

out:
	free(booth_conf);
	/* Don't leave a dangling global: other code (e.g. check_config())
	 * tests booth_conf against NULL. */
	booth_conf = NULL;
	fclose(fp);
	return -1;
}
/*
 * Sanity-check the loaded configuration for a daemon of role @type.
 *
 * Only checks that a configuration has been loaded: returns 0 if
 * booth_conf is set and -1 otherwise. @type is currently unused; the
 * per-role check (is there a local node of this type?) is disabled.
 */
int check_config(int type)
{
	(void)type;
	return booth_conf ? 0 : -1;
}
/*
 * Look up a SITE node whose address string equals @site.
 *
 * On a match, stores the node in *node and returns 1. Returns 0 if no SITE
 * node matches or no configuration is loaded; *node is left untouched
 * then. ARBITRATOR nodes are never matched.
 *
 * NOTE(review): @site is unsigned char * but is handed to strcmp(), which
 * takes const char *; expect a pointer-signedness warning. check_site()
 * passes a plain char *.
 */
int find_site_in_config(unsigned char *site, struct booth_node **node)
{
struct booth_node *n;
int i;
if (!booth_conf)
return 0;
for (i = 0; i < booth_conf->node_count; i++) {
n = booth_conf->node + i;
if (n->type == SITE &&
- strcmp(n->addr, site) == 0) {
+ strcmp(n->addr_string, site) == 0) {
*node = n;
return 1;
}
}
return 0;
}
diff --git a/src/ticket.c b/src/ticket.c
index a667a49..fa4d4d6 100644
--- a/src/ticket.c
+++ b/src/ticket.c
@@ -1,626 +1,626 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <assert.h>
#include <time.h>
#include "ticket.h"
#include "config.h"
#include "pacemaker.h"
#include "list.h"
#include "log.h"
#include "booth.h"
#include "timer.h"
#include "paxos_lease.h"
#include "paxos.h"
/* Magic value placed (network byte order) in booth_msghdr.magic by
 * ticket_send()/ticket_broadcast() and checked by ticket_recv(). */
#define PAXOS_MAGIC 0xDB12
/* Fixed size, in bytes, of each per-ticket record produced by list_ticket(). */
#define TK_LINE 256
/* ticket_msg.result value set by catchup_ticket() when the replying site
 * owns the ticket and owner/expiry are valid; tested in ticket_parse(). */
#define CATCHED_VALID_TMSG 1
/* Header prepended to every paxos payload on the wire. len is the total
 * length (header + payload) in network byte order. ticket_send() zeroes
 * checksum; no code visible here computes or verifies it. */
struct booth_msghdr {
uint16_t magic;
uint16_t checksum;
uint32_t len;
char data[0];
} __attribute__((packed));
/* Catch-up request/reply body exchanged over the TCP transport
 * (ticket_catchup() / catchup_ticket()).
 * NOTE(review): the integer fields are written and read in host byte order
 * (no htonl/ntohl in catchup_ticket()/ticket_parse()); sites of different
 * endianness would disagree. Confirm. */
struct ticket_msg {
char id[BOOTH_NAME_LEN+1];
uint32_t owner;
uint32_t expiry;
uint32_t ballot;
uint32_t result;
} __attribute__((packed));
/* Runtime state of one configured ticket, created by setup_ticket().
 * owner is a node id, or -1 when unowned. expires is compared against
 * current_time(); 0 means "no expiry recorded". */
struct ticket {
char id[BOOTH_NAME_LEN+1];
pl_handle_t handle;
int owner;
int expiry;
int ballot;
unsigned long long expires;
struct list_head list;
};
/* All local tickets, linked through struct ticket.list. */
static LIST_HEAD(ticket_list);
/* Per-node paxos role bitmask (PROPOSER/ACCEPTOR/LEARNER), indexed like
 * booth_conf->node[]; allocated in setup_ticket(). */
static unsigned char *role;
/*
 * Check that the untrusted string @s is NUL-terminated within its first
 * @max bytes, i.e. that it fits (including the '\0') into a buffer of @max
 * chars.
 *
 * Only s[0] .. s[max-1] are read, so @s may point into a fixed-size and
 * possibly unterminated buffer of at least @max bytes.
 *
 * Returns 1 if it fits and 0 otherwise, including when @max <= 0.
 */
int check_max_len_valid(char *s, int max)
{
	int i;

	/* Bound by the caller-supplied buffer size, not a global constant:
	 * the buffers checked here (ticket names, node address strings) are
	 * not necessarily BOOTH_NAME_LEN bytes long. */
	for (i = 0; i < max; i++)
		if (s[i] == '\0')
			return 1;
	return 0;
}
/*
 * Return 1 if @ticket names a ticket of the loaded configuration.
 *
 * Returns 0 in three cases:
 *  - no configuration is loaded;
 *  - @ticket fails check_max_len_valid() against the size of a configured
 *    ticket name;
 *  - no configured ticket has that name.
 */
int check_ticket(char *ticket)
{
	int idx;

	if (booth_conf == NULL
	    || !check_max_len_valid(ticket, sizeof(booth_conf->ticket[0].name)))
		return 0;

	for (idx = 0; idx < booth_conf->ticket_count; idx++)
		if (strcmp(booth_conf->ticket[idx].name, ticket) == 0)
			return 1;

	return 0;
}
/*
 * Validate the untrusted @site string and look it up among the configured
 * SITE nodes.
 *
 * Returns 1 and stores the node's "local" flag in *local if found.
 * Returns 0 otherwise, including when check_max_len_valid() rejects @site.
 */
int check_site(char *site, int *local)
{
struct booth_node *node;
/* sizeof does not evaluate its operand, so the uninitialized node is fine here. */
- if (!check_max_len_valid(site, sizeof(node->addr)))
+ if (!check_max_len_valid(site, sizeof(node->addr_string)))
return 0;
if (find_site_in_config(site, &node)) {
*local = node->local;
return 1;
}
return 0;
}
/*
 * Return the configured weight vector of ticket @i, or NULL if any of the
 * first node_count weights is zero. A zero presumably means the weights
 * were not (fully) configured.
 */
static int *ticket_priority(int i)
{
	int *weights = booth_conf->ticket[i].weight;
	int n;

	/* TODO: need more precise check */
	for (n = 0; n < booth_conf->node_count; n++)
		if (!weights[n])
			return NULL;

	return weights;
}
/* This node's id, as reported by the active transport's get_myid() hook. */
static int ticket_get_myid(void)
{
	int myid = transport()->get_myid();

	return myid;
}
/*
 * Completion callback passed to paxos_lease_acquire() by grant_ticket().
 * Logs whether granting the ticket behind @handle succeeded. @error is an
 * errno-style code, 0 on success.
 */
static void end_acquire(pl_handle_t handle, int error)
{
	struct ticket *iter, *tk = NULL;

	log_debug("enter end_acquire");

	list_for_each_entry(iter, &ticket_list, list) {
		if (iter->handle == handle) {
			tk = iter;
			break;
		}
	}

	if (tk == NULL) {
		log_error("BUG: ticket handle %ld does not exist", handle);
		return;
	}

	if (!error)
		log_info("ticket %s was granted successfully (site %d)",
			 tk->id, ticket_get_myid());
	else
		log_info("ticket %s was granted failed (site %d), error:%s",
			 tk->id, ticket_get_myid(), strerror(error));

	log_debug("exit end_acquire");
}
/*
 * Completion callback passed to paxos_lease_release() by revoke_ticket().
 * Logs whether revoking the ticket behind @handle succeeded. @error is an
 * errno-style code, 0 on success.
 */
static void end_release(pl_handle_t handle, int error)
{
	struct ticket *tk;
	int found = 0;

	log_debug("enter end_release");
	list_for_each_entry(tk, &ticket_list, list) {
		if (tk->handle == handle) {
			found = 1;
			break;
		}
	}
	if (!found) {
		log_error("BUG: ticket handle %ld does not exist", handle);
		return;
	}

	if (error)
		log_info("ticket %s was revoked failed (site %d), error:%s",
			 tk->id, ticket_get_myid(), strerror(error));
	else
		log_info("ticket %s was revoked successfully (site %d)",
			 tk->id, ticket_get_myid());

	log_debug("exit end_release");
}
/*
 * Send @len bytes of @value, prefixed with a booth_msghdr (PAXOS_MAGIC,
 * total length), to the configured node whose nodeid equals @id. If several
 * nodes share that id, the last one in configuration order is used.
 *
 * Returns the transport's send() result, -1 if no node has that id, or
 * -ENOMEM if the message buffer cannot be allocated.
 */
static int ticket_send(unsigned long id, void *value, int len)
{
	struct booth_node *to = NULL;
	struct booth_msghdr *hdr;
	size_t total;
	int n, rv;

	/* Scan backwards so the first hit is the last matching node. */
	for (n = booth_conf->node_count - 1; n >= 0; n--) {
		if (booth_conf->node[n].nodeid == id) {
			to = &booth_conf->node[n];
			break;
		}
	}
	if (to == NULL)
		return -1;

	total = sizeof(struct booth_msghdr) + len;
	hdr = calloc(1, total);		/* zeroed: checksum stays 0 */
	if (hdr == NULL)
		return -ENOMEM;

	hdr->magic = htons(PAXOS_MAGIC);
	hdr->len = htonl(total);
	memcpy(hdr->data, value, len);

	rv = transport()->send(to, hdr, total);
	free(hdr);
	return rv;
}
/*
 * Broadcast @vlen bytes of @value to all nodes, prefixed with a
 * booth_msghdr (PAXOS_MAGIC, total length in network byte order).
 *
 * Returns the transport's broadcast() result, -EINVAL for a negative
 * @vlen, or -ENOMEM if the message buffer cannot be allocated.
 */
static int ticket_broadcast(void *value, int vlen)
{
	struct booth_msghdr *hdr;
	int tlen, rv;

	if (vlen < 0)
		return -EINVAL;
	tlen = sizeof(*hdr) + vlen;

	/* Heap, zero-filled buffer instead of an uninitialized stack VLA:
	 * otherwise hdr->checksum carries stack garbage onto the wire, and
	 * the payload size is not bounded by the stack. */
	hdr = calloc(1, tlen);
	if (!hdr)
		return -ENOMEM;

	hdr->magic = htons(PAXOS_MAGIC);
	hdr->len = htonl(tlen);
	memcpy(hdr->data, value, vlen);

	rv = transport()->broadcast(hdr, tlen);
	free(hdr);
	return rv;
}
/*
 * Disabled (compiled out by #if 0). As written, it would look up ticket
 * @name in ticket_list, refresh it via pcmk_handler.load_ticket() and
 * return its owner, ballot and expiry through the out parameters
 * (0 on success, -1 if the ticket does not exist).
 */
#if 0
static int ticket_read(const void *name, int *owner, int *ballot,
unsigned long long *expires)
{
struct ticket *tk;
int found = 0;
list_for_each_entry(tk, &ticket_list, list) {
if (!strcmp(tk->id, name)) {
found = 1;
break;
}
}
if (!found) {
log_error("BUG: ticket_read failed (ticket %s does not exist)",
(char *)name);
return -1;
}
pcmk_handler.load_ticket(tk->id, &tk->owner, &tk->ballot, &tk->expires);
*owner = tk->owner;
*expires = tk->expires;
*ballot = tk->ballot;
return 0;
}
#endif
/*
 * Merge a catch-up reply @tmsg received from a peer into the local ticket
 * with the same id.
 *
 * - The local ballot is raised to the peer's ballot if that is higher.
 * - If the peer flagged its reply CATCHED_VALID_TMSG, owner and absolute
 *   expiry are taken from it. tmsg->expiry is the remaining lifetime.
 *
 * Returns 0 if a matching local ticket exists, -1 otherwise.
 */
static int ticket_parse(struct ticket_msg *tmsg)
{
	struct ticket *tk;

	list_for_each_entry(tk, &ticket_list, list) {
		/* tmsg->id comes off the network and may lack a terminating
		 * NUL; bound the comparison by the size of the field. */
		if (strncmp(tk->id, tmsg->id, sizeof(tmsg->id)) != 0)
			continue;

		if (tk->ballot < tmsg->ballot)
			tk->ballot = tmsg->ballot;
		if (CATCHED_VALID_TMSG == tmsg->result) {
			tk->owner = tmsg->owner;
			tk->expires = current_time() + tmsg->expiry;
		}
		return 0;
	}
	return -1;
}
/*
 * Paxos "catchup" callback (registered in ticket_operations) for the
 * ticket named @name.
 *
 * 1. On a non-arbitrator, load the locally stored ticket state via
 *    pcmk_handler.load_ticket() and drop ownership if it has expired.
 * 2. Send BOOTHC_CMD_CATCHUP over TCP to every non-local SITE node and merge
 *    each reply with ticket_parse().
 * 3. On a non-arbitrator, re-check expiry, store the merged state, and grant
 *    the ticket if this node owns it or revoke it otherwise.
 *
 * Fills *owner, *ballot and *expires only if a ticket named @name exists.
 * Returns the result of the last transport call made (0 if no peer was
 * tried), or -ENOMEM.
 *
 * NOTE(review): myid comes from ticket_get_myid(). If the transport reports
 * -1 (booth_get_myid() does when the local node is unknown),
 * booth_conf->node[myid] below indexes out of bounds.
 */
static int ticket_catchup(const void *name, int *owner, int *ballot,
unsigned long long *expires)
{
struct ticket *tk;
int i, buflen, rv = 0;
char *buf = NULL;
struct boothc_header *h;
struct booth_node *node;
struct ticket_msg *tmsg;
int myid = ticket_get_myid();
if (booth_conf->node[myid].type != ARBITRATOR) {
list_for_each_entry(tk, &ticket_list, list) {
if (!strcmp(tk->id, name)) {
pcmk_handler.load_ticket(tk->id,
&tk->owner,
&tk->ballot,
&tk->expires);
if (current_time() >= tk->expires) {
tk->owner = -1;
tk->expires = 0;
}
}
}
}
/* One request buffer (boothc_header + ticket_msg) is reused for every peer,
 * for both the request and the reply. */
buflen = sizeof(struct boothc_header) + sizeof(struct ticket_msg);
buf = malloc(buflen);
if (!buf)
return -ENOMEM;
memset(buf, 0, buflen);
h = (struct boothc_header *)buf;
h->magic = BOOTHC_MAGIC;
h->version = BOOTHC_VERSION;
h->cmd = BOOTHC_CMD_CATCHUP;
h->len = sizeof(struct ticket_msg);
tmsg = (struct ticket_msg *)(buf + sizeof(struct boothc_header));
for (i = 0; i < booth_conf->node_count; i++) {
node = booth_conf->node + i;
if (node->type == SITE &&
!(node->local)) {
/* tmsg is cleared after every peer, so the id is re-filled here. */
strncpy(tmsg->id, name, BOOTH_NAME_LEN + 1);
- log_debug("attempting catchup from %s", node->addr);
+ log_debug("attempting catchup from %s", node->addr_string);
rv = booth_transport[TCP].open(node);
if (rv < 0)
continue;
- log_debug("connected to %s", node->addr);
+ log_debug("connected to %s", node->addr_string);
rv = booth_transport[TCP].send(node, buf, buflen);
if (rv < 0) {
goto close;
}
- log_debug("sent catchup command to %s", node->addr);
+ log_debug("sent catchup command to %s", node->addr_string);
memset(tmsg, 0, sizeof(struct ticket_msg));
rv = booth_transport[TCP].recv(node, buf, buflen);
if (rv < 0) {
booth_transport[TCP].close(node);
continue;
}
/* NOTE(review): the reply's boothc_header is not validated, and only a
 * negative recv result is treated as failure; whether a short read is
 * possible depends on do_read(). Confirm. */
ticket_parse(tmsg);
close:
booth_transport[TCP].close(node);
memset(tmsg, 0, sizeof(struct ticket_msg));
}
}
/* Publish the merged state: persist it and grant/revoke locally
 * (non-arbitrators only), then report it to the caller. */
list_for_each_entry(tk, &ticket_list, list) {
if (!strcmp(tk->id, name)) {
if (booth_conf->node[myid].type != ARBITRATOR) {
if (current_time() >= tk->expires) {
tk->owner = -1;
tk->expires = 0;
}
pcmk_handler.store_ticket(tk->id,
tk->owner,
tk->ballot,
tk->expires);
if (tk->owner == myid)
pcmk_handler.grant_ticket(tk->id);
else
pcmk_handler.revoke_ticket(tk->id);
}
*owner = tk->owner;
*expires = tk->expires;
*ballot = tk->ballot;
}
}
free(buf);
return rv;
}
/*
 * Paxos "notify" callback (registered in ticket_operations): record a lease
 * result for the ticket behind @handle.
 *
 * Owner, expiry and ballot are copied from @result and always persisted via
 * pcmk_handler.store_ticket(). The ticket is then granted locally if this
 * node is the new owner, or revoked if it is now unowned (-1).
 *
 * Returns 0, or -1 if no ticket has that handle.
 */
static int ticket_write(pl_handle_t handle, struct paxos_lease_result *result)
{
	struct ticket *iter, *tk = NULL;
	int is_mine;

	list_for_each_entry(iter, &ticket_list, list) {
		if (iter->handle == handle) {
			tk = iter;
			break;
		}
	}
	if (tk == NULL) {
		log_error("BUG: ticket_write failed "
			  "(ticket handle %ld does not exist)", handle);
		return -1;
	}

	tk->owner = result->owner;
	tk->expires = result->expires;
	tk->ballot = result->ballot;

	/* Decide before storing, so the call order matches "compare, store, act". */
	is_mine = (tk->owner == ticket_get_myid());
	pcmk_handler.store_ticket(tk->id, tk->owner, tk->ballot, tk->expires);
	if (is_mine)
		pcmk_handler.grant_ticket(tk->id);
	else if (tk->owner == -1)
		pcmk_handler.revoke_ticket(tk->id);

	return 0;
}
/* Run paxos lease status recovery for @handle. setup_ticket() calls this for
 * every ticket when this node has the ACCEPTOR role. */
static void ticket_status_recovery(pl_handle_t handle)
{
paxos_lease_status_recovery(handle);
}
/*
 * Entry point for a raw paxos datagram @msg of @msglen bytes.
 *
 * Verifies the booth_msghdr (magic, and that the encoded total length
 * equals @msglen), then hands the payload to paxos_lease_on_receive().
 *
 * Returns that function's result, or -1 for a malformed message.
 */
int ticket_recv(void *msg, int msglen)
{
	struct booth_msghdr *hdr;
	char *data;

	/* msglen comes straight from the network: reject runts before
	 * reading any header field, and before "msglen - sizeof(header)"
	 * below can underflow. */
	if (msglen < (int)sizeof(struct booth_msghdr)) {
		log_error("message received error");
		return -1;
	}

	hdr = msg;
	if (ntohs(hdr->magic) != PAXOS_MAGIC ||
	    ntohl(hdr->len) != msglen) {
		log_error("message received error");
		return -1;
	}

	data = (char *)msg + sizeof(struct booth_msghdr);
	return paxos_lease_on_receive(data,
			msglen - sizeof(struct booth_msghdr));
}
/*
 * Request ticket @ticket for this node.
 *
 * Returns:
 *  - BOOTHC_RLT_SYNC_SUCC if this node already owns it;
 *  - BOOTHC_RLT_ASYNC if a paxos acquire was started (end_acquire() logs
 *    the outcome);
 *  - BOOTHC_RLT_SYNC_FAIL if the ticket is unknown or the acquire could
 *    not be started.
 */
int grant_ticket(char *ticket)
{
	struct ticket *iter, *tk = NULL;
	int ret;

	list_for_each_entry(iter, &ticket_list, list) {
		if (strcmp(iter->id, ticket) == 0) {
			tk = iter;
			break;
		}
	}
	if (tk == NULL) {
		log_error("ticket %s does not exist", ticket);
		return BOOTHC_RLT_SYNC_FAIL;
	}

	if (tk->owner == ticket_get_myid())
		return BOOTHC_RLT_SYNC_SUCC;

	ret = paxos_lease_acquire(tk->handle, CLEAR_RELEASE, 1, end_acquire);
	if (ret < 0)
		return BOOTHC_RLT_SYNC_FAIL;

	/* A non-negative result is the ballot used for this attempt. */
	tk->ballot = ret;
	return BOOTHC_RLT_ASYNC;
}
/*
 * Release ticket @ticket.
 *
 * Returns:
 *  - BOOTHC_RLT_SYNC_SUCC if it is already unowned (owner == -1);
 *  - BOOTHC_RLT_ASYNC if a paxos release was started (end_release() logs
 *    the outcome);
 *  - BOOTHC_RLT_SYNC_FAIL if the ticket is unknown or the release could
 *    not be started.
 */
int revoke_ticket(char *ticket)
{
	struct ticket *iter, *tk = NULL;
	int ret;

	list_for_each_entry(iter, &ticket_list, list) {
		if (strcmp(iter->id, ticket) == 0) {
			tk = iter;
			break;
		}
	}
	if (tk == NULL) {
		log_error("ticket %s does not exist", ticket);
		return BOOTHC_RLT_SYNC_FAIL;
	}

	if (tk->owner == -1)
		return BOOTHC_RLT_SYNC_SUCC;

	ret = paxos_lease_release(tk->handle, end_release);
	if (ret < 0)
		return BOOTHC_RLT_SYNC_FAIL;

	/* A non-negative result is the ballot used for this attempt. */
	tk->ballot = ret;
	return BOOTHC_RLT_ASYNC;
}
/*
 * Report owner and expiry of ticket @name through the optional out
 * parameters @owner and @expires (either may be NULL).
 *
 * Returns 0 if found and -1 otherwise. Names are compared over at most
 * BOOTH_NAME_LEN + 1 bytes.
 *
 * NOTE(review): tk->expires is unsigned long long but *expires is int, so
 * the value is narrowed. Confirm the callers tolerate that.
 */
int get_ticket_info(char *name, int *owner, int *expires)
{
	struct ticket *tk;

	list_for_each_entry(tk, &ticket_list, list) {
		if (strncmp(tk->id, name, BOOTH_NAME_LEN + 1) != 0)
			continue;

		if (owner != NULL)
			*owner = tk->owner;
		if (expires != NULL)
			*expires = tk->expires;
		return 0;
	}

	return -1;
}
/*
 * Build a text listing of all tickets in a newly allocated buffer.
 *
 * Each ticket occupies exactly TK_LINE bytes of *pdata: a NUL-padded line
 * "ticket: <id>, owner: <addr|None>, expires: <time|INF>\n". *len receives
 * the total size. With no tickets, *pdata is NULL and *len is 0.
 * The buffer is presumably freed by the caller. Confirm.
 *
 * Returns 0, or -ENOMEM if the buffer cannot be grown.
 *
 * NOTE(review): on realloc() failure the previously allocated buffer leaks,
 * because *pdata is overwritten with NULL.
 * NOTE(review): localtime((time_t *)&tk->expires) reinterprets an
 * unsigned long long as a time_t. That is only correct where both types
 * have the same size and representation; check 32-bit time_t targets.
 * NOTE(review): strncpy() into node_name does not NUL-terminate if the
 * address string is BOOTH_NAME_LEN characters or longer.
 */
int list_ticket(char **pdata, unsigned int *len)
{
struct ticket *tk;
char timeout_str[100];
char node_name[BOOTH_NAME_LEN];
char tmp[TK_LINE];
*pdata = NULL;
*len = 0;
list_for_each_entry(tk, &ticket_list, list) {
memset(tmp, 0, TK_LINE);
strncpy(timeout_str, "INF", sizeof(timeout_str));
strncpy(node_name, "None", sizeof(node_name));
/* Only a valid node index is resolved to its address string. */
if (tk->owner < MAX_NODES && tk->owner > -1)
- strncpy(node_name, booth_conf->node[tk->owner].addr,
+ strncpy(node_name, booth_conf->node[tk->owner].addr_string,
sizeof(node_name));
if (tk->expires != 0)
strftime(timeout_str, sizeof(timeout_str), "%Y/%m/%d %H:%M:%S",
localtime((time_t *)&tk->expires));
snprintf(tmp, TK_LINE, "ticket: %s, owner: %s, expires: %s\n",
tk->id, node_name, timeout_str);
*pdata = realloc(*pdata, *len + TK_LINE);
if (*pdata == NULL)
return -ENOMEM;
memset(*pdata + *len, 0, TK_LINE);
memcpy(*pdata + *len, tmp, TK_LINE);
*len += TK_LINE;
}
return 0;
}
/*
 * Answer a peer's catch-up request in place.
 *
 * *pdata points to a struct ticket_msg of @len bytes received from the
 * peer. For the ticket named in it, the local ballot is filled in. If this
 * node currently holds an unexpired lease, the reply also gets
 * CATCHED_VALID_TMSG, the remaining lifetime (expiry) and the owner.
 *
 * Always returns 0.
 */
int catchup_ticket(char **pdata, unsigned int len)
{
	struct ticket_msg *tmsg;
	struct ticket *tk;
	unsigned long long now;

	assert(len == sizeof(struct ticket_msg));
	tmsg = (struct ticket_msg *)(*pdata);

	list_for_each_entry(tk, &ticket_list, list) {
		/* tmsg->id is peer-supplied and may be unterminated. */
		if (strncmp(tk->id, tmsg->id, sizeof(tmsg->id)))
			continue;

		tmsg->ballot = tk->ballot;
		if (tk->owner != ticket_get_myid())
			continue;

		/* Sample the clock once. A second current_time() call could
		 * pass tk->expires and make "expires - now" wrap around to a
		 * huge remaining lifetime. */
		now = current_time();
		if (now < tk->expires) {
			tmsg->result = CATCHED_VALID_TMSG;
			tmsg->expiry = tk->expires - now;
			tmsg->owner = tk->owner;
		}
	}
	return 0;
}
/* Callback table handed to paxos_lease_init() for every ticket in
 * setup_ticket(). */
const struct paxos_lease_operations ticket_operations = {
	.notify		= ticket_write,
	.catchup	= ticket_catchup,
	.broadcast	= ticket_broadcast,
	.send		= ticket_send,
	.get_myid	= ticket_get_myid,
};
/*
 * Create the runtime state for every configured ticket.
 *
 * - Builds the per-node role table: SITE nodes are
 *   PROPOSER|ACCEPTOR|LEARNER, ARBITRATOR nodes ACCEPTOR|LEARNER.
 * - Creates one struct ticket, with its paxos lease, per
 *   booth_conf->ticket entry.
 * - If this node is an ACCEPTOR, runs status recovery for every ticket.
 *
 * Returns 0 on success or a negative value on failure. On failure, every
 * ticket created here is unlinked and freed, and the role table is
 * released.
 */
int setup_ticket(void)
{
	struct ticket *tk, *tmp;
	int i, rv;
	pl_handle_t plh;
	int myid;

	role = malloc(booth_conf->node_count * sizeof(unsigned char));
	if (!role)
		return -ENOMEM;
	memset(role, 0, booth_conf->node_count * sizeof(unsigned char));
	for (i = 0; i < booth_conf->node_count; i++) {
		if (booth_conf->node[i].type == SITE)
			role[i] = PROPOSER | ACCEPTOR | LEARNER;
		else if (booth_conf->node[i].type == ARBITRATOR)
			role[i] = ACCEPTOR | LEARNER;
	}

	for (i = 0; i < booth_conf->ticket_count; i++) {
		tk = malloc(sizeof(struct ticket));
		if (!tk) {
			rv = -ENOMEM;
			goto out;
		}
		memset(tk, 0, sizeof(struct ticket));
		strcpy(tk->id, booth_conf->ticket[i].name);
		tk->owner = -1;
		tk->expiry = booth_conf->ticket[i].expiry;
		list_add_tail(&tk->list, &ticket_list);

		plh = paxos_lease_init(tk->id,
				       BOOTH_NAME_LEN,
				       tk->expiry,
				       booth_conf->node_count,
				       1,
				       role,
				       ticket_priority(i),
				       &ticket_operations);
		if (plh <= 0) {
			log_error("paxos lease initialization failed");
			/* A zero handle is also a failure: make sure a
			 * negative code is returned rather than 0. */
			rv = (plh < 0) ? (int)plh : -EINVAL;
			goto out;
		}
		tk->handle = plh;
	}

	myid = ticket_get_myid();
	assert(myid < booth_conf->node_count);
	/* Also guard the index at run time: the transport may report -1, and
	 * the assert() above is compiled out under NDEBUG. */
	if (myid >= 0 && myid < booth_conf->node_count
	    && (role[myid] & ACCEPTOR)) {
		list_for_each_entry(tk, &ticket_list, list) {
			ticket_status_recovery(tk->handle);
		}
	}
	return 0;

out:
	/* NOTE(review): like role (also handed to paxos_lease_init()), the
	 * tickets are released here on the assumption that leases created so
	 * far are not used after a setup failure. Confirm. */
	list_for_each_entry_safe(tk, tmp, &ticket_list, list) {
		list_del(&tk->list);
		free(tk);
	}
	free(role);
	role = NULL;
	return rv;
}
diff --git a/src/transport.c b/src/transport.c
index 090d303..d00cf12 100644
--- a/src/transport.c
+++ b/src/transport.c
@@ -1,674 +1,674 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <string.h>
#include <stdlib.h>
#include <net/if.h>
#include <asm/types.h>
#include <linux/rtnetlink.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <poll.h>
#include "list.h"
#include "booth.h"
#include "log.h"
#include "config.h"
#include "paxos_lease.h"
#include "transport.h"
/* Size of the address buffer passed to find_address(): room for an IPv6
 * address. IPv4 addresses are compared on their first node->addrlen bytes. */
#define BOOTH_IPADDR_LEN (sizeof(struct in6_addr))
/* Size of the netlink receive buffer used by find_myself(). */
#define NETLINK_BUFSIZE 16384
/* SO_RCVBUF size requested for the UDP socket in setup_udp_server(). */
#define SOCKET_BUFFER_SIZE 160000
/* Size of the single UDP receive buffer (udp.iov_buffer). */
#define FRAME_SIZE_MAX 10000
/* Client/poll tables defined elsewhere, indexed by the "ci" argument of the
 * process_* callbacks below. */
extern struct client *client;
extern struct pollfd *pollfd;
/* Cached copy of this host's own node entry, filled by load_myid(). */
static struct booth_node local;
/* One accepted TCP connection (see process_tcp_listener()). */
struct tcp_conn {
int s;
struct sockaddr to;
struct list_head list;
};
/* Accepted TCP connections; an entry is removed in process_dead(). */
static LIST_HEAD(tcp);
/* UDP transport state: the socket plus one reusable receive buffer. */
struct udp_context {
int s;
struct iovec iov_recv;
char iov_buffer[FRAME_SIZE_MAX];
} udp;
/* Receives each UDP payload; set by booth_udp_init(). */
static int (*deliver_fn) (void *msg, int msglen);
/*
 * Build a sockaddr_in / sockaddr_in6 in *saddr for @node's address string
 * and @port (host byte order), and store its length in *addrlen.
 *
 * Returns 0 on success, or -1 if node->family is neither AF_INET nor
 * AF_INET6. In that case *saddr and *addrlen are left unset.
 *
 * NOTE(review): the inet_pton() result is ignored, so an unparsable address
 * string leaves "addr" uninitialized and copies garbage into the sockaddr.
 */
static int ipaddr_to_sockaddr(struct booth_node *node,
uint16_t port,
struct sockaddr_storage *saddr,
int *addrlen)
{
int rv = -1;
if (node->family == AF_INET) {
struct in_addr addr;
struct sockaddr_in *sin = (struct sockaddr_in *)saddr;
memset(sin, 0, sizeof(struct sockaddr_in));
sin->sin_family = node->family;
sin->sin_port = htons(port);
- inet_pton(AF_INET, node->addr, &addr);
+ inet_pton(AF_INET, node->addr_string, &addr);
memcpy(&sin->sin_addr, &addr, sizeof(struct in_addr));
*addrlen = sizeof(struct sockaddr_in);
rv = 0;
}
if (node->family == AF_INET6) {
struct in6_addr addr;
struct sockaddr_in6 *sin = (struct sockaddr_in6 *)saddr;
memset(sin, 0, sizeof(struct sockaddr_in6));
sin->sin6_family = node->family;
sin->sin6_port = htons(port);
/* NOTE(review): scope id (an interface index) hard-coded to 2; presumably
 * only meaningful for link-local addresses on one particular host. Confirm. */
sin->sin6_scope_id = 2;
- inet_pton(AF_INET6, node->addr, &addr);
+ inet_pton(AF_INET6, node->addr_string, &addr);
memcpy(&sin->sin6_addr, &addr, sizeof(struct in6_addr));
*addrlen = sizeof(struct sockaddr_in6);
rv = 0;
}
return rv;
}
/*
 * Index the rtnetlink attributes starting at @rta (@len bytes) by type.
 *
 * tb[type] receives the attribute for every type <= @max. A later
 * attribute of the same type replaces an earlier one, and types above
 * @max are skipped. tb must have @max + 1 slots. Slots the caller did not
 * clear beforehand keep their previous values.
 */
static void parse_rtattr(struct rtattr *tb[],
		int max, struct rtattr *rta, int len)
{
	for (; RTA_OK(rta, len); rta = RTA_NEXT(rta, len)) {
		if (rta->rta_type > max)
			continue;
		tb[rta->rta_type] = rta;
	}
}
/*
 * Match a local interface address against the configured nodes.
 *
 * @ipaddr holds an address of @family whose network prefix is @prefixlen
 * bits long. A node of the same family matches if either:
 *  - its first node->addrlen address bytes equal @ipaddr, or
 *  - @fuzzy_allowed is set and the first @prefixlen bits agree.
 *
 * The first matching node, in configuration order, is stored in *me and 1
 * is returned. Returns 0 if no node matches.
 */
static int find_address(unsigned char ipaddr[BOOTH_IPADDR_LEN],
		int family, int prefixlen,
		int fuzzy_allowed,
		struct booth_node **me)
{
	const int full_bytes = prefixlen / 8;
	const int rest_bits = prefixlen % 8;
	/* With rest_bits left to compare, ignore the (8 - rest_bits) lowest bits. */
	const int rest_mask = ~((1 << (8 - rest_bits)) - 1);
	int idx;

	for (idx = 0; idx < booth_conf->node_count; idx++) {
		struct booth_node *cand = &booth_conf->node[idx];
		int hit;

		if (cand->family != family)
			continue;

		hit = (memcmp(ipaddr, &cand->in6, cand->addrlen) == 0);

		if (!hit && fuzzy_allowed
		    && memcmp(ipaddr, &cand->in6, full_bytes) == 0) {
			hit = !rest_bits
			      || ((cand->in6.s6_addr[full_bytes]
				   ^ ipaddr[full_bytes]) & rest_mask) == 0;
		}

		if (hit) {
			*me = cand;
			return 1;
		}
	}
	return 0;
}
/*
 * Identify this host among the configured nodes by dumping its interface
 * addresses via rtnetlink (RTM_GETADDR) and matching each one with
 * find_address().
 *
 * On success, *me points at the matching node and 1 is returned.
 * Otherwise *me is NULL and 0 is returned, including on netlink errors.
 *
 * NOTE(review): only AF_INET addresses are requested (rtgen_family), so
 * IPv6-configured nodes are never found here. Confirm against
 * BOOTH_PROTO_FAMILY.
 */
int find_myself(struct booth_node **me, int fuzzy_allowed)
{
	int fd;
	int rcvbuf_size = NETLINK_BUFSIZE;
	struct sockaddr_nl nladdr;
	unsigned char ipaddr[BOOTH_IPADDR_LEN];
	static char rcvbuf[NETLINK_BUFSIZE];
	struct {
		struct nlmsghdr nlh;
		struct rtgenmsg g;
	} req;

	*me = NULL;

	fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE);
	if (fd < 0) {
		log_error("failed to create netlink socket");
		return 0;
	}

	/* SO_RCVBUF takes an int holding the desired size, not the buffer. */
	setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf_size, sizeof(rcvbuf_size));

	memset(&nladdr, 0, sizeof(nladdr));
	nladdr.nl_family = AF_NETLINK;

	memset(&req, 0, sizeof(req));
	req.nlh.nlmsg_len = sizeof(req);
	req.nlh.nlmsg_type = RTM_GETADDR;
	req.nlh.nlmsg_flags = NLM_F_ROOT|NLM_F_MATCH|NLM_F_REQUEST;
	req.nlh.nlmsg_pid = 0;
	req.nlh.nlmsg_seq = 1;
	req.g.rtgen_family = AF_INET;

	if (sendto(fd, (void *)&req, sizeof(req), 0,
		   (struct sockaddr*)&nladdr, sizeof(nladdr)) < 0) {
		close(fd);
		log_error("failed to send data to netlink socket");
		return 0;
	}

	while (1) {
		int status;
		struct nlmsghdr *h;
		struct iovec iov = { rcvbuf, sizeof(rcvbuf) };
		struct msghdr msg = {
			(void *)&nladdr, sizeof(nladdr),
			&iov, 1,
			NULL, 0,
			0
		};

		status = recvmsg(fd, &msg, 0);
		if (status < 0 && errno == EINTR)
			continue;
		/* A recvmsg() error used to fall through and spin forever. */
		if (status <= 0) {
			close(fd);
			log_error("failed to recvmsg from netlink socket");
			return 0;
		}

		/* A single read may carry several messages, including the
		 * terminating NLMSG_DONE: examine the type of each one. */
		for (h = (struct nlmsghdr *)rcvbuf; NLMSG_OK(h, status);
		     h = NLMSG_NEXT(h, status)) {
			struct ifaddrmsg *ifa;
			struct rtattr *tb[IFA_MAX+1];
			int len, alen;

			if (h->nlmsg_type == NLMSG_DONE)
				goto out;
			if (h->nlmsg_type == NLMSG_ERROR) {
				close(fd);
				log_error("netlink socket recvmsg error");
				return 0;
			}
			if (h->nlmsg_type != RTM_NEWADDR)
				continue;

			ifa = NLMSG_DATA(h);
			len = h->nlmsg_len - NLMSG_LENGTH(sizeof(*ifa));
			memset(tb, 0, sizeof(tb));
			parse_rtattr(tb, IFA_MAX, IFA_RTA(ifa), len);

			/* An address message need not carry IFA_ADDRESS. */
			if (!tb[IFA_ADDRESS])
				continue;

			/* Copy only the attribute's payload (4 bytes for IPv4)
			 * and leave the rest of ipaddr zeroed. */
			alen = RTA_PAYLOAD(tb[IFA_ADDRESS]);
			if (alen > (int)BOOTH_IPADDR_LEN)
				alen = BOOTH_IPADDR_LEN;
			memset(ipaddr, 0, BOOTH_IPADDR_LEN);
			memcpy(ipaddr, RTA_DATA(tb[IFA_ADDRESS]), alen);

			if (find_address(ipaddr,
					 ifa->ifa_family, ifa->ifa_prefixlen,
					 fuzzy_allowed, me))
				goto out;
		}
	}

out:
	close(fd);
	return *me != NULL;
}
/*
 * Locate this host among the configured nodes (exact address match only),
 * mark that node as local, and cache a copy of it in "local" if none is
 * cached yet.
 *
 * Returns the node's id, or -1 if this host is not in the configuration.
 */
static int load_myid(void)
{
	struct booth_node *me;

	if (!find_myself(&me, 0))
		return -1;

	me->local = 1;
	if (!local.family)
		local = *me;

	return me->nodeid;
}
/* Return this node's id once load_myid() has identified it, or -1 otherwise. */
static int booth_get_myid(void)
{
	return local.local ? local.nodeid : -1;
}
/*
 * Poll-loop callback for client slot @ci whose connection has gone away.
 * It drops the matching tcp_conn entry (if any), closes the descriptor,
 * and clears the client and pollfd slots.
 */
static void process_dead(int ci)
{
	struct tcp_conn *conn, *next;
	int fd = client[ci].fd;

	list_for_each_entry_safe(conn, next, &tcp, list) {
		if (conn->s != fd)
			continue;
		list_del(&conn->list);
		free(conn);
		break;
	}

	close(fd);
	client[ci].workfn = NULL;
	client[ci].fd = -1;
	pollfd[ci].fd = -1;
}
/*
 * Poll-loop callback for the listening TCP socket in slot @ci.
 *
 * Accepts one connection, disables Nagle (TCP_NODELAY), records it in the
 * tcp list and registers it as a client served by process_connection().
 *
 * NOTE(review): the peer address is stored in a struct sockaddr, which is
 * too small for IPv6 peers (accept() truncates it). Confirm whether
 * tcp_conn.to should be a sockaddr_storage.
 */
static void process_tcp_listener(int ci)
{
	int fd, i, one = 1;
	socklen_t addrlen = sizeof(struct sockaddr);
	struct sockaddr addr;
	struct tcp_conn *conn;

	fd = accept(client[ci].fd, &addr, &addrlen);
	if (fd < 0) {
		log_error("process_tcp_listener: accept error %d %d",
			  fd, errno);
		return;
	}
	setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof(one));

	conn = calloc(1, sizeof(struct tcp_conn));
	if (!conn) {
		log_error("failed to alloc mem");
		/* Don't leak the accepted connection. */
		close(fd);
		return;
	}
	conn->s = fd;
	memcpy(&conn->to, &addr, sizeof(struct sockaddr));
	list_add_tail(&conn->list, &tcp);

	i = client_add(fd, process_connection, process_dead);
	log_debug("client connection %d fd %d", i, fd);
}
/*
 * Create a TCP socket bound to this node's address on booth_conf->port and
 * start listening (backlog 5).
 *
 * Returns the listening descriptor, or -1 on failure. The socket is closed
 * on every failure path.
 */
static int setup_tcp_listener(void)
{
	struct sockaddr_storage sockaddr;
	int s, addrlen, rv;

	s = socket(local.family, SOCK_STREAM, 0);
	if (s == -1) {
		log_error("failed to create tcp socket %s", strerror(errno));
		return s;
	}

	/* On failure addrlen would be left uninitialized for bind(). */
	if (ipaddr_to_sockaddr(&local, booth_conf->port, &sockaddr, &addrlen) < 0) {
		log_error("invalid address family %d for tcp listener",
			  local.family);
		close(s);
		return -1;
	}

	rv = bind(s, (struct sockaddr *)&sockaddr, addrlen);
	if (rv == -1) {
		log_error("failed to bind socket %s", strerror(errno));
		close(s);
		return rv;
	}

	rv = listen(s, 5);
	if (rv == -1) {
		log_error("failed to listen on socket %s", strerror(errno));
		close(s);
		return rv;
	}

	return s;
}
/*
 * Initialize the TCP transport.
 *
 * Requires this node to have been identified (local.local). It creates the
 * listening socket and registers it with process_tcp_listener().
 *
 * Returns 0 on success, otherwise -1 or the negative result of
 * setup_tcp_listener().
 */
static int booth_tcp_init(void * unused __attribute__((unused)))
{
	int listen_fd;

	if (!local.local)
		return -1;

	listen_fd = setup_tcp_listener();
	if (listen_fd >= 0) {
		client_add(listen_fd, process_tcp_listener, NULL);
		return 0;
	}
	return listen_fd;
}
/*
 * Connect @sockfd to @saptr/@salen, waiting at most @sec seconds
 * (0 = wait indefinitely).
 *
 * The socket is switched to non-blocking mode for the attempt, and its
 * original file status flags are restored once the connection attempt has
 * completed.
 *
 * Returns 0 on success. On failure returns -1 with errno set (ETIMEDOUT on
 * timeout). The caller is responsible for closing @sockfd on failure.
 */
static int connect_nonb(int sockfd, const struct sockaddr *saptr,
		socklen_t salen, int sec)
{
	int flags, n, error = 0;
	socklen_t len;
	struct pollfd pfd;

	flags = fcntl(sockfd, F_GETFL, 0);
	fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);

	n = connect(sockfd, saptr, salen);
	if (n < 0 && errno != EINPROGRESS)
		return -1;

	if (n != 0) {
		/* poll() instead of select(): FD_SET() on a descriptor
		 * >= FD_SETSIZE is undefined behaviour. */
		pfd.fd = sockfd;
		pfd.events = POLLIN | POLLOUT;
		pfd.revents = 0;
		do {
			n = poll(&pfd, 1, sec ? sec * 1000 : -1);
		} while (n < 0 && errno == EINTR);

		if (n == 0) {
			/* timeout; leave closing to the caller */
			errno = ETIMEDOUT;
			return -1;
		}
		/* A poll() failure must not be mistaken for readiness. */
		if (n < 0)
			return -1;
		if (!(pfd.revents & (POLLIN | POLLOUT | POLLERR | POLLHUP))) {
			log_error("poll error: sockfd not ready");
			return -1;
		}

		len = sizeof(error);
		if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0)
			return -1;	/* Solaris pending error */
	}

	fcntl(sockfd, F_SETFL, flags);	/* restore file status flags */
	if (error) {
		/* leave closing to the caller */
		errno = error;
		return -1;
	}
	return 0;
}
/*
 * Ensure a TCP connection to node @to exists.
 *
 * If to->tcp_fd is already open it is reused. Otherwise a new socket is
 * connected with a 10 second timeout and stored in to->tcp_fd.
 *
 * Returns 1 on success (new or existing connection), -1 on failure.
 *
 * NOTE(review): connect_nonb() is given &to->in6 / to->addrlen, i.e. the raw
 * in_addr/in6_addr (4 or 16 bytes, as set in add_node()). It is not a
 * sockaddr carrying a family and port. Building one with
 * ipaddr_to_sockaddr(to, booth_conf->port, ...) looks like what was
 * intended. Confirm.
 */
static int booth_tcp_open(struct booth_node *to)
{
int s, rv;
if (to->tcp_fd >= 0)
goto found;
s = socket(BOOTH_PROTO_FAMILY, SOCK_STREAM, 0);
if (s == -1)
return -1;
rv = connect_nonb(s, (struct sockaddr *)&to->in6, to->addrlen, 10);
if (rv == -1) {
if( errno == ETIMEDOUT)
- log_error("connection to %s timeout", to->addr);
+ log_error("connection to %s timeout", to->addr_string);
else
- log_error("connection to %s error %s", to->addr,
+ log_error("connection to %s error %s", to->addr_string,
strerror(errno));
goto error;
}
to->tcp_fd = s;
found:
return 1;
error:
if (s >= 0)
close(s);
return -1;
}
static int booth_tcp_send(struct booth_node *to, void *buf, int len)
{
return do_write(to->tcp_fd, buf, len);
}
static int booth_tcp_recv(struct booth_node *from, void *buf, int len)
{
return do_read(from->tcp_fd, buf, len);
}
static int booth_tcp_close(struct booth_node *to)
{
if (to->tcp_fd >= 0) {
close(to->tcp_fd);
to->tcp_fd = -1;
}
return 0;
}
/* booth_tcp_exit() - TCP transport teardown; nothing to release, always 0. */
static int booth_tcp_exit(void) { return 0; }
/*
 * setup_udp_server() - create the non-blocking UDP socket for this node.
 *
 * Creates a datagram socket in local.family and switches it to non-blocking
 * mode.  It then binds the socket to the local address on booth_conf->port
 * and sets its receive buffer to SOCKET_BUFFER_SIZE.
 *
 * The descriptor is stored in udp.s and also returned.  On any failure the
 * socket is closed, udp.s is reset to -1 and -1 is returned.
 */
static int setup_udp_server(void)
{
	struct sockaddr_storage sockaddr;
	int addrlen, rv, flags;
	unsigned int recvbuf_size;

	udp.s = socket(local.family, SOCK_DGRAM, 0);
	if (udp.s == -1) {
		log_error("failed to create udp socket %s", strerror(errno));
		return -1;
	}

	/* Add O_NONBLOCK to the current status flags instead of overwriting
	 * them, the same way connect_nonb() does. */
	flags = fcntl(udp.s, F_GETFL, 0);
	rv = (flags == -1) ? -1 : fcntl(udp.s, F_SETFL, flags | O_NONBLOCK);
	if (rv == -1) {
		log_error("failed to set non-blocking operation "
			  "on udp socket: %s", strerror(errno));
		goto err_close;
	}

	ipaddr_to_sockaddr(&local, booth_conf->port, &sockaddr, &addrlen);
	rv = bind(udp.s, (struct sockaddr *)&sockaddr, addrlen);
	if (rv == -1) {
		log_error("failed to bind socket %s", strerror(errno));
		goto err_close;
	}

	recvbuf_size = SOCKET_BUFFER_SIZE;
	rv = setsockopt(udp.s, SOL_SOCKET, SO_RCVBUF,
			&recvbuf_size, sizeof(recvbuf_size));
	if (rv == -1) {
		log_error("failed to set recvbuf size");
		goto err_close;
	}

	return udp.s;

err_close:
	close(udp.s);
	udp.s = -1;
	return -1;
}
/*
 * process_recv() - read handler registered for the UDP socket.
 *
 * Receives one datagram from client[ci].fd into the buffer described by
 * udp.iov_recv and passes it, with its length, to deliver_fn.  A failed
 * recvmsg() is silently ignored.
 */
static void process_recv(int ci)
{
	struct sockaddr_storage peer;
	struct msghdr hdr;
	int nbytes;

	/* Zero everything (control buffer, flags) first, then fill in the
	 * source-address slot and the single receive iovec. */
	memset(&hdr, 0, sizeof(hdr));
	hdr.msg_name = &peer;
	hdr.msg_namelen = sizeof(peer);
	hdr.msg_iov = &udp.iov_recv;
	hdr.msg_iovlen = 1;

	nbytes = recvmsg(client[ci].fd, &hdr, MSG_NOSIGNAL | MSG_DONTWAIT);
	if (nbytes != -1)
		deliver_fn((unsigned char *)udp.iov_recv.iov_base, nbytes);
}
/*
 * booth_udp_init() - initialise the UDP transport.
 *
 * Resets the local node record and resolves our own identity with
 * load_myid().  It then prepares the receive iovec and opens the UDP server
 * socket.  Finally it remembers @f as the delivery callback and registers
 * process_recv() for the socket.
 *
 * Returns 0 on success, -1 on failure.
 */
static int booth_udp_init(void *f)
{
	memset(&local, 0, sizeof(struct booth_node));
	if (load_myid() < 0) {
		log_error("can't find myself in config file");
		return -1;
	}

	memset(&udp, 0, sizeof(struct udp_context));
	udp.iov_recv.iov_base = udp.iov_buffer;
	udp.iov_recv.iov_len = FRAME_SIZE_MAX;

	udp.s = setup_udp_server();
	if (udp.s != -1) {
		deliver_fn = f;
		client_add(udp.s, process_recv, NULL);
		return 0;
	}
	return -1;
}
/*
 * booth_udp_send() - send @len bytes from @buf as a single datagram to the
 * node @to on booth_conf->port.
 *
 * Returns 0 on success or the negative sendmsg() result on failure.
 */
static int booth_udp_send(struct booth_node *to, void *buf, int len)
{
	struct sockaddr_storage dest;
	struct iovec vec = { .iov_base = buf, .iov_len = len };
	struct msghdr hdr;
	int destlen = 0;
	int sent;

	ipaddr_to_sockaddr(to, booth_conf->port, &dest, &destlen);

	memset(&hdr, 0, sizeof(hdr));
	hdr.msg_name = &dest;
	hdr.msg_namelen = destlen;
	hdr.msg_iov = &vec;
	hdr.msg_iovlen = 1;

	sent = sendmsg(udp.s, &hdr, MSG_NOSIGNAL);
	return sent < 0 ? sent : 0;
}
static int booth_udp_broadcast(void *buf, int len)
{
int i;
if (!booth_conf || !booth_conf->node_count)
return -1;
for (i = 0; i < booth_conf->node_count; i++)
booth_udp_send(booth_conf->node+i, buf, len);
return 0;
}
/* booth_udp_exit() - UDP transport teardown; nothing to release, always 0. */
static int booth_udp_exit(void) { return 0; }
/*
 * SCTP transport stubs: the SCTP transport layer has not been developed yet.
 * Every hook ignores its arguments and reports success (0).
 */
static int booth_sctp_init(void *f)
{
	(void)f;
	return 0;
}

static int booth_sctp_send(struct booth_node *to, void *buf, int len)
{
	(void)to;
	(void)buf;
	(void)len;
	return 0;
}

static int booth_sctp_broadcast(void *buf, int len)
{
	(void)buf;
	(void)len;
	return 0;
}

static int booth_sctp_exit(void)
{
	return 0;
}
struct booth_transport booth_transport[] = {
{
.name = "TCP",
.init = booth_tcp_init,
.get_myid = booth_get_myid,
.open = booth_tcp_open,
.send = booth_tcp_send,
.recv = booth_tcp_recv,
.close = booth_tcp_close,
.exit = booth_tcp_exit
},
{
.name = "UDP",
.init = booth_udp_init,
.get_myid = booth_get_myid,
.send = booth_udp_send,
.broadcast = booth_udp_broadcast,
.exit = booth_udp_exit
},
{
.name = "SCTP",
.init = booth_sctp_init,
.get_myid = booth_get_myid,
.send = booth_sctp_send,
.broadcast = booth_sctp_broadcast,
.exit = booth_sctp_exit
}
};
diff --git a/src/transport.h b/src/transport.h
index add9902..2506e18 100644
--- a/src/transport.h
+++ b/src/transport.h
@@ -1,66 +1,66 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _TRANSPORT_H
#define _TRANSPORT_H
#include "booth.h"
/*
 * One booth node as seen by the transport layer.
 * NOTE(review): the struct is packed, which can leave tcp_fd and the
 * address union misaligned; confirm the packing is actually required.
 */
struct booth_node {
int nodeid;
int type; /* presumably a node_type_t value - confirm */
int local;
unsigned short family; /* address family; used as the socket() domain */
+ char addr_string[BOOTH_NAME_LEN]; /* printable address, used in log messages */
int tcp_fd; /* connected TCP socket, or -1 when not open */
- char addr[BOOTH_NAME_LEN];
int addrlen; /* NOTE(review): presumably the length of the address below - confirm */
/* Raw IPv4/IPv6 address (in_addr/in6_addr, not a struct sockaddr). */
union {
struct in_addr in4;
struct in6_addr in6;
};
} __attribute__((packed));
/*
 * Available transports; each value is the index of that transport in
 * booth_transport[].  TRANSPORT_ENTRIES is the number of transports.
 */
typedef enum {
	TCP,
	UDP,
	SCTP,
	TRANSPORT_ENTRIES,
} transport_layer_t;
/* Role of a booth node; numbering starts at 1 (0 is not assigned to a role). */
typedef enum {
	ARBITRATOR = 1,
	SITE = 2,
	CLIENT = 3,
} node_type_t;
/*
 * Operations implemented by one transport.  Hooks a transport does not
 * provide are left NULL in its table entry.
 */
struct booth_transport {
	const char *name;
	int (*init)(void *deliver_fn);
	int (*get_myid)(void);
	int (*open)(struct booth_node *to);
	int (*send)(struct booth_node *to, void *buf, int len);
	int (*recv)(struct booth_node *from, void *buf, int len);
	int (*broadcast)(void *buf, int len);
	int (*close)(struct booth_node *to);
	int (*exit)(void);
};
/*
 * Transport table indexed by transport_layer_t.  Its initialised definition
 * lives in the .c file implementing the transports.  It must be declared
 * extern here.  Without extern, every file including this header emits its
 * own tentative definition, which fails to link with -fno-common (the
 * default since GCC 10).
 */
extern struct booth_transport booth_transport[TRANSPORT_ENTRIES];
int find_myself(struct booth_node **me, int fuzzy_allowed);
#endif /* _TRANSPORT_H */
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Mon, Feb 24, 4:54 PM (2 h, 17 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1464352
Default Alt Text
(40 KB)
Attached To
Mode
rB Booth
Attached
Detach File
Event Timeline
Log In to Comment