Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/src/attr.c b/src/attr.c
index 34df335..3faa888 100644
--- a/src/attr.c
+++ b/src/attr.c
@@ -1,476 +1,493 @@
/*
* Copyright (C) 2015 Dejan Muhamedagic <dejan@hello-penguin.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include <stdio.h>
#include <string.h>
#include "attr.h"
#include "booth.h"
#include "ticket.h"
#include "pacemaker.h"
void print_geostore_usage(void)
{
printf(
"Usage:\n"
" geostore {list|set|get|delete} [-t ticket] [options] attr [value]\n"
"\n"
" list: List all attributes\n"
" set: Set attribute to a value\n"
" get: Get attribute's value\n"
" delete: Delete attribute\n"
"\n"
" -t <ticket> Ticket where attribute resides\n"
" (required, if more than one ticket is configured)\n"
"\n"
"Options:\n"
" -c FILE Specify config file [default " BOOTH_DEFAULT_CONF "]\n"
" Can be a path or just a name without \".conf\" suffix\n"
" -s <site> Connect to a different site\n"
" -h Print this help\n"
"\n"
"Examples:\n"
"\n"
" # geostore list -t ticket-A -s 10.121.8.183\n"
" # geostore set -s 10.121.8.183 sr_status ACTIVE\n"
" # geostore get -t ticket-A -s 10.121.8.183 sr_status\n"
" # geostore delete -s 10.121.8.183 sr_status\n"
"\n"
"See the geostore(8) man page for more details.\n"
);
}
/*
* the client side
*/
/* cl has all the input parameters:
* ticket, attr name, attr value
*/
/* Translate the server's reply to an attribute command into the
 * client's return code, logging the outcome on the way.
 *
 * reply_code: result field taken from the reply header
 * cmd: the attribute command the reply belongs to (ATTR_SET,
 *      ATTR_GET, ATTR_LIST or ATTR_DEL)
 *
 * return codes:
 *  0: success (or result will arrive asynchronously)
 *  1: the ticket or the attribute does not exist
 * -1: any other failure, including an unknown cmd
 */
int test_attr_reply(cmd_result_t reply_code, cmd_request_t cmd)
{
	int rv = 0;
	const char *op_str = NULL;

	switch (cmd) {
	case ATTR_SET:	op_str = "set";		break;
	case ATTR_GET:	op_str = "get";		break;
	case ATTR_LIST:	op_str = "list";	break;
	case ATTR_DEL:	op_str = "delete";	break;
	default:
		log_error("internal error reading reply result!");
		return -1;
	}

	switch (reply_code) {
	case RLT_ASYNC:
		log_info("%s command sent, result will be returned "
			 "asynchronously.", op_str);
		rv = 0;
		break;

	case RLT_SYNC_SUCC:
	case RLT_SUCCESS:
		/* get/list/delete print nothing on success */
		if (cmd == ATTR_SET)
			log_info("%s succeeded!", op_str);
		rv = 0;
		break;

	case RLT_SYNC_FAIL:
		log_info("%s failed!", op_str);
		rv = -1;
		break;

	case RLT_INVALID_ARG:
		log_error("ticket \"%s\" does not exist",
			  cl.attr_msg.attr.tkt_id);
		rv = 1;
		break;

	case RLT_NO_SUCH_ATTR:
		log_error("attribute \"%s\" not set",
			  cl.attr_msg.attr.name);
		rv = 1;
		break;

	case RLT_AUTH:
		log_error("authentication error");
		rv = -1;
		break;

	default:
		/* report the code that was received; rv is still 0 here */
		log_error("got an error code: %x", (unsigned int)reply_code);
		rv = -1;
		break;
	}

	return rv;
}
/* read the server's reply into msg
 * the header is read first; its length field (which counts the
 * header too) tells how many more bytes follow
 *
 * NOTE(review): msg must be able to hold the length announced by the
 * server; this function does not know the buffer size, so the upper
 * bound has to be enforced by the caller or the transport - confirm.
 *
 * return codes:
 * -2: header not received
 * -1: header received, but message too short
 * >=0: success
 */
static int read_server_reply(
		struct booth_transport const *tpt, struct booth_site *site,
		char *msg)
{
	struct boothc_header *header = (struct boothc_header *)msg;
	int rv;
	int len;

	rv = tpt->recv(site, header, sizeof(*header));
	if (rv < 0) {
		return -2;
	}

	len = ntohl(header->length);
	/* a length below the header size would make the subtraction
	 * below wrap around to a huge unsigned byte count */
	if (len < 0 || (size_t)len < sizeof(*header)) {
		return -1;
	}

	rv = tpt->recv(site, msg + sizeof(*header), len - sizeof(*header));
	if (rv < 0) {
		return -1;
	}
	return rv;
}
/* Client side: send the attribute request prepared in cl.attr_msg to a
 * site over TCP and evaluate the server's reply.
 *
 * conf: configuration used to look up the site named in cl.site
 * cmd: attribute command placed into the request header
 *
 * The target is the local site if cl.site is empty. Arbitrators are
 * refused, with an error logged.
 *
 * Returns the result of test_attr_reply() once a reply passed the
 * header and authentication checks; otherwise a negative value from
 * the step that failed (initially -1).
 */
int do_attr_command(struct booth_config *conf, cmd_request_t cmd)
{
struct booth_site *site = NULL;
struct boothc_header *header;
struct booth_transport const *tpt = NULL;
int len, rv = -1;
char *msg = NULL;
if (!*cl.site)
site = local;
else {
if (!find_site_by_name(conf, cl.site, &site, 1)) {
log_error("Site \"%s\" not configured.", cl.site);
goto out_close;
}
}
if (site->type == ARBITRATOR) {
if (site == local) {
log_error("We're just an arbitrator, no attributes here.");
} else {
log_error("%s is just an arbitrator, no attributes there.", cl.site);
}
goto out_close;
}
/* attribute commands always use the TCP transport */
tpt = booth_transport + TCP;
init_header(&cl.attr_msg.header, cmd, 0, cl.options, 0, 0,
sizeof(cl.attr_msg));
rv = tpt->open(site);
if (rv < 0)
goto out_close;
- rv = tpt->send(site, &cl.attr_msg, sendmsglen(&cl.attr_msg));
- if (rv < 0)
+ rv = tpt->send(conf, site, &cl.attr_msg, sendmsglen(&cl.attr_msg));
+ if (rv < 0) {
goto out_close;
+ }
msg = malloc(MAX_MSG_LEN);
if (!msg) {
log_error("out of memory");
rv = -1;
goto out_close;
}
rv = read_server_reply(tpt, site, msg);
header = (struct boothc_header *)msg;
if (rv < 0) {
/* -1 means the header did arrive, so its result code can
 * still be logged; the return value stays rv */
if (rv == -1)
(void)test_attr_reply(ntohl(header->result), cmd);
goto out_close;
}
len = ntohl(header->length);
if (check_boothc_header(header, len) < 0) {
log_error("message from %s receive error", site_string(site));
rv = -1;
goto out_close;
}
if (check_auth(site, msg, len)) {
log_error("%s failed to authenticate", site_string(site));
rv = -1;
goto out_close;
}
rv = test_attr_reply(ntohl(header->result), cmd);
out_close:
/* tpt is still NULL if we bailed out before selecting the transport */
if (tpt && site)
tpt->close(site);
if (msg)
free(msg);
return rv;
}
/*
* the server side
*/
/* map a gboolean onto a result code; the truth value has to be
 * inverted because our success (RLT_SUCCESS) is 0
 * the argument is parenthesized so that any expression can be passed
 */
#define gbool2rlt(i) ((i) ? RLT_SUCCESS : RLT_SYNC_FAIL)
/* Value destructor for the per-ticket attribute hash table:
 * releases the stored value string and the geo_attr itself.
 * A NULL pointer is accepted and ignored.
 */
static void free_geo_attr(gpointer data)
{
	struct geo_attr *attr = data;

	if (attr != NULL) {
		g_free(attr->val);
		g_free(attr);
	}
}
/* Store attribute name=val in the ticket's attribute table, creating
 * the table on first use. Both strings are copied.
 *
 * tk: ticket the attribute belongs to
 * name, val: attribute name and value
 * notime: if non-zero, the update timestamp is left unset
 *
 * A name or value reaching the BOOTH_NAME_LEN / BOOTH_ATTRVAL_LEN
 * limit is skipped with a warning, and that still counts as success.
 *
 * Returns 0 on success, -1 if tk is NULL or memory ran out.
 */
int store_geo_attr(struct ticket_config *tk, const char *name,
		const char *val, int notime)
{
	struct geo_attr *entry;

	if (tk == NULL) {
		return -1;
	}

	if (tk->attr == NULL) {
		tk->attr = g_hash_table_new_full(g_str_hash, g_str_equal,
				g_free, free_geo_attr);
	}
	if (tk->attr == NULL) {
		log_error("out of memory");
		return -1;
	}

	if (strnlen(name, BOOTH_NAME_LEN) == BOOTH_NAME_LEN) {
		tk_log_warn("name of the attribute too long (%d+ bytes), skipped",
				BOOTH_NAME_LEN);
		return 0;
	}
	if (strnlen(val, BOOTH_ATTRVAL_LEN) == BOOTH_ATTRVAL_LEN) {
		tk_log_warn("value of the attribute too long (%d+ bytes), skipped",
				BOOTH_ATTRVAL_LEN);
		return 0;
	}

	entry = (struct geo_attr *)calloc(1, sizeof(*entry));
	if (entry == NULL) {
		log_error("out of memory");
		return -1;
	}

	entry->val = g_strdup(val);
	if (!notime) {
		get_time(&entry->update_ts);
	}
	/* the table owns both the duplicated key and the entry from here on */
	g_hash_table_insert(tk->attr, g_strdup(name), entry);

	return 0;
}
/* Handle ATTR_SET: store the attribute from the request in the ticket
 * and pass it on to the pacemaker handler.
 * Returns RLT_SYNC_FAIL if storing failed, RLT_SUCCESS otherwise; the
 * handler's own result is deliberately ignored.
 */
static cmd_result_t attr_set(struct ticket_config *tk, struct boothc_attr_msg *msg)
{
	char *attr_name = msg->attr.name;
	char *attr_value = msg->attr.val;

	if (store_geo_attr(tk, attr_name, attr_value, 0) != 0) {
		return RLT_SYNC_FAIL;
	}

	(void)pcmk_handler.set_attr(tk, attr_name, attr_value);
	return RLT_SUCCESS;
}
static cmd_result_t attr_del(struct ticket_config *tk, struct boothc_attr_msg *msg)
{
gboolean rv;
gpointer orig_key, value;
/*
* lookup attr
* deallocate, if found
* send status
*/
if (!tk->attr)
return RLT_NO_SUCH_ATTR;
rv = g_hash_table_lookup_extended(tk->attr, msg->attr.name,
&orig_key, &value);
if (!rv)
return RLT_NO_SUCH_ATTR;
rv = g_hash_table_remove(tk->attr, msg->attr.name);
(void)pcmk_handler.del_attr(tk, msg->attr.name);
return gbool2rlt(rv);
}
/* g_hash_table_foreach() callback: append one "name value timestamp"
 * line to the GString passed as user_data. The timestamp is left
 * empty if the attribute has no update time set.
 */
static void
append_attr(gpointer key, gpointer value, gpointer user_data)
{
	struct geo_attr *attr = value;
	GString *out = user_data;
	char stamp[64] = "";
	time_t when;

	if (is_time_set(&attr->update_ts)) {
		when = wall_ts(&attr->update_ts);
		strftime(stamp, sizeof(stamp), "%F %T",
				localtime(&when));
	}

	g_string_append_printf(out, "%s %s %s\n",
			(char *)key, attr->val, stamp);
}
/* Handle ATTR_GET: look up the attribute named in the request and send
 * its value, followed by a newline, to the client on fd.
 *
 * Returns RLT_NO_SUCH_ATTR if the ticket has no attribute table or the
 * name is not in it, RLT_SYNC_FAIL if the value string cannot be
 * allocated or send_header_plus() reports failure, RLT_SUCCESS
 * otherwise.
 */
-static cmd_result_t attr_get(struct ticket_config *tk, int fd, struct boothc_attr_msg *msg)
+static cmd_result_t attr_get(struct booth_config *conf, struct ticket_config *tk,
+ int fd, struct boothc_attr_msg *msg)
{
cmd_result_t rv = RLT_SUCCESS;
struct boothc_hdr_msg hdr;
struct geo_attr *a;
GString *attr_val;
/*
* lookup attr
* send value
*/
- if (!tk->attr)
+ if (!tk->attr) {
return RLT_NO_SUCH_ATTR;
+ }
a = (struct geo_attr *)g_hash_table_lookup(tk->attr, msg->attr.name);
- if (!a)
+ if (!a) {
return RLT_NO_SUCH_ATTR;
+ }
+
attr_val = g_string_new(NULL);
if (!attr_val) {
log_error("out of memory");
return RLT_SYNC_FAIL;
}
g_string_printf(attr_val, "%s\n", a->val);
/* the announced length covers the header message plus the payload */
init_header(&hdr.header, ATTR_GET, 0, 0, RLT_SUCCESS, 0,
sizeof(hdr) + attr_val->len);
- if (send_header_plus(fd, &hdr, attr_val->str, attr_val->len))
+
+ if (send_header_plus(conf, fd, &hdr, attr_val->str, attr_val->len)) {
rv = RLT_SYNC_FAIL;
- if (attr_val)
+ }
+
+ if (attr_val) {
g_string_free(attr_val, TRUE);
+ }
+
return rv;
}
/* Handle ATTR_LIST: send all attributes of the ticket to the client on
 * fd, one line per attribute as formatted by append_attr(). A ticket
 * without an attribute table yields an empty payload.
 *
 * Returns RLT_SYNC_FAIL if the buffer cannot be allocated, otherwise
 * the value returned by send_header_plus(). msg is not used here.
 */
-static cmd_result_t attr_list(struct ticket_config *tk, int fd, struct boothc_attr_msg *msg)
+static cmd_result_t attr_list(struct booth_config *conf, struct ticket_config *tk,
+ int fd, struct boothc_attr_msg *msg)
{
GString *data;
cmd_result_t rv;
struct boothc_hdr_msg hdr;
/*
* list all attributes for the ticket
* send the list
*/
data = g_string_sized_new(512);
if (!data) {
log_error("out of memory");
return RLT_SYNC_FAIL;
}
if (tk->attr) {
g_hash_table_foreach(tk->attr, append_attr, data);
}
/* the announced length covers the header message plus the payload */
init_header(&hdr.header, ATTR_LIST, 0, 0, RLT_SUCCESS, 0,
sizeof(hdr) + data->len);
- rv = send_header_plus(fd, &hdr, data->str, data->len);
+ rv = send_header_plus(conf, fd, &hdr, data->str, data->len);
- if (data)
+ if (data) {
g_string_free(data, TRUE);
+ }
+
return rv;
}
/* Server side entry point for an attribute request from a client.
 *
 * conf: configuration used to resolve the ticket
 * req_client: client whose fd receives the reply
 * buf: the received message, interpreted as struct boothc_attr_msg
 *
 * ATTR_LIST and ATTR_GET send their own reply on success; in every
 * other case (including an unknown ticket or command) a CL_RESULT
 * header carrying the result code is sent.
 *
 * Always returns 1.
 */
int process_attr_request(struct booth_config *conf, struct client *req_client,
void *buf)
{
cmd_result_t rv = RLT_SYNC_FAIL;
struct ticket_config *tk;
int cmd;
struct boothc_attr_msg *msg;
struct boothc_hdr_msg hdr;
msg = (struct boothc_attr_msg *)buf;
cmd = ntohl(msg->header.cmd);
if (!check_ticket(conf, msg->attr.tkt_id, &tk)) {
log_warn("client referenced unknown ticket %s",
msg->attr.tkt_id);
rv = RLT_INVALID_ARG;
goto reply_now;
}
/* a command not listed below leaves rv at RLT_SYNC_FAIL */
switch (cmd) {
case ATTR_LIST:
- rv = attr_list(tk, req_client->fd, msg);
- if (rv)
+ rv = attr_list(conf, tk, req_client->fd, msg);
+ if (rv) {
goto reply_now;
+ }
+
return 1;
case ATTR_GET:
- rv = attr_get(tk, req_client->fd, msg);
- if (rv)
+ rv = attr_get(conf, tk, req_client->fd, msg);
+ if (rv) {
goto reply_now;
+ }
+
return 1;
case ATTR_SET:
rv = attr_set(tk, msg);
break;
case ATTR_DEL:
rv = attr_del(tk, msg);
break;
}
reply_now:
init_header(&hdr.header, CL_RESULT, 0, 0, rv, 0, sizeof(hdr));
- send_header_plus(req_client->fd, &hdr, NULL, 0);
+ send_header_plus(conf, req_client->fd, &hdr, NULL, 0);
return 1;
}
/* Receive an attribute message from another site.
 *
 * This is effectively a no-op which should never be invoked: only
 * clients retrieve or manage attributes, and they connect directly to
 * the target site. The message is logged as unexpected; an unknown
 * ticket name is also counted against the sender.
 *
 * Returns 0 if the ticket is known, -1 otherwise.
 */
int attr_recv(struct booth_config *conf, void *buf, struct booth_site *source)
{
	struct boothc_attr_msg *attr_msg = buf;
	struct ticket_config *tk;

	log_warn("unexpected attribute message from %s",
			site_string(source));

	if (check_ticket(conf, attr_msg->attr.tkt_id, &tk)) {
		return 0;
	}

	log_warn("got invalid ticket name %s from %s",
			attr_msg->attr.tkt_id, site_string(source));
	source->invalid_cnt++;
	return -1;
}
diff --git a/src/booth.h b/src/booth.h
index 799cdf9..3f0c220 100644
--- a/src/booth.h
+++ b/src/booth.h
@@ -1,390 +1,398 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef _BOOTH_H
#define _BOOTH_H
#include <stdint.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <glib.h>
#include <limits.h>
#include "timer.h"
#define BOOTH_RUN_DIR "/var/run/booth/"
#define BOOTH_LOG_DIR "/var/log"
#define BOOTH_LOGFILE_NAME "booth.log"
#define BOOTH_DEFAULT_CONF_DIR "/etc/booth/"
#define BOOTH_DEFAULT_CONF_NAME "booth"
#define BOOTH_DEFAULT_CONF_EXT ".conf"
#define BOOTH_DEFAULT_CONF \
BOOTH_DEFAULT_CONF_DIR BOOTH_DEFAULT_CONF_NAME BOOTH_DEFAULT_CONF_EXT
#define DAEMON_NAME "boothd"
#define BOOTH_PATH_LEN PATH_MAX
#define BOOTH_MAX_KEY_LEN 64
#define BOOTH_MIN_KEY_LEN 8
/* hash size is 160 bits (sha1), but add a bit more space in case
* stronger hashes are required */
#define BOOTH_MAC_SIZE 24
/* tolerate packets which are not older than 10 minutes */
#define BOOTH_DEFAULT_MAX_TIME_SKEW 600
#define BOOTH_DEFAULT_PORT 9929
#define BOOTHC_MAGIC 0x5F1BA08C
#define BOOTHC_VERSION 0x00010003
/** Timeout value for poll().
* Determines frequency of periodic jobs, eg. when send-retries are done.
* See process_tickets(). */
#define POLL_TIMEOUT 100
/** @{ */
/** The on-network data structures and constants. */
#define BOOTH_NAME_LEN 64
#define BOOTH_ATTRVAL_LEN 128
/* Pack four character codes into one constant, first argument in the
 * most significant byte. Every argument is parenthesized so that
 * expressions can be passed without precedence surprises. */
#define CHAR2CONST(a,b,c,d) (((a) << 24) | ((b) << 16) | ((c) << 8) | (d))
/* Says that the ticket shouldn't be active anywhere.
* NONE wouldn't be specific enough. */
#define NO_ONE ((uint32_t)-1)
/* Says that another one should recover. */
#define TICKET_LOST CHAR2CONST('L', 'O', 'S', 'T')
struct booth_config;
typedef void (*workfn_t)(struct booth_config *conf, int ci);
typedef char boothc_site[BOOTH_NAME_LEN];
typedef char boothc_ticket[BOOTH_NAME_LEN];
typedef char boothc_attr[BOOTH_NAME_LEN];
typedef char boothc_attr_value[BOOTH_ATTRVAL_LEN];
/* message option bits */
enum {
BOOTH_OPT_AUTH = 1, /* authentication */
BOOTH_OPT_ATTR = 4, /* attr message type, otherwise ticket */
};
/* Header that starts every booth message (see the boothc_*_msg
 * structures). The layout is packed and consists of 32-bit fields
 * only; the attribute code reads fields such as length, cmd and
 * result through ntohl(). */
struct boothc_header {
/** Various options, message type, authentication
*/
uint32_t opts;
/** Generation info (used for authentication)
* This is something that would need to be monotone
* incremental. CLOCK_MONOTONIC should fit the purpose. On
* failover, however, it may happen that the new host has a
* clock which is significantly behind the clock of old host.
* We'll need to relax a bit for the nodes which are starting
* (just accept all OP_STATUS).
*/
uint32_t secs; /* seconds */
uint32_t usecs; /* microseconds */
/** BOOTHC_MAGIC */
uint32_t magic;
/** BOOTHC_VERSION */
uint32_t version;
/** Packet source; site_id. See add_site(). */
uint32_t from;
/** Length including header */
uint32_t length;
/** The command respectively protocol state. See cmd_request_t. */
uint32_t cmd;
/** The matching request (what do we reply to). See cmd_request_t. */
uint32_t request;
/** Command options. */
uint32_t options;
/** The reason for this RPC. */
uint32_t reason;
/** Result of operation. 0 == OK */
uint32_t result;
/** Zero-length marker for whatever follows the header. */
char data[0];
} __attribute__((packed));
/* Ticket part of a boothc_ticket_msg (packed). */
struct ticket_msg {
/** Ticket name. */
boothc_ticket id;
/** Current leader. May be NO_ONE. See add_site().
* NOTE(review): the original comment went on with "For a OP_REQ_VOTE
* this is" and stopped there; the meaning for OP_REQ_VOTE is not
* documented - TODO complete. */
uint32_t leader;
/** Current term. */
uint32_t term;
/** NOTE(review): presumably how long the term remains valid; unit not
* visible here - confirm. */
uint32_t term_valid_for;
/* Perhaps we need to send a status along, too - like
* starting, running, stopping, error, ...? */
} __attribute__((packed));
/* Attribute part of a boothc_attr_msg (packed). All three members are
 * fixed-size character arrays: BOOTH_NAME_LEN bytes for the names,
 * BOOTH_ATTRVAL_LEN bytes for the value. */
struct attr_msg {
/** Ticket name. */
boothc_ticket tkt_id;
/** Attribute name. */
boothc_attr name;
/** The value. */
boothc_attr_value val;
} __attribute__((packed));
/* GEO attributes
* attributes should be regularly updated.
*
* One stored attribute value; the attribute name is not part of this
* structure.
*/
struct geo_attr {
/** Update timestamp. */
timetype update_ts;
/** The value. */
char *val;
/** Who set it (currently unused)
struct booth_site *origin;
*/
};
/* Authentication trailer carried at the end of each boothc_*_msg
 * structure (packed). */
struct hmac {
/** hash id, currently set to constant BOOTH_HASH */
uint32_t hid;
/** the calculated hash, BOOTH_MAC_SIZE is big enough to
* accommodate the hash of type hid */
unsigned char hash[BOOTH_MAC_SIZE];
} __attribute__((packed));
/* Message without a fixed payload: header followed directly by the
 * hmac (packed). */
struct boothc_hdr_msg {
struct boothc_header header;
struct hmac hmac;
} __attribute__((packed));
/* Ticket message: header, ticket data, hmac (packed). */
struct boothc_ticket_msg {
struct boothc_header header;
struct ticket_msg ticket;
struct hmac hmac;
} __attribute__((packed));
/* Attribute message: header, attribute data, hmac (packed). */
struct boothc_attr_msg {
struct boothc_header header;
struct attr_msg attr;
struct hmac hmac;
} __attribute__((packed));
/* Commands, replies and protocol operations. Each value is a
 * four-character code built with CHAR2CONST. */
typedef enum {
/* 0x43 = "C"ommands */
CMD_LIST = CHAR2CONST('C', 'L', 's', 't'),
CMD_GRANT = CHAR2CONST('C', 'G', 'n', 't'),
CMD_REVOKE = CHAR2CONST('C', 'R', 'v', 'k'),
CMD_PEERS = CHAR2CONST('P', 'e', 'e', 'r'),
/* Replies */
CL_RESULT = CHAR2CONST('R', 's', 'l', 't'),
CL_LIST = CHAR2CONST('R', 'L', 's', 't'),
CL_GRANT = CHAR2CONST('R', 'G', 'n', 't'),
CL_REVOKE = CHAR2CONST('R', 'R', 'v', 'k'),
/* get status from another server */
OP_STATUS = CHAR2CONST('S', 't', 'a', 't'),
OP_MY_INDEX = CHAR2CONST('M', 'I', 'd', 'x'), /* reply to status */
/* Raft */
OP_REQ_VOTE = CHAR2CONST('R', 'V', 'o', 't'), /* start election */
OP_VOTE_FOR = CHAR2CONST('V', 't', 'F', 'r'), /* reply to REQ_VOTE */
OP_HEARTBEAT= CHAR2CONST('H', 'r', 't', 'B'), /* Heartbeat */
OP_ACK = CHAR2CONST('A', 'c', 'k', '.'), /* Ack for heartbeats and revokes */
OP_UPDATE = CHAR2CONST('U', 'p', 'd', 'E'), /* Update ticket */
OP_REVOKE = CHAR2CONST('R', 'e', 'v', 'k'), /* Revoke ticket */
OP_REJECTED = CHAR2CONST('R', 'J', 'C', '!'),
/* Attributes */
ATTR_SET = CHAR2CONST('A', 'S', 'e', 't'),
ATTR_GET = CHAR2CONST('A', 'G', 'e', 't'),
ATTR_DEL = CHAR2CONST('A', 'D', 'e', 'l'),
ATTR_LIST = CHAR2CONST('A', 'L', 's', 't'),
} cmd_request_t;
/* Result codes. RLT_SUCCESS is 0; every other value is a
 * four-character code built with CHAR2CONST. */
typedef enum {
/* for compatibility with other functions */
RLT_SUCCESS = 0,
RLT_ASYNC = CHAR2CONST('A', 's', 'y', 'n'),
RLT_MORE = CHAR2CONST('M', 'o', 'r', 'e'),
RLT_SYNC_SUCC = CHAR2CONST('S', 'c', 'c', 's'),
RLT_SYNC_FAIL = CHAR2CONST('F', 'a', 'i', 'l'),
RLT_INVALID_ARG = CHAR2CONST('I', 'A', 'r', 'g'),
RLT_NO_SUCH_ATTR = CHAR2CONST('N', 'A', 't', 'r'),
RLT_CIB_PENDING = CHAR2CONST('P', 'e', 'n', 'd'),
RLT_EXT_FAILED = CHAR2CONST('X', 'P', 'r', 'g'),
RLT_ATTR_PREREQ = CHAR2CONST('A', 'P', 'r', 'q'),
RLT_TICKET_IDLE = CHAR2CONST('T', 'i', 'd', 'l'),
RLT_OVERGRANT = CHAR2CONST('O', 'v', 'e', 'r'),
RLT_PROBABLY_SUCCESS = CHAR2CONST('S', 'u', 'c', '?'),
RLT_BUSY = CHAR2CONST('B', 'u', 's', 'y'),
RLT_AUTH = CHAR2CONST('A', 'u', 't', 'h'),
RLT_TERM_OUTDATED = CHAR2CONST('T', 'O', 'd', 't'),
RLT_TERM_STILL_VALID = CHAR2CONST('T', 'V', 'l', 'd'),
RLT_YOU_OUTDATED = CHAR2CONST('O', 'u', 't', 'd'),
RLT_REDIRECT = CHAR2CONST('R', 'e', 'd', 'r'),
} cmd_result_t;
/* Reason codes. OR_JUST_SO is 0; every other value is a
 * four-character code built with CHAR2CONST. */
typedef enum {
/* for compatibility with other functions */
OR_JUST_SO = 0,
OR_AGAIN = CHAR2CONST('A', 'a', 'a', 'a'),
OR_TKT_LOST = CHAR2CONST('T', 'L', 's', 't'),
OR_REACQUIRE = CHAR2CONST('R', 'a', 'c', 'q'),
OR_ADMIN = CHAR2CONST('A', 'd', 'm', 'n'),
OR_LOCAL_FAIL = CHAR2CONST('L', 'o', 'c', 'F'),
OR_STEPDOWN = CHAR2CONST('S', 'p', 'd', 'n'),
OR_SPLIT = CHAR2CONST('S', 'p', 'l', 't'),
} cmd_reason_t;
/* bitwise command options
* each value is a distinct bit, so options can be OR-ed together and
* tested with & (as done with OPT_WAIT_COMMIT on cl.options)
*/
typedef enum {
OPT_IMMEDIATE = 1, /* immediate grant */
OPT_WAIT = 2, /* wait for the elections' outcome */
OPT_WAIT_COMMIT = 4, /* wait for the ticket commit to CIB */
} cmd_options_t;
/** @} */
/** @{ */
/* One configured site: identity, addressing, sockets and per-site
 * traffic statistics. */
struct booth_site {
/** Calculated ID. See add_site(). */
int site_id;
/** Kind of site; compared against ARBITRATOR by the attribute client. */
int type;
/** Set to 1 on the site that was selected as the local one. */
int local;
/** Roles, like ACCEPTOR, PROPOSER, or LEARNER. Not really used ATM. */
int role;
boothc_site addr_string;
int tcp_fd;
int udp_fd;
/* 0-based, used for indexing into per-ticket weights.
* -1 for no_leader. */
int index;
uint64_t bitmask;
unsigned short family;
/* socket address; NOTE(review): presumably selected by family - confirm */
union {
struct sockaddr_in sa4;
struct sockaddr_in6 sa6;
};
int saddrlen;
int addrlen;
/** statistics */
time_t last_recv;
unsigned int sent_cnt;
unsigned int sent_err_cnt;
unsigned int resend_cnt;
unsigned int recv_cnt;
unsigned int recv_err_cnt;
/* reported as "authfail" in the peers listing */
unsigned int sec_cnt;
/* incremented when a message names an unknown ticket */
unsigned int invalid_cnt;
/** last timestamp seen from this site */
uint32_t last_secs;
uint32_t last_usecs;
};
extern struct booth_site *local;
extern struct booth_site *const no_leader;
/** @} */
struct booth_transport;
/* One slot of the global clients array; the pollfds array is kept in
 * step with it, index for index. */
struct client {
/* file descriptor, -1 while the slot is free */
int fd;
const struct booth_transport *transport;
/* message being received; freed when the client is removed */
struct boothc_ticket_msg *msg;
int offset; /* bytes read so far into msg */
/* called with the slot index when poll() reports POLLIN */
workfn_t workfn;
/* called with the slot index on POLLERR, POLLHUP or POLLNVAL */
void (*deadfn)(int);
};
extern struct client *clients;
extern struct pollfd *pollfds;
int client_add(int fd, const struct booth_transport *tpt,
workfn_t workfn, void (*deadfn)(int ci));
int find_client_by_fd(int fd);
void safe_copy(char *dest, char *value, size_t buflen, const char *description);
int update_authkey(void);
-void list_peers(int fd);
+
+/**
+ * @internal
+ * Response to "get all servers we know about"
+ *
+ * @param[in,out] conf config object to refer to
+ * @param[in] fd file descriptor of the socket to respond to
+ */
+void list_peers(struct booth_config *conf, int fd);
/* Settings taken from the command line, plus the prepared request
 * messages. A single global instance named cl is declared below. */
struct command_line {
int type; /* ACT_ */
int op; /* OP_ */
int options; /* OPT_ */
char configfile[BOOTH_PATH_LEN];
/* PID file; derived from the configuration name if left empty */
char lockfile[BOOTH_PATH_LEN];
/* site to talk to; an empty string selects the local site */
char site[BOOTH_NAME_LEN];
struct boothc_ticket_msg msg;
/* request sent by the attribute (geostore) client */
struct boothc_attr_msg attr_msg;
};
extern struct command_line cl;
/* http://gcc.gnu.org/onlinedocs/gcc/Typeof.html */
/* Smaller of two values. Built on the GNU typeof and statement
 * expression extensions; each argument is evaluated exactly once. */
#define min(a__,b__) \
({ typeof (a__) _a = (a__); \
typeof (b__) _b = (b__); \
_a < _b ? _a : _b; })
/* Larger of two values. Built on the GNU typeof and statement
 * expression extensions; each argument is evaluated exactly once. */
#define max(a__,b__) \
({ typeof (a__) _a = (a__); \
typeof (b__) _b = (b__); \
_a > _b ? _a : _b; })
#endif /* _BOOTH_H */
diff --git a/src/main.c b/src/main.c
index 7d93296..cd69322 100644
--- a/src/main.c
+++ b/src/main.c
@@ -1,1698 +1,1701 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "b_config.h"
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sched.h>
#include <errno.h>
#include <limits.h>
#include <sys/file.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/poll.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <string.h>
#include <ctype.h>
#include <assert.h>
#include <signal.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <crm/services.h>
#if HAVE_LIBGNUTLS
#include <gnutls/gnutls.h>
#endif
#if HAVE_LIBGCRYPT
#include <gcrypt.h>
#endif
#ifndef NAMETAG_LIBSYSTEMD
#include <clplumbing/setproctitle.h>
#else
#include "alt/nametag_libsystemd.h"
#endif
#ifdef COREDUMP_NURSING
#include <sys/prctl.h>
#include <clplumbing/coredumps.h>
#endif
#include "log.h"
#include "booth.h"
#include "config.h"
#include "transport.h"
#include "inline-fn.h"
#include "pacemaker.h"
#include "ticket.h"
#include "request.h"
#include "attr.h"
#include "handler.h"
#define RELEASE_STR VERSION
#define CLIENT_NALLOC 32
static int daemonize = 1;
int enable_stderr = 0;
timetype start_time;
/** Structure for "clients".
* Filehandles with incoming data get registered here (and in pollfds),
* along with their callbacks.
* Because these can be reallocated with every new fd, addressing
* happens _only_ by their numeric index. */
struct client *clients = NULL;
struct pollfd *pollfds = NULL;
static int client_maxi;
static int client_size = 0;
static const struct booth_site _no_leader = {
.addr_string = "none",
.site_id = NO_ONE,
.index = -1,
};
struct booth_site *const no_leader = (struct booth_site*) &_no_leader;
/* Daemon states as written to the lockfile; see state_string() for
 * the textual form. */
typedef enum
{
BOOTHD_STARTED=0,
BOOTHD_STARTING
} BOOTH_DAEMON_STATE;
int poll_timeout;
struct booth_config *booth_conf;
struct command_line cl;
/*
* Global signal handlers variables
*/
static int sig_exit_handler_called = 0;
static int sig_exit_handler_sig = 0;
static int sig_usr1_handler_called = 0;
static int sig_chld_handler_called = 0;
/* Textual name of a daemon state; "invalid" for anything unknown. */
static const char *state_string(BOOTH_DAEMON_STATE st)
{
	switch (st) {
	case BOOTHD_STARTED:
		return "started";
	case BOOTHD_STARTING:
		return "starting";
	default:
		return "invalid";
	}
}
/* Grow the clients and pollfds arrays by CLIENT_NALLOC slots and mark
 * the new slots as free. Exits the process if memory runs out. */
static void client_alloc(void)
{
	const int grown = client_size + CLIENT_NALLOC;
	int idx;

	clients = realloc(clients, grown * sizeof(*clients));
	if (clients != NULL) {
		pollfds = realloc(pollfds, grown * sizeof(*pollfds));
	}
	if (clients == NULL || pollfds == NULL) {
		log_error("can't alloc for client array");
		exit(1);
	}

	for (idx = client_size; idx < grown; idx++) {
		clients[idx].workfn = NULL;
		clients[idx].deadfn = NULL;
		clients[idx].fd = -1;
		pollfds[idx].fd = -1;
		pollfds[idx].revents = 0;
	}
	client_size = grown;
}
/* Default dead-client handler: close the descriptor, drop a partially
 * received message and free the slot ci in clients and pollfds. */
static void client_dead(int ci)
{
	struct client *slot = &clients[ci];

	if (slot->fd != -1) {
		log_debug("removing client %d", slot->fd);
		close(slot->fd);
		slot->fd = -1;
	}
	slot->workfn = NULL;

	if (slot->msg != NULL) {
		free(slot->msg);
		slot->msg = NULL;
		slot->offset = 0;
	}

	pollfds[ci].fd = -1;
}
/* Register a file descriptor with the poll loop.
 *
 * fd: descriptor to watch for POLLIN
 * tpt: transport the descriptor belongs to
 * workfn: called when input is available
 * deadfn: called on error/hangup; client_dead() is used if NULL
 *
 * The first free slot is used; the arrays are grown beforehand when
 * they are about to run full.
 *
 * Returns the index of the slot. -1 is returned only if no free slot
 * was found, which is not expected to happen and trips an assertion
 * in debug builds.
 */
int client_add(int fd, const struct booth_transport *tpt,
		workfn_t workfn, void (*deadfn)(int ci))
{
	int i;
	struct client *c;

	if (client_size - 1 <= client_maxi) {
		client_alloc();
	}

	for (i = 0; i < client_size; i++) {
		c = clients + i;
		if (c->fd != -1)
			continue;

		c->workfn = workfn;
		if (deadfn)
			c->deadfn = deadfn;
		else
			c->deadfn = client_dead;

		c->transport = tpt;
		c->fd = fd;
		c->msg = NULL;
		c->offset = 0;

		pollfds[i].fd = fd;
		pollfds[i].events = POLLIN;
		if (i > client_maxi)
			client_maxi = i;

		return i;
	}

	assert(!"no client");
	/* with NDEBUG the assertion vanishes; do not fall off the end of
	 * a non-void function */
	return -1;
}
/* Return the index of the client slot using fd, or -1 if fd is
 * negative or no slot up to client_maxi holds it. */
int find_client_by_fd(int fd)
{
	int idx = 0;

	if (fd < 0) {
		return -1;
	}

	while (idx <= client_maxi) {
		if (clients[idx].fd == fd) {
			return idx;
		}
		idx++;
	}
	return -1;
}
/* Format the statistics of all sites except the local one as text.
 *
 * pdata: receives a malloc'ed buffer the caller has to free; it is
 *        set to NULL on failure
 * len: receives the number of bytes written to the buffer
 *
 * Returns 0 on success, -ENOMEM if the buffer cannot be allocated or
 * the text does not fit into it.
 */
static int format_peers(char **pdata, unsigned int *len)
{
	struct booth_site *s;
	char *data, *cp;
	char time_str[64];
	int i, alloc, left, n;

	*pdata = NULL;
	*len = 0;

	alloc = booth_conf->site_count * (BOOTH_NAME_LEN + 256);
	data = malloc(alloc);
	if (!data)
		return -ENOMEM;

	cp = data;
	left = alloc;
	_FOREACH_NODE(i, s) {
		if (s == local)
			continue;

		strftime(time_str, sizeof(time_str), "%F %T",
				localtime(&s->last_recv));

		/* snprintf() returns the length the text would have had, so
		 * the result is checked before cp is advanced; otherwise the
		 * next size argument could turn negative and allow a write
		 * past the end of the buffer */
		n = snprintf(cp, left,
				"%-12s %s, last recv: %s\n",
				type_to_string(s->type),
				s->addr_string,
				time_str);
		if (n < 0 || n >= left)
			goto too_small;
		cp += n;
		left -= n;

		n = snprintf(cp, left,
				"\tSent pkts:%u error:%u resends:%u\n",
				s->sent_cnt,
				s->sent_err_cnt,
				s->resend_cnt);
		if (n < 0 || n >= left)
			goto too_small;
		cp += n;
		left -= n;

		n = snprintf(cp, left,
				"\tRecv pkts:%u error:%u authfail:%u invalid:%u\n\n",
				s->recv_cnt,
				s->recv_err_cnt,
				s->sec_cnt,
				s->invalid_cnt);
		if (n < 0 || n >= left)
			goto too_small;
		cp += n;
		left -= n;
	}

	*pdata = data;
	*len = cp - data;
	return 0;

too_small:
	free(data);
	return -ENOMEM;
}
/* Answer a peers request: format the statistics of the known sites
 * and send them, preceded by a CL_LIST header, to fd. Nothing is sent
 * if formatting fails; the buffer is released in either case. */
-void list_peers(int fd)
+void list_peers(struct booth_config *conf, int fd)
{
char *data;
unsigned int olen;
struct boothc_hdr_msg hdr;
/* format_peers() sets data to NULL before it can fail */
- if (format_peers(&data, &olen) < 0)
+ if (format_peers(&data, &olen) < 0) {
goto out;
+ }
init_header(&hdr.header, CL_LIST, 0, 0, RLT_SUCCESS, 0, sizeof(hdr) + olen);
- (void)send_header_plus(fd, &hdr, data, olen);
+ send_header_plus(conf, fd, &hdr, data, olen);
out:
- if (data)
+ if (data) {
free(data);
+ }
}
/* trim leading and trailing whitespace if the key is ascii
 * a key containing any non-ascii byte is treated as binary and left
 * untouched
 */
static void trim_key(void)
{
	char *key = booth_conf->authkey;
	char *p;
	int i;

	/* nothing to trim; this also keeps a non-positive length away
	 * from memmove() and from the end-of-key arithmetic below */
	if (booth_conf->authkey_len <= 0)
		return;

	for (i = 0; i < booth_conf->authkey_len; i++)
		if (!isascii((unsigned char)key[i]))
			return;

	p = key;
	while (booth_conf->authkey_len > 0 && isspace((unsigned char)*p)) {
		p++;
		booth_conf->authkey_len--;
	}
	memmove(key, p, booth_conf->authkey_len);

	/* index from the front so that no pointer before the start of
	 * the key is formed when only whitespace was found */
	while (booth_conf->authkey_len > 0 &&
	       isspace((unsigned char)key[booth_conf->authkey_len - 1])) {
		booth_conf->authkey_len--;
	}
}
/* Read the authentication key from booth_conf->authfile.
 *
 * The file must not be readable by group or others. At most
 * BOOTH_MAX_KEY_LEN bytes are read; surrounding whitespace is trimmed
 * from ascii keys. The file's stat data is kept in
 * booth_conf->authstat.
 *
 * Returns 0 if a key of at least BOOTH_MIN_KEY_LEN bytes was read,
 * -1 otherwise.
 */
static int read_authkey(void)
{
	int fd;
	ssize_t nread;

	booth_conf->authkey[0] = '\0';
	fd = open(booth_conf->authfile, O_RDONLY);
	if (fd < 0) {
		log_error("cannot open %s: %s",
			  booth_conf->authfile, strerror(errno));
		return -1;
	}
	if (fstat(fd, &booth_conf->authstat) < 0) {
		log_error("cannot stat authentication file %s (%d): %s",
			  booth_conf->authfile, fd, strerror(errno));
		close(fd);
		return -1;
	}
	if (booth_conf->authstat.st_mode & (S_IRGRP | S_IROTH)) {
		log_error("%s: file shall not be readable for anyone but the owner",
			  booth_conf->authfile);
		close(fd);
		return -1;
	}

	nread = read(fd, booth_conf->authkey, BOOTH_MAX_KEY_LEN);
	if (nread < 0) {
		/* log before close() gets a chance to change errno; never
		 * store a negative key length */
		log_error("cannot read %s: %s",
			  booth_conf->authfile, strerror(errno));
		close(fd);
		booth_conf->authkey_len = 0;
		return -1;
	}
	close(fd);

	booth_conf->authkey_len = nread;
	trim_key();
	log_debug("read key of size %d in authfile %s",
		  booth_conf->authkey_len, booth_conf->authfile);
	/* make sure that the key is of minimum length */
	return (booth_conf->authkey_len >= BOOTH_MIN_KEY_LEN) ? 0 : -1;
}
/* Re-read the authentication key if the key file was modified after
 * the time recorded in booth_conf->authstat.
 * Returns 0 if the key is up to date, -1 if the file cannot be
 * stat'ed, otherwise the result of read_authkey().
 */
int update_authkey()
{
	struct stat current;

	if (stat(booth_conf->authfile, &current) < 0) {
		log_error("cannot stat authentication file %s: %s",
			  booth_conf->authfile, strerror(errno));
		return -1;
	}

	if (current.st_mtime <= booth_conf->authstat.st_mtime) {
		return 0;
	}
	return read_authkey();
}
/* Load and check the configuration named in cl.configfile.
 *
 * conf: receives the configuration read by read_config(); must not be
 *       NULL
 * type: mode of operation, passed to read_config() and check_config()
 *
 * In addition the authentication key is read (and the crypto library
 * initialized) if an authfile is configured, the local site is
 * determined, and cl.lockfile gets its default value.
 *
 * NOTE(review): apart from read_config() and the lockfile name, this
 * function works on the global booth_conf rather than on *conf, so it
 * appears to assume that conf points at booth_conf - confirm.
 *
 * Returns a negative value from the step that failed (-EINVAL if
 * cl.site is not in the configuration), otherwise the result of
 * check_config().
 */
static int setup_config(struct booth_config **conf, int type)
{
int rv;
assert(conf != NULL);
rv = read_config(conf, cl.configfile, type);
if (rv < 0) {
goto out;
}
if (booth_conf->authfile[0] != '\0') {
rv = read_authkey();
if (rv < 0)
goto out;
#if HAVE_LIBGCRYPT
if (!gcry_check_version(NULL)) {
log_error("gcry_check_version");
rv = -ENOENT;
goto out;
}
gcry_control(GCRYCTL_DISABLE_SECMEM, 0);
gcry_control(GCRYCTL_INITIALIZATION_FINISHED, 0);
#endif
#if HAVE_LIBGNUTLS
if (gnutls_global_init() != 0) {
log_error("Cannot initialize GnuTLS");
rv = -EINVAL;
goto out;
};
#endif
}
/* Set "local" pointer, ignoring errors. */
if (cl.type == DAEMON && cl.site[0]) {
if (!find_site_by_name(booth_conf, cl.site, &local, 1)) {
log_error("Cannot find \"%s\" in the configuration.",
cl.site);
return -EINVAL;
}
local->local = 1;
} else {
find_myself(booth_conf, NULL, type == CLIENT || type == GEOSTORE);
}
rv = check_config(booth_conf, type);
if (rv < 0)
goto out;
/* Per default the PID file name is derived from the
* configuration name. */
if (!cl.lockfile[0]) {
snprintf(cl.lockfile, sizeof(cl.lockfile) - 1,
"%s/%s.pid", BOOTH_RUN_DIR, (*conf)->name);
}
out:
return rv;
}
/* Initialize the configured transport (with message_recv as receive
 * callback) and then the TCP transport.
 * Returns the negative result of the init call that failed, or the
 * result of the TCP init otherwise.
 */
static int setup_transport(void)
{
	int rc;

	rc = transport()->init(message_recv);
	if (rc < 0) {
		log_error("failed to init booth_transport %s", transport()->name);
		return rc;
	}

	rc = booth_transport[TCP].init(NULL);
	if (rc < 0) {
		log_error("failed to init booth_transport[TCP]");
	}
	return rc;
}
/* Replace the contents of the lockfile fd with one line describing
 * the daemon: pid, state, type, configuration name, id, address and
 * port.
 * Returns 0 on success; on failure -1, or the result of ftruncate()
 * if that was the call which failed.
 */
static int write_daemon_state(int fd, int state)
{
	char *line = NULL;
	int line_len, rc;
	int result = -1;

	line_len = asprintf(&line, "booth_pid=%d booth_state=%s booth_type=%s "
			"booth_cfg_name='%s' booth_id=%d "
			"booth_addr_string='%s' booth_port=%d\n",
			getpid(), state_string(state), type_to_string(local->type),
			booth_conf->name, get_local_id(), site_string(local),
			site_port(local));
	if (line_len < 0) {
		/* nothing was allocated, so there is nothing to free */
		log_error("Buffer write failed in write_daemon_state().");
		return -1;
	}

	rc = ftruncate(fd, 0);
	if (rc < 0) {
		log_error("lockfile %s truncate error %d: %s",
				cl.lockfile, errno, strerror(errno));
		result = rc;
		goto done;
	}

	rc = lseek(fd, 0, SEEK_SET);
	if (rc < 0) {
		log_error("lseek set fd(%d) offset to 0 error, return(%d), message(%s)",
				fd, rc, strerror(errno));
		goto done;
	}

	rc = write(fd, line, line_len);
	if (rc != line_len) {
		log_error("write to fd(%d, %d) returned %d, errno %d, message(%s)",
				fd, line_len, rc, errno, strerror(errno));
		goto done;
	}

	result = 0;
done:
	free(line);
	return result;
}
/* Act on the flags set by the signal handlers.
 * Returns 1 if an exit signal was caught, which tells the caller to
 * leave the main loop, and 0 otherwise. Pending USR1 and CHLD flags
 * are cleared before their work is done.
 */
static int process_signals(void)
{
	if (sig_exit_handler_called != 0) {
		log_info("caught signal %d", sig_exit_handler_sig);
		return 1;
	}

	if (sig_usr1_handler_called != 0) {
		sig_usr1_handler_called = 0;
		tickets_log_info(booth_conf);
	}

	if (sig_chld_handler_called != 0) {
		sig_chld_handler_called = 0;
		wait_child(SIGCHLD);
	}

	return 0;
}
static int loop(int fd)
{
workfn_t workfn;
void (*deadfn) (int ci);
int rv, i;
rv = setup_transport();
if (rv < 0)
goto fail;
rv = setup_ticket(booth_conf);
if (rv < 0) {
goto fail;
}
rv = write_daemon_state(fd, BOOTHD_STARTED);
if (rv != 0) {
log_error("write daemon state %d to lockfile error %s: %s",
BOOTHD_STARTED, cl.lockfile, strerror(errno));
goto fail;
}
log_info("BOOTH %s daemon started, node id is 0x%08X (%d).",
type_to_string(local->type),
local->site_id, local->site_id);
while (1) {
rv = poll(pollfds, client_maxi + 1, poll_timeout);
if (rv == -1 && errno == EINTR)
continue;
if (rv < 0) {
log_error("poll failed: %s (%d)", strerror(errno), errno);
goto fail;
}
for (i = 0; i <= client_maxi; i++) {
if (clients[i].fd < 0)
continue;
if (pollfds[i].revents & POLLIN) {
workfn = clients[i].workfn;
if (workfn) {
workfn(booth_conf, i);
}
}
if (pollfds[i].revents &
(POLLERR | POLLHUP | POLLNVAL)) {
deadfn = clients[i].deadfn;
if (deadfn)
deadfn(i);
}
}
process_tickets(booth_conf);
if (process_signals() != 0) {
return 0;
}
}
return 0;
fail:
return -1;
}
/*
 * Evaluate the result code of a server reply for a client command and
 * log a matching message.
 *
 * reply_code: result taken from the reply header
 * cmd:        the request this reply belongs to; must be one of
 *             CMD_GRANT, CMD_REVOKE, CMD_LIST or CMD_PEERS
 *
 * Returns:
 *   0  the command is finished (or nothing more needs to be done)
 *  -1  the command failed, the reply code is unknown, or cmd is not
 *      one of the commands listed above
 *   1  RLT_REDIRECT: talk to another site
 *   2  RLT_MORE: the server has more to say
 *   3  RLT_CIB_PENDING and the user asked to wait for the CIB commit
 */
static int test_reply(cmd_result_t reply_code, cmd_request_t cmd)
{
	int rv = 0;
	const char *op_str = NULL;

	if (cmd == CMD_GRANT)
		op_str = "grant";
	else if (cmd == CMD_REVOKE)
		op_str = "revoke";
	else if (cmd == CMD_LIST)
		op_str = "list";
	else if (cmd == CMD_PEERS)
		op_str = "peers";
	else {
		log_error("internal error reading reply result!");
		return -1;
	}

	switch (reply_code) {
	case RLT_OVERGRANT:
		log_info("You're granting a granted ticket. "
			 "If you wanted to migrate a ticket, "
			 "use revoke first, then use grant.");
		rv = -1;
		break;

	case RLT_TICKET_IDLE:
		log_info("ticket is not owned");
		rv = 0;
		break;

	case RLT_ASYNC:
		log_info("%s command sent, result will be returned "
			 "asynchronously. Please use \"booth list\" to "
			 "see the outcome.", op_str);
		rv = 0;
		break;

	case RLT_CIB_PENDING:
		log_info("%s succeeded (CIB commit pending)", op_str);
		/* wait for the CIB commit? */
		rv = (cl.options & OPT_WAIT_COMMIT) ? 3 : 0;
		break;

	case RLT_MORE:
		rv = 2;
		break;

	case RLT_SYNC_SUCC:
	case RLT_SUCCESS:
		/* list/peers print their own output, stay quiet for them */
		if (cmd != CMD_LIST && cmd != CMD_PEERS)
			log_info("%s succeeded!", op_str);
		rv = 0;
		break;

	case RLT_SYNC_FAIL:
		log_info("%s failed!", op_str);
		rv = -1;
		break;

	case RLT_INVALID_ARG:
		log_error("ticket \"%s\" does not exist",
				cl.msg.ticket.id);
		rv = -1;
		break;

	case RLT_AUTH:
		log_error("authentication error");
		rv = -1;
		break;

	case RLT_EXT_FAILED:
		log_error("before-acquire-handler for ticket \"%s\" failed, grant denied",
				cl.msg.ticket.id);
		rv = -1;
		break;

	case RLT_ATTR_PREREQ:
		log_error("attr-prereq for ticket \"%s\" failed, grant denied",
				cl.msg.ticket.id);
		rv = -1;
		break;

	case RLT_REDIRECT:
		/* talk to another site */
		rv = 1;
		break;

	default:
		/* Log the code that was actually received; rv is still 0
		 * at this point and would hide it. */
		log_error("got an error code: %x", (unsigned int)reply_code);
		rv = -1;
		break;
	}

	return rv;
}
/*
 * Send a query request to a site over the TCP transport and print the
 * string payload of the reply, if any, on stdout.
 *
 * The request buffer, its size and the reply checker depend on cl.type:
 * GEOSTORE uses cl.attr_msg and test_attr_reply, everything else uses
 * cl.msg and test_reply. The target is cl.site if set, else the local
 * site.
 *
 * Returns the reply checker's verdict once the reply has been read, a
 * negative value if the transport fails, or -ENOMEM.
 * NOTE(review): the "cannot find site" path returns the positive value
 * ENOENT, unlike the other error paths - confirm callers expect that.
 */
static int query_get_string_answer(cmd_request_t cmd)
{
struct booth_site *site;
struct boothc_hdr_msg reply;
struct boothc_header *header;
char *data;
int data_len;
int rv;
struct booth_transport const *tpt;
int (*test_reply_f) (cmd_result_t reply_code, cmd_request_t cmd);
size_t msg_size;
void *request;
if (cl.type == GEOSTORE) {
test_reply_f = test_attr_reply;
msg_size = sizeof(cl.attr_msg);
request = &cl.attr_msg;
} else {
test_reply_f = test_reply;
msg_size = sizeof(cl.msg);
request = &cl.msg;
}
/* Both request types are addressed through their leading header. */
header = (struct boothc_header *)request;
data = NULL;
init_header(header, cmd, 0, cl.options, 0, 0, msg_size);
if (!*cl.site)
site = local;
else if (!find_site_by_name(booth_conf, cl.site, &site, 1)) {
log_error("cannot find site \"%s\"", cl.site);
rv = ENOENT;
/* nothing was opened yet, so skip the close */
goto out;
}
tpt = booth_transport + TCP;
rv = tpt->open(site);
if (rv < 0)
goto out_close;
- rv = tpt->send(site, request, msg_size);
+ rv = tpt->send(booth_conf, site, request, msg_size);
if (rv < 0)
goto out_close;
rv = tpt->recv_auth(site, &reply, sizeof(reply));
if (rv < 0)
goto out_close;
/* What is left to read: the length announced in the header minus rv
 * (presumably the number of bytes recv_auth() already consumed). */
data_len = ntohl(reply.header.length) - rv;
/* no attribute, or no ticket found */
if (!data_len) {
goto out_test_reply;
}
/* one extra byte for the terminating NUL added below */
data = malloc(data_len+1);
if (!data) {
rv = -ENOMEM;
goto out_close;
}
rv = tpt->recv(site, data, data_len);
if (rv < 0)
goto out_close;
*(data + data_len) = '\0';
(void)fputs(data, stdout);
fflush(stdout);
out_test_reply:
rv = test_reply_f(ntohl(reply.header.result), cmd);
out_close:
tpt->close(site);
out:
if (data)
free(data);
return rv;
}
/*
 * Run a grant or revoke request from the client side.
 *
 * The request in cl.msg is sent over TCP to cl.site (or the local site
 * if none was given). Replies are evaluated with test_reply(): a
 * redirect reconnects to the leader named in the reply, "more" replies
 * are read in a loop, anything else ends the command.
 *
 * Returns the last test_reply() verdict or transport result; -1 if the
 * site or ticket cannot be determined or the target is an arbitrator.
 */
static int do_command(cmd_request_t cmd)
{
struct booth_site *site;
struct boothc_ticket_msg reply;
struct booth_transport const *tpt;
uint32_t leader_id;
int rv;
int reply_cnt = 0, msg_logged = 0;
const char *op_str = "";
if (cmd == CMD_GRANT)
op_str = "grant";
else if (cmd == CMD_REVOKE)
op_str = "revoke";
rv = -1;
site = NULL;
/* Always use TCP for client - at least for now. */
tpt = booth_transport + TCP;
if (!*cl.site)
site = local;
else {
if (!find_site_by_name(booth_conf, cl.site, &site, 1)) {
log_error("Site \"%s\" not configured.", cl.site);
goto out_close;
}
}
if (site->type == ARBITRATOR) {
if (site == local) {
log_error("We're just an arbitrator, cannot grant/revoke tickets here.");
} else {
log_error("%s is just an arbitrator, cannot grant/revoke tickets there.", cl.site);
}
goto out_close;
}
assert(site->type == SITE);
/* We don't check for existence of ticket, so that asking can be
* done without local configuration, too.
* Although, that means that the UDP port has to be specified, too. */
if (!cl.msg.ticket.id[0]) {
/* If the loaded configuration has only a single ticket defined, use that. */
if (booth_conf->ticket_count == 1) {
/* NOTE(review): strncpy() does not terminate the id if the name fills
 * the whole buffer - confirm ticket names are always shorter. */
strncpy(cl.msg.ticket.id, booth_conf->ticket[0].name,
sizeof(cl.msg.ticket.id));
} else {
log_error("No ticket given.");
goto out_close;
}
}
/* Re-entered with a new site after a redirect reply. */
redirect:
init_header(&cl.msg.header, cmd, 0, cl.options, 0, 0, sizeof(cl.msg));
rv = tpt->open(site);
if (rv < 0)
goto out_close;
- rv = tpt->send(site, &cl.msg, sendmsglen(&cl.msg));
- if (rv < 0)
+ rv = tpt->send(booth_conf, site, &cl.msg, sendmsglen(&cl.msg));
+ if (rv < 0) {
goto out_close;
+ }
/* Re-entered while the server keeps announcing further replies. */
read_more:
rv = tpt->recv_auth(site, &reply, sizeof(reply));
if (rv < 0) {
/* print any errors depending on the code sent by the
* server */
/* NOTE(review): reply is read here although recv_auth() failed;
 * confirm it is always filled in on this path. */
(void)test_reply(ntohl(reply.header.result), cmd);
goto out_close;
}
rv = test_reply(ntohl(reply.header.result), cmd);
if (rv == 1) {
/* redirect: close this connection and retry at the reported leader */
tpt->close(site);
leader_id = ntohl(reply.ticket.leader);
if (!find_site_by_id(booth_conf, leader_id, &site)) {
log_error("Message with unknown redirect site %x received", leader_id);
rv = -1;
goto out_close;
}
goto redirect;
} else if (rv == 2 || rv == 3) {
/* the server has more to say */
/* don't wait too long */
if (reply_cnt > 1 && !(cl.options & OPT_WAIT)) {
rv = 0;
log_info("Giving up on waiting for the definite result. "
"Please use \"booth list\" later to "
"see the outcome.");
goto out_close;
}
if (reply_cnt == 0) {
log_info("%s request sent, "
"waiting for the result ...", op_str);
msg_logged++;
} else if (rv == 3 && msg_logged < 2) {
log_info("waiting for the CIB commit ...");
msg_logged++;
}
reply_cnt++;
goto read_more;
}
out_close:
if (site)
tpt->close(site);
return rv;
}
/*
 * Open cl.lockfile with the given open(2) flags and try to take a
 * write lock on the whole file.
 *
 * fdp:       receives the descriptor, or -1 if the file cannot be opened
 * locked_by: optional; receives the pid reported by F_GETLK when the
 *            lock is refused, 0 otherwise
 *
 * Returns 0 if the lock was taken, otherwise the errno value of the
 * open() or fcntl(F_SETLK) call that failed.
 */
static int _lockfile(int mode, int *fdp, pid_t *locked_by)
{
	struct flock whole_file;
	int handle, setlk_errno;

	/* After reboot the directory may not yet exist.
	 * Try to create it, but ignore errors. */
	if (strncmp(cl.lockfile, BOOTH_RUN_DIR, strlen(BOOTH_RUN_DIR)) == 0)
		(void)mkdir(BOOTH_RUN_DIR, 0775);

	if (locked_by)
		*locked_by = 0;
	*fdp = -1;

	handle = open(cl.lockfile, mode, 0664);
	if (handle < 0)
		return errno;
	*fdp = handle;

	/* start 0 / length 0 relative to SEEK_SET covers the whole file */
	whole_file.l_type = F_WRLCK;
	whole_file.l_whence = SEEK_SET;
	whole_file.l_start = 0;
	whole_file.l_len = 0;
	whole_file.l_pid = 0;

	if (fcntl(handle, F_SETLK, &whole_file) != 0) {
		setlk_errno = errno;

		/* tell the caller who is in the way, if we can find out */
		if (locked_by && fcntl(handle, F_GETLK, &whole_file) == 0)
			*locked_by = whole_file.l_pid;

		return setlk_errno;
	}

	return 0;
}
/* Returns 1 if the effective user id of this process is 0, else 0. */
static inline int is_root(void)
{
	return (geteuid() == 0) ? 1 : 0;
}
/*
 * Create and lock the daemon's PID file and record BOOTHD_STARTING in
 * it. When running as root the file is handed over to the configured
 * uid/gid (a failure to do so is only logged).
 *
 * Returns the locked file descriptor, or -1 if the file cannot be
 * opened, locked or written.
 */
static int create_lockfile(void)
{
	int rv, fd;

	fd = -1;
	rv = _lockfile(O_CREAT | O_WRONLY, &fd, NULL);

	if (fd == -1) {
		log_error("lockfile %s open error %d: %s",
				cl.lockfile, rv, strerror(rv));
		return -1;
	}

	/* _lockfile() reports a refused lock as a positive errno value,
	 * so any non-zero result means we do not own the lock and must
	 * not touch the file. */
	if (rv != 0) {
		log_error("lockfile %s setlk error %d: %s",
				cl.lockfile, rv, strerror(rv));
		goto fail;
	}

	rv = write_daemon_state(fd, BOOTHD_STARTING);
	if (rv != 0) {
		log_error("write daemon state %d to lockfile error %s: %s",
				BOOTHD_STARTING, cl.lockfile, strerror(errno));
		goto fail;
	}

	if (is_root()) {
		if (fchown(fd, booth_conf->uid, booth_conf->gid) < 0)
			log_error("fchown() on lockfile said %d: %s",
					errno, strerror(errno));
	}

	return fd;

fail:
	close(fd);
	return -1;
}
/*
 * Remove the PID file named by cl.lockfile and close its descriptor.
 * Both steps are best effort; their results are ignored.
 */
static void unlink_lockfile(int fd)
{
	(void)unlink(cl.lockfile);
	(void)close(fd);
}
/*
 * Print the command line help of the booth client (commands, options
 * and a few examples) on stdout. The default config file path is taken
 * from BOOTH_DEFAULT_CONF.
 */
static void print_usage(void)
{
printf(
"Usage:\n"
" booth list [options]\n"
" booth {grant|revoke} [options] <ticket>\n"
" booth status [options]\n"
"\n"
" list: List all tickets\n"
" grant: Grant ticket to site\n"
" revoke: Revoke ticket\n"
"\n"
"Options:\n"
" -c FILE Specify config file [default " BOOTH_DEFAULT_CONF "]\n"
" Can be a path or just a name without \".conf\" suffix\n"
" -s <site> Connect/grant to a different site\n"
" -F Try to grant the ticket immediately\n"
" even if not all sites are reachable\n"
" For manual tickets:\n"
" grant a manual ticket even if it has been already granted\n"
" -w Wait forever for the outcome of the request\n"
" -C Wait until the ticket is committed to the CIB (grant only)\n"
" -h Print this help\n"
"\n"
"Examples:\n"
"\n"
" # booth list (list tickets)\n"
" # booth grant ticket-A (grant ticket here)\n"
" # booth grant -s 10.121.8.183 ticket-A (grant ticket to site 10.121.8.183)\n"
" # booth revoke ticket-A (revoke ticket)\n"
"\n"
"See the booth(8) man page for more details.\n"
);
}
/* getopt() option string used by read_arguments() unless the program
 * runs as the attribute tool. */
#define OPTION_STRING "c:Dl:t:s:FhSwC"
/* getopt() option string used by read_arguments() when the program
 * runs as the attribute tool (cl.type == GEOSTORE). */
#define ATTR_OPTION_STRING "c:Dt:s:h"
/*
 * Copy a command line value into a fixed-size buffer, or terminate the
 * program if it does not fit.
 *
 * dest:        destination buffer of buflen bytes
 * value:       NUL-terminated string to copy
 * buflen:      size of dest in bytes
 * description: what the value is, used in the error message
 *
 * A value is accepted if it is shorter than buflen - 1 characters. On
 * success dest holds the value, zero-padded up to and including its
 * last byte. Otherwise (also for buflen == 0, where nothing can be
 * stored) a message is printed on stderr and the process exits with
 * EXIT_FAILURE.
 */
void safe_copy(char *dest, char *value, size_t buflen, const char *description) {
	/* Keep the arithmetic in size_t: an int "buflen - 1" turns into a
	 * huge unsigned number for buflen == 0 when compared with strlen(). */
	size_t content_len = (buflen > 0) ? buflen - 1 : 0;

	if (buflen == 0 || strlen(value) >= content_len) {
		fprintf(stderr, "'%s' exceeds maximum %s length of %zu\n",
			value, description, content_len);
		exit(EXIT_FAILURE);
	}

	strncpy(dest, value, content_len);
	dest[content_len] = 0;
}
/*
 * Resolve a host name to the textual form of its first IPv4 address.
 *
 * hostname: name (or address) to look up
 * ip_str:   receives the dotted address string
 * ip_size:  size of ip_str in bytes
 *
 * Returns 0 on success, the getaddrinfo() error code if the lookup
 * fails, or -1 if the address cannot be converted to text.
 */
static int host_convert(char *hostname, char *ip_str, size_t ip_size)
{
	struct addrinfo *result = NULL, hints = {0};
	struct in_addr addr;
	int re;

	hints.ai_family = AF_INET;
	hints.ai_socktype = SOCK_DGRAM;

	re = getaddrinfo(hostname, NULL, &hints, &result);
	if (re != 0) {
		/* Nothing was allocated; freeaddrinfo() is only specified
		 * for lists returned by a successful getaddrinfo(). */
		return re;
	}

	addr = ((struct sockaddr_in *)result->ai_addr)->sin_addr;
	if (inet_ntop(AF_INET, &addr, ip_str, ip_size) == NULL) {
		re = -1;
	}

	freeaddrinfo(result);

	return re;
}
/*
 * Copy the next positional argument (argv[optind]) into the array dest
 * via safe_copy() and advance optind. Jumps to the "missingarg" label
 * if no argument is left, so it can only be used inside a function
 * that provides argc, argv and that label. dest must be an array, as
 * its size is taken with sizeof.
 */
#define cparg(dest, descr) do { \
if (optind >= argc) \
goto missingarg; \
safe_copy(dest, argv[optind], sizeof(dest), descr); \
optind++; \
} while(0)
/*
 * Parse the command line into the global "cl" structure.
 *
 * 1. The mode (cl.type) is derived from the program name (attribute
 *    tool if argv[0] ends in ATTR_PROG) or from the first argument;
 *    anything unrecognised is treated as a client operation.
 * 2. help/version requests are answered and the process exits.
 * 3. For CLIENT and GEOSTORE the operation word is mapped to cl.op.
 * 4. Options are read with getopt(); once getopt() reports the end of
 *    the options, remaining words are taken as ticket name or
 *    attribute name/value.
 *
 * Returns 0 on success. Every usage error prints a message and exits
 * with EXIT_FAILURE.
 */
static int read_arguments(int argc, char **argv)
{
int optchar;
char *arg1 = argv[1];
char *op = NULL;
char *cp;
const char *opt_string = OPTION_STRING;
char site_arg[INET_ADDRSTRLEN] = {0};
int left;
cl.type = 0;
/* strstr() + strcmp(): ATTR_PROG must be the tail of argv[0] */
if ((cp = strstr(argv[0], ATTR_PROG)) &&
!strcmp(cp, ATTR_PROG)) {
cl.type = GEOSTORE;
op = argv[1];
optind = 2;
opt_string = ATTR_OPTION_STRING;
} else if (argc > 1 && (strcmp(arg1, "arbitrator") == 0 ||
strcmp(arg1, "site") == 0 ||
strcmp(arg1, "start") == 0 ||
strcmp(arg1, "daemon") == 0)) {
cl.type = DAEMON;
optind = 2;
} else if (argc > 1 && (strcmp(arg1, "status") == 0)) {
cl.type = STATUS;
optind = 2;
} else if (argc > 1 && (strcmp(arg1, "client") == 0)) {
cl.type = CLIENT;
if (argc < 3) {
print_usage();
exit(EXIT_FAILURE);
}
op = argv[2];
optind = 3;
}
/* No mode keyword: treat the first word as a client operation. */
if (!cl.type) {
cl.type = CLIENT;
op = argv[1];
optind = 2;
}
/* The argc check comes first, so arg1 is not dereferenced when there
 * are no arguments at all. */
if (argc < 2 || !strcmp(arg1, "help") || !strcmp(arg1, "--help") ||
!strcmp(arg1, "-h")) {
if (cl.type == GEOSTORE)
print_geostore_usage();
else
print_usage();
exit(EXIT_SUCCESS);
}
if (!strcmp(arg1, "version") || !strcmp(arg1, "--version") ||
!strcmp(arg1, "-V")) {
printf("%s %s\n", argv[0], RELEASE_STR);
exit(EXIT_SUCCESS);
}
if (cl.type == CLIENT) {
if (!strcmp(op, "list"))
cl.op = CMD_LIST;
else if (!strcmp(op, "grant"))
cl.op = CMD_GRANT;
else if (!strcmp(op, "revoke"))
cl.op = CMD_REVOKE;
else if (!strcmp(op, "peers"))
cl.op = CMD_PEERS;
else {
fprintf(stderr, "client operation \"%s\" is unknown\n",
op);
exit(EXIT_FAILURE);
}
} else if (cl.type == GEOSTORE) {
if (!strcmp(op, "list"))
cl.op = ATTR_LIST;
else if (!strcmp(op, "set"))
cl.op = ATTR_SET;
else if (!strcmp(op, "get"))
cl.op = ATTR_GET;
else if (!strcmp(op, "delete"))
cl.op = ATTR_DEL;
else {
fprintf(stderr, "attribute operation \"%s\" is unknown\n",
op);
exit(EXIT_FAILURE);
}
}
/* Note: if the options use up every argument, this loop simply ends
 * and the function returns 0 without passing through extra_args. */
while (optind < argc) {
optchar = getopt(argc, argv, opt_string);
switch (optchar) {
case 'c':
if (strchr(optarg, '/')) {
safe_copy(cl.configfile, optarg,
sizeof(cl.configfile), "config file");
} else {
/* If no "/" in there, use with default directory. */
strcpy(cl.configfile, BOOTH_DEFAULT_CONF_DIR);
cp = cl.configfile + strlen(BOOTH_DEFAULT_CONF_DIR);
assert(cp > cl.configfile);
assert(*(cp-1) == '/');
/* Write at the \0, ie. after the "/" */
safe_copy(cp, optarg,
(sizeof(cl.configfile) -
(cp - cl.configfile) -
strlen(BOOTH_DEFAULT_CONF_EXT)),
"config name");
/* If no extension, append ".conf".
* Space is available, see -strlen() above. */
if (!strchr(cp, '.'))
strcat(cp, BOOTH_DEFAULT_CONF_EXT);
}
break;
case 'D':
debug_level++;
break;
case 'S':
daemonize = 0;
enable_stderr = 1;
break;
case 'l':
safe_copy(cl.lockfile, optarg, sizeof(cl.lockfile), "lock file");
break;
case 't':
if (cl.op == CMD_GRANT || cl.op == CMD_REVOKE) {
safe_copy(cl.msg.ticket.id, optarg,
sizeof(cl.msg.ticket.id), "ticket name");
} else if (cl.type == GEOSTORE) {
safe_copy(cl.attr_msg.attr.tkt_id, optarg,
sizeof(cl.attr_msg.attr.tkt_id), "ticket name");
} else {
print_usage();
exit(EXIT_FAILURE);
}
break;
case 's':
/* For testing and debugging: allow "-s site" also for
* daemon start, so that the address that should be used
* can be set manually.
* This makes it easier to start multiple processes
* on one machine. */
if (cl.type == CLIENT || cl.type == GEOSTORE ||
(cl.type == DAEMON && debug_level)) {
/* Anything but the OTHER_SITE keyword is resolved to an IPv4
 * address string first; if that fails the word is stored as given. */
if (strcmp(optarg, OTHER_SITE) &&
host_convert(optarg, site_arg, INET_ADDRSTRLEN) == 0) {
safe_copy(cl.site, site_arg, sizeof(cl.site), "site name");
} else {
safe_copy(cl.site, optarg, sizeof(cl.site), "site name");
}
} else {
log_error("\"-s\" not allowed in daemon mode.");
exit(EXIT_FAILURE);
}
break;
case 'F':
if (cl.type != CLIENT || cl.op != CMD_GRANT) {
log_error("use \"-F\" only for client grant");
exit(EXIT_FAILURE);
}
cl.options |= OPT_IMMEDIATE;
break;
case 'w':
if (cl.type != CLIENT ||
(cl.op != CMD_GRANT && cl.op != CMD_REVOKE)) {
log_error("use \"-w\" only for grant and revoke");
exit(EXIT_FAILURE);
}
cl.options |= OPT_WAIT;
break;
case 'C':
if (cl.type != CLIENT || cl.op != CMD_GRANT) {
log_error("use \"-C\" only for grant");
exit(EXIT_FAILURE);
}
cl.options |= OPT_WAIT | OPT_WAIT_COMMIT;
break;
case 'h':
if (cl.type == GEOSTORE)
print_geostore_usage();
else
print_usage();
exit(EXIT_SUCCESS);
break;
case ':':
case '?':
fprintf(stderr, "Please use '-h' for usage.\n");
exit(EXIT_FAILURE);
break;
case -1:
/* No more parameters on cmdline, only arguments. */
goto extra_args;
default:
goto unknown;
};
}
return 0;
/* Positional arguments: ticket name for the client (unless -t already
 * set one), attribute name and, for "set", its value for geostore. */
extra_args:
if (cl.type == CLIENT && !cl.msg.ticket.id[0]) {
cparg(cl.msg.ticket.id, "ticket name");
} else if (cl.type == GEOSTORE) {
if (cl.op != ATTR_LIST) {
cparg(cl.attr_msg.attr.name, "attribute name");
}
if (cl.op == ATTR_SET) {
cparg(cl.attr_msg.attr.val, "attribute value");
}
}
if (optind == argc)
return 0;
left = argc - optind;
fprintf(stderr, "Superfluous argument%s: %s%s\n",
left == 1 ? "" : "s",
argv[optind],
left == 1 ? "" : "...");
exit(EXIT_FAILURE);
unknown:
fprintf(stderr, "unknown option: %s\n", argv[optind]);
exit(EXIT_FAILURE);
/* target of the cparg() macro */
missingarg:
fprintf(stderr, "not enough arguments\n");
exit(EXIT_FAILURE);
}
/*
 * Try to make the daemon resistant to paging and scheduling delays:
 * lift the locked-memory limit and lock all pages, then switch to the
 * SCHED_RR policy at its maximum priority. Every failure is only
 * logged; the function never aborts.
 */
static void set_scheduler(void)
{
	struct rlimit unlimited;
	struct sched_param rr_param;
	int top_prio;

	unlimited.rlim_cur = RLIM_INFINITY;
	unlimited.rlim_max = RLIM_INFINITY;

	/* mlockall() is only attempted if the limit could be raised */
	if (setrlimit(RLIMIT_MEMLOCK, &unlimited) < 0) {
		log_error("setrlimit failed");
	} else if (mlockall(MCL_CURRENT | MCL_FUTURE) < 0) {
		log_error("mlockall failed");
	}

	top_prio = sched_get_priority_max(SCHED_RR);
	if (top_prio == -1) {
		log_error("could not get maximum scheduler priority err %d",
				errno);
		return;
	}

	rr_param.sched_priority = top_prio;
	if (sched_setscheduler(0, SCHED_RR, &rr_param) == -1)
		log_error("could not set SCHED_RR priority %d: %s (%d)",
				rr_param.sched_priority,
				strerror(errno), errno);
}
/*
 * Write a string to a file, replacing its contents (used for /proc
 * tunables).
 *
 * Returns 0 if at least one character was written and the file was
 * closed without error, -1 otherwise (file cannot be opened, nothing
 * written, or the write was rejected).
 */
static int set_procfs_val(const char *path, const char *val)
{
	int rc = -1;
	FILE *fp = fopen(path, "w");

	if (!fp)
		return -1;

	if (fprintf(fp, "%s", val) > 0)
		rc = 0;

	/* stdio buffers the data, so the write may only reach the file -
	 * and be refused - when the stream is flushed by fclose(). */
	if (fclose(fp) != 0)
		rc = -1;

	return rc;
}
static int do_status(struct booth_config **conf, int type)
{
pid_t pid;
int rv, status_lock_fd, ret;
const char *reason = NULL;
char lockfile_data[1024], *cp;
assert(conf != NULL);
ret = PCMK_OCF_NOT_RUNNING;
rv = setup_config(conf, type);
if (rv) {
reason = "Error reading configuration.";
ret = PCMK_OCF_UNKNOWN_ERROR;
goto quit;
}
if (!local) {
reason = "No Service IP active here.";
goto quit;
}
rv = _lockfile(O_RDWR, &status_lock_fd, &pid);
if (status_lock_fd == -1) {
reason = "No PID file.";
goto quit;
}
if (rv == 0) {
close(status_lock_fd);
reason = "PID file not locked.";
goto quit;
}
if (pid) {
fprintf(stdout, "booth_lockpid=%d ", pid);
fflush(stdout);
}
rv = read(status_lock_fd, lockfile_data, sizeof(lockfile_data) - 1);
if (rv < 4) {
close(status_lock_fd);
reason = "Cannot read lockfile data.";
ret = PCMK_LSB_UNKNOWN_ERROR;
goto quit;
}
lockfile_data[rv] = 0;
close(status_lock_fd);
/* Make sure it's only a single line */
cp = strchr(lockfile_data, '\r');
if (cp)
*cp = 0;
cp = strchr(lockfile_data, '\n');
if (cp)
*cp = 0;
rv = setup_tcp_listener(1);
if (rv == 0) {
reason = "TCP port not in use.";
goto quit;
}
fprintf(stdout, "booth_lockfile='%s' %s\n",
cl.lockfile, lockfile_data);
if (!daemonize)
fprintf(stderr, "Booth at %s port %d seems to be running.\n",
local->addr_string, site_port(local));
return 0;
quit:
log_debug("not running: %s", reason);
/* Ie. "DEBUG" */
if (!daemonize)
fprintf(stderr, "not running: %s\n", reason);
return ret;
}
/*
 * Drop root privileges: switch real and effective group id, then real
 * and effective user id, to the ones from the configuration. Does
 * nothing when not running as root.
 *
 * Returns 0 on success, otherwise the errno value of the call that
 * failed (which is also logged).
 */
static int limit_this_process(void)
{
	int err = 0;

	if (is_root()) {
		/* group first: changing it later would need the privileges
		 * given up by the uid switch */
		if (setregid(booth_conf->gid, booth_conf->gid) < 0) {
			err = errno;
			log_error("setregid() didn't work: %s", strerror(err));
		} else if (setreuid(booth_conf->uid, booth_conf->uid) < 0) {
			err = errno;
			log_error("setreuid() didn't work: %s", strerror(err));
		}
	}

	return err;
}
/* Descriptor of the daemon's locked PID file; -1 while there is none. */
static int lock_fd = -1;

/*
 * Exit hook of the daemon: empties and removes the PID file if one is
 * held, then logs the exit.
 */
static void server_exit(void)
{
	if (lock_fd >= 0) {
		/* We might not be able to delete it, but at least
		 * make it empty. The result is deliberately ignored. */
		int ignored = ftruncate(lock_fd, 0);

		(void)ignored;
		unlink_lockfile(lock_fd);
	}

	log_info("exiting");
}
/*
 * Handler for termination signals: only records what happened, the
 * flags are evaluated later by process_signals(). The signal number is
 * stored before the flag is raised, so it is in place once the flag is
 * seen.
 */
static void sig_exit_handler(int sig)
{
sig_exit_handler_sig = sig;
sig_exit_handler_called = 1;
}
/* SIGUSR1 handler: raise the flag that process_signals() checks. */
static void sig_usr1_handler(int sig)
{
	(void)sig;	/* the signal number is not needed */
	sig_usr1_handler_called = 1;
}
/* SIGCHLD handler: raise the flag that process_signals() checks. */
static void sig_chld_handler(int sig)
{
	(void)sig;	/* the signal number is not needed */
	sig_chld_handler_called = 1;
}
/*
 * Run the booth daemon.
 *
 * Loads the configuration, optionally daemonizes, installs the signal
 * and exit handlers, takes the PID file lock, configures logging,
 * raises scheduling priority and OOM protection, drops root privileges
 * and finally enters loop().
 *
 * Returns the result of loop(), or the result of the setup step that
 * failed. Exits the process if the local site is not part of the
 * configuration or daemon() fails.
 */
static int do_server(struct booth_config **conf, int type)
{
int rv = -1;
/* static: the buffer is handed to cl_log_set_entity() below;
 * presumably the logging code keeps the pointer - TODO confirm */
static char log_ent[128] = DAEMON_NAME "-";
assert(conf != NULL);
rv = setup_config(conf, type);
if (rv < 0) {
return rv;
}
if (!local) {
log_error("Cannot find myself in the configuration.");
exit(EXIT_FAILURE);
}
if (daemonize) {
if (daemon(0, 0) < 0) {
perror("daemon error");
exit(EXIT_FAILURE);
}
}
/*
* Register signal and exit handler
*/
signal(SIGUSR1, (__sighandler_t)sig_usr1_handler);
signal(SIGTERM, (__sighandler_t)sig_exit_handler);
signal(SIGINT, (__sighandler_t)sig_exit_handler);
/* we'll handle errors there and then */
signal(SIGPIPE, SIG_IGN);
atexit(server_exit);
/* The lockfile must be written to _after_ the call to daemon(), so
* that the lockfile contains the pid of the daemon, not the parent. */
lock_fd = create_lockfile();
if (lock_fd < 0)
return lock_fd;
/* Log entity becomes DAEMON_NAME "-" followed by the local type. */
strcat(log_ent, type_to_string(local->type));
cl_log_set_entity(log_ent);
cl_log_enable_stderr(enable_stderr ? TRUE : FALSE);
cl_log_set_facility(HA_LOG_FACILITY);
cl_inherit_logging_environment(0);
log_info("BOOTH %s %s daemon is starting",
type_to_string(local->type), RELEASE_STR);
set_scheduler();
/* we don't want to be killed by the OOM-killer */
/* oom_adj is only tried if writing oom_score_adj reported failure */
if (set_procfs_val("/proc/self/oom_score_adj", "-999"))
(void)set_procfs_val("/proc/self/oom_adj", "-16");
set_proc_title("%s %s %s for [%s]:%d",
DAEMON_NAME, cl.configfile, type_to_string(local->type),
local->addr_string, site_port(local));
/* From here on the process no longer runs as root (if it did). */
rv = limit_this_process();
if (rv)
return rv;
#ifdef COREDUMP_NURSING
if (cl_enable_coredumps(TRUE) < 0){
log_error("enabling core dump failed");
}
cl_cdtocoredir();
prctl(PR_SET_DUMPABLE, (unsigned long)TRUE, 0UL, 0UL, 0UL);
#else
if (chdir(BOOTH_CORE_DIR) < 0) {
log_error("cannot change working directory to %s", BOOTH_CORE_DIR);
}
#endif
signal(SIGCHLD, (__sighandler_t)sig_chld_handler);
rv = loop(lock_fd);
return rv;
}
/*
 * Run a client operation: load the configuration, then dispatch on
 * cl.op. List/peers go through query_get_string_answer(), grant/revoke
 * through do_command().
 *
 * Returns the result of the operation; the setup_config() result if
 * that failed or if cl.op is none of the four operations.
 */
static int do_client(struct booth_config **conf)
{
	int rv = setup_config(conf, CLIENT);

	if (rv < 0) {
		log_error("cannot read config");
		return rv;
	}

	if (cl.op == CMD_LIST || cl.op == CMD_PEERS)
		rv = query_get_string_answer(cl.op);
	else if (cl.op == CMD_GRANT || cl.op == CMD_REVOKE)
		rv = do_command(cl.op);

	return rv;
}
/*
 * Run an attribute (geostore) operation: load the configuration, pick
 * the ticket (the only configured one if none was named) and dispatch
 * on cl.op. List/get go through query_get_string_answer(), set/delete
 * through do_attr_command().
 *
 * Returns the result of the operation, the negative setup_config()
 * result, or 1 if no ticket was given and the choice is ambiguous.
 */
static int do_attr(struct booth_config **conf)
{
	int rv = -1;

	assert(conf != NULL);

	rv = setup_config(conf, GEOSTORE);
	if (rv < 0) {
		log_error("cannot read config");
		return rv;
	}

	/* We don't check for existence of ticket, so that asking can be
	 * done without local configuration, too.
	 * Although, that means that the UDP port has to be specified, too. */
	if (cl.attr_msg.attr.tkt_id[0] == '\0') {
		if ((*conf)->ticket_count != 1) {
			log_error("No ticket given.");
			return 1;
		}

		/* Exactly one ticket is configured, so use that one. */
		strncpy(cl.attr_msg.attr.tkt_id,
			(*conf)->ticket[0].name,
			sizeof(cl.attr_msg.attr.tkt_id));
	}

	if (cl.op == ATTR_LIST || cl.op == ATTR_GET)
		rv = query_get_string_answer(cl.op);
	else if (cl.op == ATTR_SET || cl.op == ATTR_DEL)
		rv = do_attr_command(booth_conf, cl.op);

	return rv;
}
/*
 * Program entry point for booth and the attribute tool.
 *
 * Resets the global command line state, initialises logging under the
 * name matching how the program was invoked, parses the arguments and
 * dispatches to the status, server, client or attribute handler
 * according to cl.type.
 *
 * The handler's result is used as exit status if it lies in
 * [0, 0x70), otherwise 1 is returned.
 */
int main(int argc, char *argv[], char *envp[])
{
int rv;
const char *cp;
#ifdef LOGGING_LIBQB
enum qb_log_target_slot i;
#endif
init_set_proc_title(argc, argv, envp);
get_time(&start_time);
memset(&cl, 0, sizeof(cl));
/* cl was zeroed above, so copying at most BOOTH_PATH_LEN - 1 bytes
 * leaves the path terminated (assuming configfile holds
 * BOOTH_PATH_LEN bytes - TODO confirm) */
strncpy(cl.configfile,
BOOTH_DEFAULT_CONF, BOOTH_PATH_LEN - 1);
cl.lockfile[0] = 0;
debug_level = 0;
/* Log as ATTR_PROG if argv[0] ends with that name, else as "booth". */
cp = ((cp = strstr(argv[0], ATTR_PROG)) && !strcmp(cp, ATTR_PROG)
? ATTR_PROG
: "booth");
#ifndef LOGGING_LIBQB
cl_log_set_entity(cp);
#else
qb_log_init(cp, LOG_USER, LOG_DEBUG); /* prio driven by debug_level */
/* Same message format on every target except syslog and blackbox. */
for (i = QB_LOG_TARGET_START; i < QB_LOG_TARGET_MAX; i++) {
if (i == QB_LOG_SYSLOG || i == QB_LOG_BLACKBOX)
continue;
qb_log_format_set(i, "%t %H %N: [%P]: %p: %b");
}
(void) qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
QB_LOG_FILTER_FILE, "*", LOG_DEBUG);
#endif
cl_log_enable_stderr(TRUE);
cl_log_set_facility(0);
rv = read_arguments(argc, argv);
if (rv < 0)
goto out;
switch (cl.type) {
case STATUS:
rv = do_status(&booth_conf, cl.type);
break;
case ARBITRATOR:
case DAEMON:
case SITE:
rv = do_server(&booth_conf, cl.type);
break;
case CLIENT:
rv = do_client(&booth_conf);
break;
case GEOSTORE:
rv = do_attr(&booth_conf);
break;
}
out:
#if HAVE_LIBGNUTLS
gnutls_global_deinit();
#endif
#ifdef LOGGING_LIBQB
qb_log_fini();
#endif
/* Normalize values. 0x100 would be seen as "OK" by waitpid(). */
return (rv >= 0 && rv < 0x70) ? rv : 1;
}
diff --git a/src/manual.c b/src/manual.c
index 5fc5eeb..fe6ae0b 100644
--- a/src/manual.c
+++ b/src/manual.c
@@ -1,108 +1,110 @@
/*
* Copyright (C) 2017 Chris Kowalczyk <ckowalczyk@suse.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "manual.h"
#include "config.h"
#include "transport.h"
#include "ticket.h"
#include "log.h"
#include "request.h"
/* For manual tickets, manual_selection function is an equivalent
* of new_election function used for assigning automatic tickets.
* The workflow here is much simpler, as no voting is performed,
* and the current node doesn't have to wait for any responses
* from other sites.
*/
/*
 * Make the local site the leader of a manual ticket.
 *
 * Does nothing unless the local node is a SITE. Otherwise the ticket's
 * leader and state are set, its election_end and delay_commit timers
 * are cleared, the ticket is saved and a heartbeat is broadcast.
 * preference and update_term are not used in this function.
 *
 * Always returns 0.
 */
-int manual_selection(struct ticket_config *tk,
- struct booth_site *preference, int update_term, cmd_reason_t reason)
+int manual_selection(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *preference, int update_term,
+ cmd_reason_t reason)
{
- if (local->type != SITE)
+ if (local->type != SITE) {
return 0;
+ }
tk_log_debug("starting manual selection (caused by %s %s)",
state_to_string(reason),
reason == OR_AGAIN ? state_to_string(tk->election_reason) : "" );
// Manual selection is done without any delay, the leader is assigned
set_leader(tk, local);
set_state(tk, ST_LEADER);
// Manual tickets never expire, we don't specify expiration time
// Make sure that election_end field is empty
time_reset(&tk->election_end);
// Make sure that delay commit is empty, as manual tickets don't
// wait for any kind of confirmation from other nodes
time_reset(&tk->delay_commit);
save_committed_tkt(tk);
// Inform others about the new leader
- ticket_broadcast(tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0);
+ ticket_broadcast(conf, tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0);
tk->ticket_updated = 0;
return 0;
}
/* This function is called for manual tickets that were
* revoked from another site, which this site doesn't
* consider as a leader.
*/
/*
 * Handle a revoke message for a manual ticket: acknowledge it to the
 * sender, mark the ticket as revoked for that sender and, if this site
 * is a leader itself, broadcast a heartbeat.
 *
 * Returns the result of sending the acknowledgement.
 */
-int process_REVOKE_for_manual_ticket (
- struct ticket_config *tk,
- struct booth_site *sender,
- struct boothc_ticket_msg *msg)
+int process_REVOKE_for_manual_ticket(struct booth_config *conf,
+ struct ticket_config *tk,
+ struct booth_site *sender,
+ struct boothc_ticket_msg *msg)
{
int rv;
// For manual tickets, we may end up having two leaders.
// If one of them is revoked, it will send information
// to all members of the GEO cluster.
-
+
// We may find ourselves here if this particular site
// has not been following the leader which had been revoked
// (and which had sent this message).
// We send the ACK, to satisfy the requestor.
- rv = send_msg(OP_ACK, tk, sender, msg);
+ rv = send_msg(conf, OP_ACK, tk, sender, msg);
// Mark this ticket as not granted to the sender anymore.
mark_ticket_as_revoked(tk, sender);
-
+
if (tk->state == ST_LEADER) {
tk_log_warn("%s wants to revoke ticket, "
"but this site is itself a leader",
site_string(sender));
// Because another leader is presumably stepping down,
// let's notify other sites that now we are the only leader.
- ticket_broadcast(tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0);
+ ticket_broadcast(conf, tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0);
} else {
tk_log_warn("%s wants to revoke ticket, "
"but this site is not following it",
site_string(sender));
}
return rv;
}
diff --git a/src/manual.h b/src/manual.h
index 923e116..db1748d 100644
--- a/src/manual.h
+++ b/src/manual.h
@@ -1,35 +1,36 @@
/*
* Copyright (C) 2017 Chris Kowalczyk <ckowalczyk@suse.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef _MANUAL_H
#define _MANUAL_H
#include "booth.h"
struct ticket_config;
/* Make the local site the leader of a manual ticket (see manual.c). */
-int manual_selection(struct ticket_config *tk,
- struct booth_site *new_leader, int update_term, cmd_reason_t reason);
+int manual_selection(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *new_leader, int update_term,
+ cmd_reason_t reason);
/* Handle a revoke message received for a manual ticket (see manual.c). */
-int process_REVOKE_for_manual_ticket (
- struct ticket_config *tk,
- struct booth_site *sender,
- struct boothc_ticket_msg *msg);
+int process_REVOKE_for_manual_ticket(struct booth_config *conf,
+ struct ticket_config *tk,
+ struct booth_site *sender,
+ struct boothc_ticket_msg *msg);
#endif /* _MANUAL_H */
diff --git a/src/raft.c b/src/raft.c
index ec0cf99..a91e204 100644
--- a/src/raft.c
+++ b/src/raft.c
@@ -1,1011 +1,997 @@
/*
* Copyright (C) 2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include <stdlib.h>
#include <inttypes.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include "booth.h"
#include "timer.h"
#include "config.h"
#include "transport.h"
#include "inline-fn.h"
#include "raft.h"
#include "ticket.h"
#include "request.h"
#include "log.h"
#include "manual.h"
/* Forget all votes recorded for this ticket. */
inline static void clear_election(struct ticket_config *tk)
{
	struct booth_site *node;
	int idx;

	tk_log_debug("clear election");

	tk->votes_received = 0;
	_FOREACH_NODE(idx, node) {
		tk->votes_for[node->index] = NULL;
	}
}
/*
 * Record that site "who" votes for site "vote" on this ticket.
 * Only the first vote of a site counts; a later, different vote is
 * ignored with a warning.
 */
inline static void record_vote(struct ticket_config *tk,
		struct booth_site *who,
		struct booth_site *vote)
{
	struct booth_site *earlier;

	tk_log_debug("site %s votes for %s",
			site_string(who),
			site_string(vote));

	earlier = tk->votes_for[who->index];
	if (earlier == NULL) {
		tk->votes_for[who->index] = vote;
		tk->votes_received |= who->bitmask;
		return;
	}

	if (earlier != vote)
		tk_log_warn("%s voted previously "
				"for %s and now wants to vote for %s (ignored)",
				site_string(who),
				site_string(earlier),
				site_string(vote));
}
/*
 * Update the ticket's current term from a received message.
 * A candidate takes over the term from the message as is (it failed to
 * start the election, so it accepts the leader's term); in any other
 * state the term never goes backwards.
 */
static void update_term_from_msg(struct ticket_config *tk,
		struct boothc_ticket_msg *msg)
{
	uint32_t msg_term = ntohl(msg->ticket.term);

	if (tk->state != ST_CANDIDATE) {
		tk->current_term = max(msg_term, tk->current_term);
	} else {
		tk->current_term = msg_term;
	}
}
/* Let the ticket's term expire "duration" from now. */
static void set_ticket_expiry(struct ticket_config *tk,
		int duration)
{
	set_future_time(&(tk->term_expires), duration);
}
/*
 * Refresh the ticket from a message sent by "sender": the expiry is
 * set to the shorter of our own term duration and the term time in the
 * message, then the term is updated.
 */
static void update_ticket_from_msg(struct ticket_config *tk,
		struct booth_site *sender,
		struct boothc_ticket_msg *msg)
{
	tk_log_info("updating from %s (%d/%d)",
			site_string(sender),
			ntohl(msg->ticket.term), msg_term_time(msg));

	set_ticket_expiry(tk, min(tk->term_duration, msg_term_time(msg)));
	update_term_from_msg(tk, msg);
}
/*
 * Take expiry and term straight from the message, without comparing
 * them with the ticket's current values.
 */
static void copy_ticket_from_msg(struct ticket_config *tk,
		struct boothc_ticket_msg *msg)
{
	int valid_for = msg_term_time(msg);

	set_ticket_expiry(tk, valid_for);
	tk->current_term = ntohl(msg->ticket.term);
}
/*
 * Turn this site into a follower for the ticket, taking expiry and
 * term from the message and leaving any election in progress.
 */
static void become_follower(struct ticket_config *tk,
		struct boothc_ticket_msg *msg)
{
	copy_ticket_from_msg(tk, msg);
	set_state(tk, ST_FOLLOWER);
	time_reset(&tk->delay_commit);
	tk->in_election = 0;

	if (!tk->is_granted)
		return;

	/* if we're following and the ticket was granted here
	 * then commit to CIB right away (we're probably restarting)
	 */
	disown_ticket(tk);
	ticket_write(tk);
}
/* The local site won the election: make it the leader of the ticket,
 * start a new term of tk->term_duration and broadcast a heartbeat.
 */
-static void won_elections(struct ticket_config *tk)
+static void won_elections(struct booth_config *conf, struct ticket_config *tk)
{
set_leader(tk, local);
set_state(tk, ST_LEADER);
set_ticket_expiry(tk, tk->term_duration);
/* the election is over, clear its deadline */
time_reset(&tk->election_end);
tk->voted_for = NULL;
/* a pending commit delay is dropped only if all sites replied */
if (is_time_set(&tk->delay_commit) && all_sites_replied(tk)) {
time_reset(&tk->delay_commit);
tk_log_debug("reset delay commit as all sites replied");
}
save_committed_tkt(tk);
/* NOTE(review): OP_ACK is presumably the reply expected to the heartbeat */
- ticket_broadcast(tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0);
+ ticket_broadcast(conf, tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0);
tk->ticket_updated = 0;
}
/* if more than one member got the same (and maximum within that
* election) number of votes, then that is a tie
*/
static int is_tie(struct ticket_config *tk)
{
int i;
struct booth_site *v;
int count[MAX_NODES] = { 0, };
int max_votes = 0, max_cnt = 0;
for (i = 0; i < booth_conf->site_count; i++) {
v = tk->votes_for[i];
if (!v)
continue;
count[v->index]++;
max_votes = max(max_votes, count[v->index]);
}
for (i = 0; i < booth_conf->site_count; i++) {
if (count[i] == max_votes)
max_cnt++;
}
return max_cnt > 1;
}
/* Tally the recorded votes and return the site which collected
 * more than half of the votes of all configured sites, or NULL if
 * nobody did. Empty slots and votes for "no leader" are not counted.
 */
static struct booth_site *majority_votes(struct ticket_config *tk)
{
	int voter, target;
	struct booth_site *choice;
	int tally[MAX_NODES] = { 0, };

	for (voter = 0; voter < booth_conf->site_count; voter++) {
		choice = tk->votes_for[voter];
		if (!choice || choice == no_leader) {
			continue;
		}

		target = choice->index;
		tally[target]++;
		tk_log_debug("Majority: %d %s wants %d %s => %d",
				voter, site_string(&booth_conf->site[voter]),
				target, site_string(choice),
				tally[target]);

		if (tally[target]*2 > booth_conf->site_count) {
			tk_log_debug("Majority reached: %d of %d for %s",
					tally[target], booth_conf->site_count,
					site_string(choice));
			return choice;
		}
	}

	return NULL;
}
/* Close the current election round and act on its outcome:
 * - the local site has the majority: take over as leader,
 * - another site has the majority: only log that,
 * - nobody has the majority: notify the waiting client requests
 *   (outcome RLT_MORE) and try to start a new election.
 */
-void elections_end(struct ticket_config *tk)
+void elections_end(struct booth_config *conf, struct ticket_config *tk)
{
struct booth_site *new_leader;
if (is_past(&tk->election_end)) {
/* the election deadline has already passed, i.e. it timed out */
tk_log_info("elections finished");
}
tk->in_election = 0;
new_leader = majority_votes(tk);
if (new_leader == local) {
- won_elections(tk);
+ won_elections(conf, tk);
tk_log_info("granted successfully here");
} else if (new_leader) {
tk_log_info("ticket granted at %s",
site_string(new_leader));
} else {
tk_log_info("nobody won elections, new elections");
tk->outcome = RLT_MORE;
/* update_term is 2 on a tie (new_election() then always
 * increments the term) and 0 otherwise; if new_election()
 * returns 0, ticket_activate_timeout() is called */
- foreach_tkt_req(tk, notify_client);
- if (!new_election(tk, NULL, is_tie(tk) ? 2 : 0, OR_AGAIN)) {
+ foreach_tkt_req(conf, tk, notify_client);
+ if (!new_election(conf, tk, NULL, is_tie(tk) ? 2 : 0, OR_AGAIN)) {
ticket_activate_timeout(tk);
}
}
}
/* Check whether the message carries a term higher than ours.
 *
 * If it does, switch to the follower state and adopt that term;
 * outside of an election the leader named in the message is adopted
 * too. Messages which name the local site as leader are never treated
 * as newer.
 *
 * Returns 1 if the higher term was adopted, 0 otherwise.
 */
static int newer_term(struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg,
		int in_election)
{
	uint32_t term;

	/* it may happen that we hear about our newer term */
	if (leader == local) {
		return 0;
	}

	term = ntohl(msg->ticket.term);
	/* §5.1 */
	if (term <= tk->current_term) {
		return 0;
	}

	set_state(tk, ST_FOLLOWER);
	if (in_election) {
		tk_log_debug("from %s: higher term %d vs. %d (election)",
				site_string(sender),
				term, tk->current_term);
	} else {
		set_leader(tk, leader);
		tk_log_info("from %s: higher term %d vs. %d, following %s",
				site_string(sender),
				term, tk->current_term,
				ticket_leader_string(tk));
	}

	tk->current_term = term;
	return 1;
}
/* Returns 1 (and logs it) if is_term_invalid() rejects the term carried
 * by the message, 0 otherwise. The "leader" argument is not used.
 */
static int msg_term_invalid(struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg)
{
	uint32_t term = ntohl(msg->ticket.term);

	/* §5.1 */
	if (!is_term_invalid(tk, term)) {
		return 0;
	}

	tk_log_info("got invalid term from %s "
			"(%d), ignoring", site_string(sender), term);
	return 1;
}
/* If the term in the message is lower than our current term, send a
 * reject (RLT_TERM_OUTDATED) to the sender.
 *
 * Returns 1 if the reject was issued, 0 if the term is acceptable.
 * The result of send_reject() itself is not checked.
 */
-static int term_too_low(struct ticket_config *tk,
- struct booth_site *sender,
- struct booth_site *leader,
- struct boothc_ticket_msg *msg)
+static int term_too_low(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *sender, struct booth_site *leader,
+ struct boothc_ticket_msg *msg)
{
uint32_t term;
term = ntohl(msg->ticket.term);
/* §5.1 */
if (term < tk->current_term) {
tk_log_info("sending reject to %s, its term too low "
"(%d vs. %d)", site_string(sender),
term, tk->current_term
);
- send_reject(sender, tk, RLT_TERM_OUTDATED, msg);
+ send_reject(conf, sender, tk, RLT_TERM_OUTDATED, msg);
return 1;
}
return 0;
}
/* For follower.
 *
 * Handle a heartbeat: unless it is rejected because of a lower term
 * coming from a site other than our leader, become a follower of the
 * leader named in the message and acknowledge with OP_ACK.
 *
 * Returns the result of send_reject() or send_msg().
 */
-static int answer_HEARTBEAT (
- struct ticket_config *tk,
- struct booth_site *sender,
- struct booth_site *leader,
- struct boothc_ticket_msg *msg
- )
+static int answer_HEARTBEAT(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *sender, struct booth_site *leader,
+ struct boothc_ticket_msg *msg)
{
uint32_t term;
term = ntohl(msg->ticket.term);
tk_log_debug("heartbeat from leader: %s, have %s; term %d vs %d",
site_string(leader), ticket_leader_string(tk),
term, tk->current_term);
if (term < tk->current_term) {
/* a lower term is tolerated from our own leader; from anybody
 * else it is rejected, but only if is_owned(tk) holds */
if (sender == tk->leader) {
tk_log_info("trusting leader %s with a lower term (%d vs %d)",
site_string(leader), term, tk->current_term);
} else if (is_owned(tk)) {
tk_log_warn("different leader %s with a lower term "
"(%d vs %d), sending reject",
site_string(leader), term, tk->current_term);
- return send_reject(sender, tk, RLT_TERM_OUTDATED, msg);
+ return send_reject(conf, sender, tk, RLT_TERM_OUTDATED,
+ msg);
}
}
/* got heartbeat, no rejects expected anymore */
tk->expect_more_rejects = 0;
/* Needed? */
newer_term(tk, sender, leader, msg, 0);
become_follower(tk, msg);
/* Racy??? */
assert(sender == leader || !leader);
set_leader(tk, leader);
/* Ack the heartbeat (we comply). */
- return send_msg(OP_ACK, tk, sender, msg);
+ return send_msg(conf, OP_ACK, tk, sender, msg);
}
/* Handle a ticket update (OP_UPDATE).
 *
 * If is_owned(tk) holds and the sender is not our leader, the update
 * is rejected with RLT_TERM_OUTDATED. Otherwise we become a follower
 * of the given leader, write the ticket and acknowledge with OP_ACK.
 *
 * Returns the result of send_reject() or send_msg().
 */
-static int process_UPDATE (
- struct ticket_config *tk,
- struct booth_site *sender,
- struct booth_site *leader,
- struct boothc_ticket_msg *msg
- )
+static int process_UPDATE (struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *sender, struct booth_site *leader,
+ struct boothc_ticket_msg *msg)
{
if (is_owned(tk) && sender != tk->leader) {
tk_log_warn("different leader %s wants to update "
"our ticket, sending reject",
site_string(leader));
mark_ticket_as_granted(tk, sender);
- return send_reject(sender, tk, RLT_TERM_OUTDATED, msg);
+ return send_reject(conf, sender, tk, RLT_TERM_OUTDATED, msg);
}
tk_log_debug("leader %s wants to update our ticket",
site_string(leader));
become_follower(tk, msg);
set_leader(tk, leader);
ticket_write(tk);
/* run ticket_cron if the ticket expires */
set_ticket_wakeup(tk);
- return send_msg(OP_ACK, tk, sender, msg);
+ return send_msg(conf, OP_ACK, tk, sender, msg);
}
/* Handle a ticket revoke (OP_REVOKE).
 *
 * Cases, in the order tested:
 * - we are in ST_INIT with no leader: only (re)send the ack,
 * - the sender is not our leader: ignored with an error (-1), unless
 *   the ticket is manual, which has its own handler,
 * - we are not a follower: ignored with an error (-1),
 * - otherwise: reset the ticket, write it and send the ack.
 *
 * Returns -1 for an ignored revoke, otherwise the result of the
 * send/handler call.
 */
-static int process_REVOKE (
- struct ticket_config *tk,
- struct booth_site *sender,
- struct booth_site *leader,
- struct boothc_ticket_msg *msg
- )
+static int process_REVOKE(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *sender, struct booth_site *leader,
+ struct boothc_ticket_msg *msg)
{
int rv;
if (tk->state == ST_INIT && tk->leader == no_leader) {
/* assume that our ack got lost */
- rv = send_msg(OP_ACK, tk, sender, msg);
+ rv = send_msg(conf, OP_ACK, tk, sender, msg);
} else if (tk->leader != sender) {
if (!is_manual(tk)) {
tk_log_error("%s wants to revoke ticket, "
"but it is not granted there (ignoring)",
site_string(sender));
return -1;
} else {
- rv = process_REVOKE_for_manual_ticket(tk, sender, msg);
-
+ rv = process_REVOKE_for_manual_ticket(conf, tk, sender, msg);
+
// Ticket data stored in this site is not modified. This means
// that this site will still follow another leader (the one which
// has not been revoked) or be a leader itself.
}
} else if (tk->state != ST_FOLLOWER) {
tk_log_error("unexpected ticket revoke from %s "
"(in state %s) (ignoring)",
site_string(sender),
state_to_string(tk->state));
return -1;
} else {
tk_log_info("%s revokes ticket",
site_string(tk->leader));
/* save the ticket as it was before the reset */
save_committed_tkt(tk);
reset_ticket_and_set_no_leader(tk);
ticket_write(tk);
- rv = send_msg(OP_ACK, tk, sender, msg);
+ rv = send_msg(conf, OP_ACK, tk, sender, msg);
}
return rv;
}
/* For leader.
 *
 * Handle an acknowledgement (OP_ACK). Acks with a term different from
 * ours are only logged. Once majority_of_bits() holds for the acks
 * received to an OP_UPDATE or OP_HEARTBEAT request of the current term
 * and leader, the ticket is updated through leader_update_ticket().
 *
 * Returns the result of leader_update_ticket(), or 0 when nothing
 * was done.
 */
-static int process_ACK(
- struct ticket_config *tk,
- struct booth_site *sender,
- struct booth_site *leader,
- struct boothc_ticket_msg *msg
- )
+static int process_ACK(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *sender, struct booth_site *leader,
+ struct boothc_ticket_msg *msg)
{
uint32_t term;
int req;
term = ntohl(msg->ticket.term);
/* note: newer_term() already switches us to follower and adopts
 * the higher term when it returns 1 */
if (newer_term(tk, sender, leader, msg, 0)) {
/* unexpected higher term */
tk_log_warn("got higher term from %s (%d vs. %d)",
site_string(sender),
term, tk->current_term);
return 0;
}
/* Don't send a reject. */
if (term < tk->current_term) {
/* Doesn't know what he's talking about - perhaps
* doesn't receive our packets? */
tk_log_warn("unexpected term "
"from %s (%d vs. %d) (ignoring)",
site_string(sender),
term, tk->current_term);
return 0;
}
/* if the ticket is to be revoked, further processing is not
* interesting (and dangerous) */
if (tk->next_state == ST_INIT || tk->state == ST_INIT)
return 0;
/* the request this message is an answer to */
req = ntohl(msg->header.request);
if ((req == OP_UPDATE || req == OP_HEARTBEAT) &&
term == tk->current_term &&
leader == tk->leader) {
if (majority_of_bits(tk, tk->acks_received)) {
/* OK, at least half of the nodes are reachable;
* Update the ticket and send update messages out
*/
- return leader_update_ticket(tk);
+ return leader_update_ticket(conf, tk);
}
}
return 0;
}
/* Handle a vote (OP_VOTE_FOR).
 *
 * A vote for "no leader" coming from our leader is a ticket release:
 * the ticket is reset and, on a SITE, a new election is scheduled.
 * Any other vote is recorded only while we are a candidate and its
 * term is not too low; once no more acks are expected the election
 * is closed.
 *
 * Always returns 0.
 */
-static int process_VOTE_FOR(
- struct ticket_config *tk,
- struct booth_site *sender,
- struct booth_site *leader,
- struct boothc_ticket_msg *msg
- )
+static int process_VOTE_FOR(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *sender, struct booth_site *leader,
+ struct boothc_ticket_msg *msg)
{
if (leader == no_leader) {
/* leader wants to step down? */
if (sender == tk->leader &&
(tk->state == ST_FOLLOWER || tk->state == ST_CANDIDATE)) {
tk_log_info("%s wants to give the ticket away (ticket release)",
site_string(tk->leader));
save_committed_tkt(tk);
reset_ticket(tk);
set_state(tk, ST_FOLLOWER);
/* only a SITE writes the ticket and schedules the election */
if (local->type == SITE) {
ticket_write(tk);
schedule_election(tk, OR_STEPDOWN);
}
} else {
tk_log_info("%s votes for none, ignoring (duplicate ticket release?)",
site_string(sender));
}
return 0;
}
if (tk->state != ST_CANDIDATE) {
/* lost candidate status, somebody rejected our proposal */
tk_log_info("candidate status lost, ignoring VtFr from %s",
site_string(sender));
return 0;
}
- if (term_too_low(tk, sender, leader, msg))
+ if (term_too_low(conf, tk, sender, leader, msg)) {
return 0;
+ }
/* votes collected for an older term are void */
if (newer_term(tk, sender, leader, msg, 0)) {
clear_election(tk);
}
record_vote(tk, sender, leader);
/* only if all voted can we take the ticket now, otherwise
* wait for timeout in ticket_cron */
if (!tk->acks_expected) {
/* §5.2 */
- elections_end(tk);
+ elections_end(conf, tk);
}
return 0;
}
/* Handle a reject (OP_REJECTED).
 *
 * Rejects are acted upon only while we are a candidate; what is done
 * depends on the result code carried by the message. Any other reject
 * is logged as unexpected, unless more rejects were announced by a
 * previous one (tk->expect_more_rejects).
 *
 * Always returns 0.
 */
static int process_REJECTED(
		struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg
		)
{
	uint32_t result = ntohl(msg->header.result);

	if (tk->state == ST_CANDIDATE) {
		if (leader == local) {
			/* the sender has us as the leader (!)
			 * the elections will time out, then we can try again
			 */
			tk_log_warn("ticket was granted to us "
					"(and we didn't know)");
			tk->expect_more_rejects = 1;
			return 0;
		}

		if (result == RLT_TERM_OUTDATED) {
			tk_log_warn("ticket outdated (term %d), granted to %s",
					ntohl(msg->ticket.term),
					site_string(leader)
					);
			set_leader(tk, leader);
			tk->expect_more_rejects = 1;
			become_follower(tk, msg);
			return 0;
		}

		if (result == RLT_TERM_STILL_VALID) {
			if (tk->lost_leader != leader) {
				tk_log_warn("ticket was granted to %s "
						"(and we didn't know)",
						site_string(leader));
			} else if (tk->election_reason == OR_TKT_LOST) {
				tk_log_warn("%s still has the ticket valid, "
						"we'll backup a bit",
						site_string(sender));
			} else {
				tk_log_warn("%s unexpectedly rejects elections",
						site_string(sender));
			}
			set_leader(tk, leader);
			become_follower(tk, msg);
			tk->expect_more_rejects = 1;
			return 0;
		}

		if (result == RLT_YOU_OUTDATED) {
			set_leader(tk, leader);
			tk->expect_more_rejects = 1;
			if (leader && leader != no_leader) {
				tk_log_warn("our ticket is outdated, granted to %s",
						site_string(leader));
				become_follower(tk, msg);
			} else {
				tk_log_warn("our ticket is outdated and revoked");
				update_ticket_from_msg(tk, sender, msg);
				set_state(tk, ST_INIT);
			}
			return 0;
		}
	}

	if (!tk->expect_more_rejects) {
		tk_log_warn("from %s: in state %s, got %s (unexpected reject)",
				site_string(sender),
				state_to_string(tk->state),
				state_to_string(result));
	}

	return 0;
}
/* Estimate whether the ticket, as known locally, is still fine.
 *
 * Returns 1 if there is term time left and we are either the leader
 * or a follower with at least a third of the term duration remaining;
 * 0 in all other cases.
 */
static int ticket_seems_ok(struct ticket_config *tk)
{
	int remaining = term_time_left(tk);

	if (!remaining) {
		return 0; /* quite sure */
	}

	switch (tk->state) {
	case ST_LEADER:
		return 1; /* quite sure */
	case ST_FOLLOWER:
		/* almost quite sure, if enough of the term is left */
		return remaining >= tk->term_duration/3;
	default:
		/* a candidate is in state of flux, the rest is unknown */
		return 0;
	}
}
/* Check whether the election reason given by the sender is plausible
 * from our point of view. Only the OR_TKT_LOST reason is examined.
 *
 * Returns 0 if there is no objection, otherwise the result code to
 * send back in a reject (RLT_YOU_OUTDATED or RLT_TERM_STILL_VALID).
 */
static int test_reason(
		struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg
		)
{
	int reason = ntohl(msg->header.reason);

	if (reason != OR_TKT_LOST) {
		return 0;
	}

	if (tk->state == ST_INIT &&
			tk->leader == no_leader) {
		tk_log_warn("%s claims that the ticket is lost, "
				"but it's in %s state (reject sent)",
				site_string(sender),
				state_to_string(tk->state)
				);
		return RLT_YOU_OUTDATED;
	}

	if (ticket_seems_ok(tk)) {
		tk_log_warn("%s claims that the ticket is lost, "
				"but it is ok here (reject sent)",
				site_string(sender));
		return RLT_TERM_STILL_VALID;
	}

	return 0;
}
/* §5.2
 *
 * Answer a vote request (OP_REQ_VOTE).
 *
 * The request is rejected if the reason given is implausible (see
 * test_reason()), if the ticket is still valid and the request comes
 * from a site other than our leader for a reason other than
 * OR_STEPDOWN, or if the sender's term is too low. Otherwise an
 * OP_VOTE_FOR message naming the site we voted for is sent back.
 *
 * Returns the result of send_reject() or booth_udp_send_auth(), or 0
 * if the term was too low.
 */
-static int answer_REQ_VOTE(
- struct ticket_config *tk,
- struct booth_site *sender,
- struct booth_site *leader,
- struct boothc_ticket_msg *msg
- )
+static int answer_REQ_VOTE(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *sender, struct booth_site *leader,
+ struct boothc_ticket_msg *msg)
{
int valid;
struct boothc_ticket_msg omsg;
cmd_result_t inappr_reason;
int reason;
inappr_reason = test_reason(tk, sender, leader, msg);
- if (inappr_reason)
- return send_reject(sender, tk, inappr_reason, msg);
+ if (inappr_reason) {
+ return send_reject(conf, sender, tk, inappr_reason, msg);
+ }
/* non-zero while there is term time left on our ticket */
valid = term_time_left(tk);
reason = ntohl(msg->header.reason);
/* valid tickets are not allowed only if the sender thinks
* the ticket got lost */
if (sender != tk->leader && valid && reason != OR_STEPDOWN) {
tk_log_warn("election from %s with reason %s rejected "
"(we have %s as ticket owner), ticket still valid for %ds",
site_string(sender), state_to_string(reason),
site_string(tk->leader), valid);
- return send_reject(sender, tk, RLT_TERM_STILL_VALID, msg);
+ return send_reject(conf, sender, tk, RLT_TERM_STILL_VALID, msg);
}
- if (term_too_low(tk, sender, leader, msg))
+ if (term_too_low(conf, tk, sender, leader, msg)) {
return 0;
+ }
/* set this, so that we know not to send status for the
* ticket */
tk->in_election = 1;
/* reset ticket's leader on not valid tickets */
if (!valid)
set_leader(tk, NULL);
/* if it's a newer term or ... */
if (newer_term(tk, sender, leader, msg, 1)) {
clear_election(tk);
goto vote_for_sender;
}
/* ... we didn't vote yet, then vote for the sender */
/* §5.2, §5.4 */
if (!tk->voted_for) {
vote_for_sender:
tk->voted_for = sender;
record_vote(tk, sender, leader);
}
/* the reply names whoever we voted for, which is not necessarily
 * the sender if we had voted already */
init_ticket_msg(&omsg, OP_VOTE_FOR, OP_REQ_VOTE, RLT_SUCCESS, 0, tk);
omsg.ticket.leader = htonl(get_node_id(tk->voted_for));
- return booth_udp_send_auth(sender, &omsg, sendmsglen(&omsg));
+ return booth_udp_send_auth(conf, sender, &omsg, sendmsglen(&omsg));
}
/* True if the election reason is r, either directly or, for a repeated
 * election (OR_AGAIN), as the reason stored in the ticket.
 * Note: the macro reads a variable named "reason" from the scope in
 * which it is expanded.
 */
#define is_reason(r, tk) \
(reason == (r) || (reason == OR_AGAIN && (tk)->election_reason == (r)))
/* Start a new election for the ticket.
 *
 * conf        passed on to ticket_broadcast() (new in this change)
 * tk          the ticket
 * preference  site to vote for; the local site if NULL
 * update_term 0: keep the term; 1: increment it only if the last valid
 *             ticket's term is not older than the current term;
 *             >1: always increment it (used for ties)
 * reason      why the election is started; OR_AGAIN repeats the
 *             election with the reason stored in the ticket
 *
 * Returns 1 if an election is already running (started, but not yet
 * past its deadline) and 0 in every other case, i.e. both when the
 * election was started and when it was not (we are not a SITE, or
 * check_attr_prereq() returned non-zero).
 */
-int new_election(struct ticket_config *tk,
- struct booth_site *preference, int update_term, cmd_reason_t reason)
+int new_election(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *preference, int update_term, cmd_reason_t reason)
{
struct booth_site *new_leader;
if (local->type != SITE)
return 0;
if ((is_reason(OR_TKT_LOST, tk) || is_reason(OR_STEPDOWN, tk)) &&
check_attr_prereq(tk, GRANT_AUTO)) {
tk_log_info("attribute prerequisite not met, "
"not starting elections");
return 0;
}
/* elections were already started, but not yet finished/timed out */
if (is_time_set(&tk->election_end) && !is_past(&tk->election_end))
return 1;
if (ANYDEBUG) {
int tdiff;
if (is_time_set(&tk->election_end)) {
tdiff = -time_left(&tk->election_end);
tk_log_debug("starting elections, previous finished since " intfmt(tdiff));
} else {
tk_log_debug("starting elections");
}
tk_log_debug("elections caused by %s %s",
state_to_string(reason),
reason == OR_AGAIN ? state_to_string(tk->election_reason) : "" );
}
/* §5.2 */
/* If there was _no_ answer, don't keep incrementing the term number
* indefinitely. If there was no peer, there'll probably be no one
* listening now either. However, we don't know if we were
* invoked due to a timeout (caller does).
*/
/* increment the term only if either the current term was
* valid or if there was a tie (in that case update_term > 1)
*/
if ((update_term > 1) ||
(update_term && tk->last_valid_tk &&
tk->last_valid_tk->current_term >= tk->current_term)) {
/* save the previous term, we may need to send out the
* MY_INDEX message */
if (tk->state != ST_CANDIDATE) {
save_committed_tkt(tk);
}
tk->current_term++;
}
/* the election deadline is tk->timeout from now */
set_future_time(&tk->election_end, tk->timeout);
tk->in_election = 1;
tk_log_info("starting new election (term=%d)",
tk->current_term);
clear_election(tk);
/* cast our own vote first */
new_leader = preference ? preference : local;
record_vote(tk, local, new_leader);
tk->voted_for = new_leader;
set_state(tk, ST_CANDIDATE);
/* some callers may want just to repeat on timeout */
if (reason == OR_AGAIN) {
reason = tk->election_reason;
} else {
tk->election_reason = reason;
}
- ticket_broadcast(tk, OP_REQ_VOTE, OP_VOTE_FOR, RLT_SUCCESS, reason);
+ ticket_broadcast(conf, tk, OP_REQ_VOTE, OP_VOTE_FOR, RLT_SUCCESS, reason);
add_random_delay(tk);
return 0;
}
/* We were a leader and somebody says that they have a more up
 * to date ticket; there was probably connectivity loss. Tricky.
 *
 * The term is updated from the message and the next state of the
 * ticket is set to ST_LEADER. An error is logged if the message names
 * a real leader other than us or if our term has still time left.
 *
 * NOTE(review): the log text says "revoking locally", yet nothing is
 * revoked in this function -- confirm whether that is intended.
 *
 * Always returns 0.
 */
static int leader_handle_newer_ticket(
		struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg
		)
{
	int foreign_leader;

	update_term_from_msg(tk, msg);

	foreign_leader = (leader != no_leader && leader && leader != local);
	if (foreign_leader || term_time_left(tk)) {
		/* eek, two leaders, split brain */
		/* normally shouldn't happen */
		tk_log_error("from %s: ticket granted to %s! (revoking locally)",
				site_string(sender),
				site_string(leader)
				);
	}

	set_next_state(tk, ST_LEADER);
	return 0;
}
/* reply to STATUS
 *
 * Handle an OP_MY_INDEX message, i.e. another site telling us which
 * ticket it has. If ours is newer we tell them (and, as leader, send
 * an update); if theirs is newer or still valid we take it over,
 * unless we are the leader or a candidate already named as leader.
 *
 * Returns 0, or the result of send_msg()/leader_handle_newer_ticket().
 */
-static int process_MY_INDEX (
- struct ticket_config *tk,
- struct booth_site *sender,
- struct booth_site *leader,
- struct boothc_ticket_msg *msg
- )
+static int process_MY_INDEX(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *sender, struct booth_site *leader,
+ struct boothc_ticket_msg *msg)
{
int i;
int expired;
/* their ticket is expired if it carries no term time */
expired = !msg_term_time(msg);
/* test against the last valid(!) ticket we have */
/* i > 0: our term is newer; i < 0: theirs is newer */
i = my_last_term(tk) - ntohl(msg->ticket.term);
if (i > 0) {
/* let them know about our newer ticket */
- send_msg(OP_MY_INDEX, tk, sender, msg);
+ send_msg(conf, OP_MY_INDEX, tk, sender, msg);
if (tk->state == ST_LEADER) {
tk_log_info("sending ticket update to %s",
site_string(sender));
- return send_msg(OP_UPDATE, tk, sender, msg);
+ return send_msg(conf, OP_UPDATE, tk, sender, msg);
}
}
/* we have a newer or equal ticket and theirs is expired,
* nothing more to do here */
if (i >= 0 && expired) {
return 0;
}
if (tk->state == ST_LEADER) {
/* we're the leader, tread carefully */
if (expired) {
/* if their ticket is expired,
* nothing more to do */
return 0;
}
if (i < 0) {
/* they have a newer ticket, trouble if we're already leader
* for it */
tk_log_warn("from %s: more up to date ticket at %s",
site_string(sender),
site_string(leader)
);
return leader_handle_newer_ticket(tk, sender, leader, msg);
} else {
/* we have the ticket and we don't care */
return 0;
}
} else if (tk->state == ST_CANDIDATE) {
if (leader == local) {
/* a belated MY_INDEX, we're already trying to get the
* ticket */
return 0;
}
}
/* their ticket is either newer or not expired, don't
* ignore it */
update_ticket_from_msg(tk, sender, msg);
set_leader(tk, leader);
update_ticket_state(tk, sender);
save_committed_tkt(tk);
set_ticket_wakeup(tk);
return 0;
}
/* Dispatch an incoming ticket message to the handler for its command.
 *
 * conf    passed on to the handlers and to the send functions (new in
 *         this change)
 * tk      the ticket the message is about
 * sender  the site which sent the message
 * leader  the leader site named in the message
 * msg     the message, header fields in network byte order
 *
 * Messages other than OP_STATUS which carry an invalid term are
 * dropped (0 is returned).
 *
 * Returns the result of the handler, 0 if nothing was done, or -EINVAL
 * for an unknown command or an OP_HEARTBEAT/OP_UPDATE which is not
 * expected in the current state.
 */
-int raft_answer(
- struct ticket_config *tk,
- struct booth_site *sender,
- struct booth_site *leader,
- struct boothc_ticket_msg *msg
- )
+int raft_answer(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *sender, struct booth_site *leader,
+ struct boothc_ticket_msg *msg)
{
int cmd, req;
int rv;
rv = 0;
cmd = ntohl(msg->header.cmd);
req = ntohl(msg->header.request);
if (req)
tk_log_debug("got %s (req %s) from %s",
state_to_string(cmd),
state_to_string(req),
site_string(sender));
else
tk_log_debug("got %s from %s",
state_to_string(cmd),
site_string(sender));
/* don't process tickets with invalid term
*/
if (cmd != OP_STATUS &&
msg_term_invalid(tk, sender, leader, msg))
return 0;
switch (cmd) {
case OP_REQ_VOTE:
- rv = answer_REQ_VOTE(tk, sender, leader, msg);
+ rv = answer_REQ_VOTE(conf, tk, sender, leader, msg);
break;
case OP_VOTE_FOR:
- rv = process_VOTE_FOR(tk, sender, leader, msg);
+ rv = process_VOTE_FOR(conf, tk, sender, leader, msg);
break;
case OP_ACK:
/* acks are processed only by the leader, otherwise
 * silently dropped */
if (tk->leader == local &&
tk->state == ST_LEADER)
- rv = process_ACK(tk, sender, leader, msg);
+ rv = process_ACK(conf, tk, sender, leader, msg);
break;
case OP_HEARTBEAT:
/* accepted unless we are the leader of a still valid ticket */
if ((tk->leader != local || !term_time_left(tk)) &&
(tk->state == ST_INIT || tk->state == ST_FOLLOWER ||
tk->state == ST_CANDIDATE))
- rv = answer_HEARTBEAT(tk, sender, leader, msg);
+ rv = answer_HEARTBEAT(conf, tk, sender, leader, msg);
else {
tk_log_warn("unexpected message %s, from %s",
state_to_string(cmd),
site_string(sender));
mark_ticket_as_granted(tk, sender);
- if (ticket_seems_ok(tk))
- send_reject(sender, tk, RLT_TERM_STILL_VALID, msg);
+ if (ticket_seems_ok(tk)) {
+ send_reject(conf, sender, tk, RLT_TERM_STILL_VALID, msg);
+ }
+
rv = -EINVAL;
}
break;
case OP_UPDATE:
/* accepted from the leader we follow, or if is_owned(tk)
 * does not hold */
if (((tk->leader != local && tk->leader == leader) || !is_owned(tk)) &&
(tk->state == ST_INIT || tk->state == ST_FOLLOWER ||
tk->state == ST_CANDIDATE)) {
- rv = process_UPDATE(tk, sender, leader, msg);
+ rv = process_UPDATE(conf, tk, sender, leader, msg);
} else {
tk_log_warn("unexpected message %s, from %s",
state_to_string(cmd),
site_string(sender));
- if (ticket_seems_ok(tk))
- send_reject(sender, tk, RLT_TERM_STILL_VALID, msg);
+
+ if (ticket_seems_ok(tk)) {
+ send_reject(conf, sender, tk, RLT_TERM_STILL_VALID, msg);
+ }
+
rv = -EINVAL;
}
break;
case OP_REJECTED:
rv = process_REJECTED(tk, sender, leader, msg);
break;
case OP_REVOKE:
- rv = process_REVOKE(tk, sender, leader, msg);
+ rv = process_REVOKE(conf, tk, sender, leader, msg);
break;
case OP_MY_INDEX:
- rv = process_MY_INDEX(tk, sender, leader, msg);
+ rv = process_MY_INDEX(conf, tk, sender, leader, msg);
break;
case OP_STATUS:
/* no status is sent out while an election is running */
- if (!tk->in_election)
- rv = send_msg(OP_MY_INDEX, tk, sender, msg);
+ if (!tk->in_election) {
+ rv = send_msg(conf, OP_MY_INDEX, tk, sender, msg);
+ }
+
break;
default:
tk_log_error("unknown message %s, from %s",
state_to_string(cmd), site_string(sender));
rv = -EINVAL;
}
return rv;
}
diff --git a/src/raft.h b/src/raft.h
index 0e01b48..a246244 100644
--- a/src/raft.h
+++ b/src/raft.h
@@ -1,43 +1,43 @@
/*
* Copyright (C) 2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef _RAFT_H
#define _RAFT_H
#include "booth.h"
/* States of a ticket on a booth server. Each value is built by
 * CHAR2CONST() from a four character tag.
 */
typedef enum {
ST_INIT = CHAR2CONST('I', 'n', 'i', 't'),
ST_FOLLOWER = CHAR2CONST('F', 'l', 'l', 'w'),
ST_CANDIDATE = CHAR2CONST('C', 'n', 'd', 'i'),
ST_LEADER = CHAR2CONST('L', 'e', 'a', 'd'),
} server_state_e;
struct ticket_config;
/* Process a ticket message received from site "from"; returns the
 * result of the handler for the command in the message, or -EINVAL for
 * an unknown or unexpected command. */
-int raft_answer(struct ticket_config *tk,
- struct booth_site *from,
- struct booth_site *leader,
- struct boothc_ticket_msg *msg);
-
-int new_election(struct ticket_config *tk,
- struct booth_site *new_leader, int update_term, cmd_reason_t reason);
-void elections_end(struct ticket_config *tk);
+int raft_answer(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *from, struct booth_site *leader,
+ struct boothc_ticket_msg *msg);
+
/* Start a new election; returns 1 if one is already in progress,
 * 0 otherwise. */
+int new_election(struct booth_config *conf, struct ticket_config *tk,
+ struct booth_site *new_leader, int update_term,
+ cmd_reason_t reason);
/* Close the current election and act on the votes collected so far. */
+void elections_end(struct booth_config *conf, struct ticket_config *tk);
#endif /* _RAFT_H */
diff --git a/src/request.c b/src/request.c
index 2503f6c..d7ff5f9 100644
--- a/src/request.c
+++ b/src/request.c
@@ -1,83 +1,84 @@
/*
* Copyright (C) 2015 Dejan Muhamedagic <dejan@hello-penguin.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include <errno.h>
#include <stdio.h>
#include <assert.h>
#include <glib.h>
#include "booth.h"
#include "ticket.h"
#include "request.h"
#include "log.h"
static GList *req_l = NULL;
static int req_id_cnt;
/* Queue a client request for a ticket.
 *
 * The ticket and the message are stored by reference and only the
 * file descriptor is taken from the client; managing the memory of
 * the three parameters remains up to the caller.
 *
 * Returns the new request (to be used with get_req_id()), or NULL if
 * it could not be allocated.
 */
void *add_req(
		struct ticket_config *tk,
		struct client *req_client,
		struct boothc_ticket_msg *msg)
{
	struct request *entry = g_new(struct request, 1);

	if (entry == NULL) {
		return NULL;
	}

	entry->tk = tk;
	entry->msg = msg;
	entry->client_fd = req_client->fd;
	entry->id = req_id_cnt++;

	req_l = g_list_append(req_l, entry);
	return entry;
}
int get_req_id(const void *rp)
{
if (!rp)
return -1;
return ((struct request *)rp)->id;
}
/* Unlink one element from the request queue; NULL is ignored.
 *
 * NOTE(review): only the list link is released here, the struct
 * request allocated in add_req() is not freed -- confirm whether it
 * is released elsewhere.
 */
static void del_req(GList *lp)
{
	if (lp != NULL) {
		req_l = g_list_delete_link(req_l, lp);
	}
}
/* Call f for every queued request which belongs to ticket tk.
 * A request is removed from the queue if f returns 0 for it;
 * requests of other tickets are left alone.
 */
-void foreach_tkt_req(struct ticket_config *tk, req_fp f)
+void foreach_tkt_req(struct booth_config *conf, struct ticket_config *tk,
+ req_fp f)
{
GList *lp, *next;
struct request *rp;
lp = g_list_first(req_l);
while (lp) {
/* fetch the successor first, lp may get deleted below */
next = g_list_next(lp);
rp = (struct request *)lp->data;
if (rp->tk == tk &&
- (f)(rp->tk, rp->client_fd, rp->msg) == 0) {
+ (f)(conf, rp->tk, rp->client_fd, rp->msg) == 0) {
log_debug("remove request for client %d", rp->client_fd);
del_req(lp); /* don't need this request anymore */
}
lp = next;
}
}
diff --git a/src/request.h b/src/request.h
index c014a09..2b25ca7 100644
--- a/src/request.h
+++ b/src/request.h
@@ -1,55 +1,68 @@
/*
* Copyright (C) 2015 Dejan Muhamedagic <dejan@hello-penguin.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef _REQUEST_H
#define _REQUEST_H
#include "booth.h"
#include "config.h"
/* Requests are coming from clients and get queued in a list
* (req_l in request.c, a GList which is not limited in size)
*
* This is one way to make the server more responsive and less
* dependent on misbehaving clients. The requests are queued and
* later served from the server loop.
*/
/* One queued client request, created by add_req(). */
struct request {
/** Request ID (taken from a counter in add_req()) */
int id;
/** The ticket. */
struct ticket_config *tk;
/** File descriptor of the client which sent the request */
int client_fd;
/** The message containing the request (add_req() stores a
 * struct boothc_ticket_msg pointer here) */
void *msg;
};
/* Handler applied to queued requests by foreach_tkt_req(); it gets the
 * ticket, the client file descriptor and the message of the request.
 * A return value of 0 makes foreach_tkt_req() drop the request. */
-typedef int (*req_fp)(
- struct ticket_config *, int, struct boothc_ticket_msg *);
+typedef int (*req_fp)(struct booth_config *, struct ticket_config *, int,
+ struct boothc_ticket_msg *);
void *add_req(struct ticket_config *tk, struct client *req_client,
struct boothc_ticket_msg *msg);
-void foreach_tkt_req(struct ticket_config *tk, req_fp f);
+
+/**
+ * @internal
+ * Handle all pending requests for given ticket using function @p f
+ *
+ * @param[in,out] conf config object to refer to
+ * @param[in] tk ticket at hand
+ * @param[in] f handling function
+ *
+ * @note Nothing is returned; a request is removed from the queue as soon as @p f returns 0 for it
+ */
+void foreach_tkt_req(struct booth_config *conf, struct ticket_config *tk,
+ req_fp f);
+
int get_req_id(const void *rp);
#endif /* _REQUEST_H */
diff --git a/src/ticket.c b/src/ticket.c
index c9147d8..fc5f3a7 100644
--- a/src/ticket.c
+++ b/src/ticket.c
@@ -1,1440 +1,1446 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "b_config.h"
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include <inttypes.h>
#include <stdio.h>
#include <assert.h>
#include <time.h>
#ifndef RANGE2RANDOM_GLIB
#include <clplumbing/cl_random.h>
#else
#include "alt/range2random_glib.h"
#endif
#include "ticket.h"
#include "config.h"
#include "pacemaker.h"
#include "inline-fn.h"
#include "log.h"
#include "booth.h"
#include "raft.h"
#include "handler.h"
#include "request.h"
#include "manual.h"
extern int TIME_RES;
/* Untrusted input: check that the string, including its terminating
 * \0, fits in a buffer of max chars. At most max chars are examined.
 *
 * Returns 1 if it fits, 0 otherwise (also for max <= 0).
 */
int check_max_len_valid(const char *s, int max)
{
	int pos = 0;

	while (pos < max) {
		if (s[pos] == '\0') {
			return 1;
		}
		pos++;
	}

	return 0;
}
/* Look up a ticket by name in the given configuration.
 *
 * At most sizeof(tk->name) characters are compared. If "found" is not
 * NULL, it is set to the matching ticket, or to NULL if there is none.
 *
 * Returns 1 if the ticket exists, 0 otherwise.
 */
int find_ticket_by_name(struct booth_config *conf, const char *ticket,
			struct ticket_config **found)
{
	struct ticket_config *candidate;
	int idx;

	if (found != NULL) {
		*found = NULL;
	}

	FOREACH_TICKET(conf, idx, candidate) {
		if (strncmp(candidate->name, ticket, sizeof(candidate->name)) == 0) {
			if (found != NULL) {
				*found = candidate;
			}
			return 1;
		}
	}

	return 0;
}
/* Validate a ticket name and look it up.
 *
 * *found (if given) is cleared first. The lookup is only attempted when
 * conf is non-NULL and the name is terminated within the size of a
 * ticket name buffer.
 *
 * Returns the result of find_ticket_by_name(), or 0 if a check fails. */
int check_ticket(struct booth_config *conf, char *ticket,
		 struct ticket_config **found)
{
	if (found != NULL) {
		*found = NULL;
	}

	if (conf == NULL ||
	    !check_max_len_valid(ticket, sizeof(conf->ticket[0].name))) {
		return 0;
	}

	return find_ticket_by_name(conf, ticket, found);
}
/* Is it still unsafe to commit the grant?
 * If we didn't hear from all sites on the initial grant, we may
 * need to delay the commit.
 *
 * Returns 1 while the commit has to be delayed, 0 when it may proceed
 * (no delay set, delay expired, or all sites replied).
 *
 * TODO: investigate possibility to devise from history whether a
 * missing site could be holding a ticket or not
 */
static int ticket_dangerous(struct ticket_config *tk)
{
	/* we may be invoked often, don't spam the log unnecessarily;
	 * note that this flag is function-static, not per ticket */
	static int no_log_delay_msg;
	int tdiff;

	if (!is_time_set(&tk->delay_commit)) {
		return 0;
	}

	if (!is_past(&tk->delay_commit) && !all_sites_replied(tk)) {
		/* still waiting: report once at info level, always at debug */
		tdiff = time_left(&tk->delay_commit);
		tk_log_debug("delay ticket commit for another " intfmt(tdiff));
		if (!no_log_delay_msg) {
			tk_log_info("delaying ticket commit to CIB for " intfmt(tdiff));
			tk_log_info("(or all sites are reached)");
			no_log_delay_msg = 1;
		}
		return 1;
	}

	/* the delay is over: clear it and re-arm the info message */
	if (tk->leader == local) {
		tk_log_info("%s, committing to CIB",
			    is_past(&tk->delay_commit) ?
			    "ticket delay expired" : "all sites replied");
	}
	time_reset(&tk->delay_commit);
	no_log_delay_msg = 0;

	return 0;
}
/* Push the ticket state through pcmk_handler.
 *
 * Returns -EINVAL when the local node is not a SITE, 1 when the write
 * has to be postponed (commit still delayed, or we are the leader but
 * the state is not yet ST_LEADER), and 0 after the grant/revoke handler
 * was invoked; in the latter case update_cib is cleared. */
int ticket_write(struct ticket_config *tk)
{
	if (local->type != SITE) {
		return -EINVAL;
	}

	if (ticket_dangerous(tk)) {
		return 1;
	}

	if (tk->leader != local) {
		pcmk_handler.revoke_ticket(tk);
	} else if (tk->state == ST_LEADER) {
		pcmk_handler.grant_ticket(tk);
	} else {
		tk_log_info("ticket state not yet consistent, "
			    "delaying ticket grant to CIB");
		return 1;
	}

	tk->update_cib = 0;
	return 0;
}
/* Snapshot the current ticket into tk->last_valid_tk.
 *
 * The snapshot buffer is allocated on first use and reused afterwards.
 * If the allocation fails, an error is logged and no snapshot is taken. */
void save_committed_tkt(struct ticket_config *tk)
{
	struct ticket_config *snapshot = tk->last_valid_tk;

	if (snapshot == NULL) {
		snapshot = malloc(sizeof(*snapshot));
		if (snapshot == NULL) {
			log_error("out of memory");
			return;
		}
		/* publish the buffer before copying, so the copy carries
		 * the same last_valid_tk pointer as the live ticket */
		tk->last_valid_tk = snapshot;
	}

	memcpy(snapshot, tk, sizeof(*tk));
}
-static void ext_prog_failed(struct ticket_config *tk, int start_election)
+static void ext_prog_failed(struct booth_config *conf, struct ticket_config *tk,
+ int start_election)
{
if (!is_manual(tk)) {
/* Give it to somebody else.
* Just send a VOTE_FOR message, so the
* others can start elections. */
if (!leader_and_valid(tk)) {
return;
}
save_committed_tkt(tk);
reset_ticket(tk);
ticket_write(tk);
if (start_election) {
- ticket_broadcast(tk, OP_VOTE_FOR, OP_REQ_VOTE, RLT_SUCCESS, OR_LOCAL_FAIL);
+ ticket_broadcast(conf, tk, OP_VOTE_FOR, OP_REQ_VOTE,
+ RLT_SUCCESS, OR_LOCAL_FAIL);
}
} else {
/* There is not much we can do now because
* the manual ticket cannot be relocated.
* Just warn the user. */
if (tk->leader != local) {
return;
}
save_committed_tkt(tk);
reset_ticket(tk);
ticket_write(tk);
log_error("external test failed on the specified machine, cannot acquire a manual ticket");
}
}
#define attr_found(geo_ap, ap) \
	((geo_ap) && !strcmp((geo_ap)->val, (ap)->attr_val))

/* Check the attribute prerequisites of a ticket for one grant type.
 *
 * Every entry of tk->attr_prereqs with a matching grant_type is compared
 * against the attribute stored under its name in tk->attr:
 *   ATTR_OP_EQ fails unless the attribute exists and has the given value,
 *   ATTR_OP_NE fails if the attribute exists and has the given value.
 * Entries with any other operator are ignored.
 *
 * Returns 0 if all prerequisites hold, 1 (after logging a warning) on the
 * first one that does not. */
int check_attr_prereq(struct ticket_config *tk, grant_type_e grant_type)
{
	GList *el;

	for (el = g_list_first(tk->attr_prereqs); el != NULL; el = g_list_next(el)) {
		struct attr_prereq *ap = (struct attr_prereq *) el->data;
		struct geo_attr *geo_ap;
		int violated = 0;

		if (ap->grant_type != grant_type) {
			continue;
		}

		geo_ap = (struct geo_attr *) g_hash_table_lookup(tk->attr, ap->attr_name);

		/* only compare values for the operators we know about */
		if (ap->op == ATTR_OP_EQ) {
			violated = !attr_found(geo_ap, ap);
		} else if (ap->op == ATTR_OP_NE) {
			violated = attr_found(geo_ap, ap);
		}

		if (violated) {
			tk_log_warn("'%s' attr-prereq failed", ap->attr_name);
			return 1;
		}
	}

	return 0;
}
/* do we need to run the external program?
 * or have we already done that and are waiting for the outcome,
 * or has the program exited so that we can collect the status?
 * return codes
 * 0: no program defined
 * RUNCMD_MORE: program forked, results later
 * != 0: executing program failed (or some other failure)
 */
-static int do_ext_prog(struct ticket_config *tk, int start_election)
+static int do_ext_prog(struct booth_config *conf, struct ticket_config *tk,
+ int start_election)
{
int rv = 0;
if (!tk_test.path) {
return 0;
}
switch(tk_test.progstate) {
case EXTPROG_IDLE:
rv = run_handler(tk);
if (rv == RUNCMD_ERR) {
tk_log_warn("couldn't run external test, not allowed to acquire ticket");
- ext_prog_failed(tk, start_election);
+ ext_prog_failed(conf, tk, start_election);
}
break;
case EXTPROG_RUNNING:
/* should never get here, but just in case */
rv = RUNCMD_MORE;
break;
case EXTPROG_EXITED:
rv = tk_test_exit_status(tk);
if (rv) {
- ext_prog_failed(tk, start_election);
+ ext_prog_failed(conf, tk, start_election);
}
break;
case EXTPROG_IGNORE:
/* nothing to do here */
break;
}
return rv;
}
/* Try to acquire a ticket
* Could be manual grant or after start (if the ticket is granted
* and still valid in the CIB)
* If the external program needs to run, this is run twice, once
* to start the program, and then to get the result and start
* elections.
*/
-static int acquire_ticket(struct ticket_config *tk, cmd_reason_t reason)
+static int acquire_ticket(struct booth_config *conf, struct ticket_config *tk,
+ cmd_reason_t reason)
{
int rv;
if (reason == OR_ADMIN && check_attr_prereq(tk, GRANT_MANUAL)) {
return RLT_ATTR_PREREQ;
}
- switch(do_ext_prog(tk, 0)) {
+ switch(do_ext_prog(conf, tk, 0)) {
case 0:
/* everything fine */
break;
case RUNCMD_MORE:
/* need to wait for the outcome before starting elections */
return 0;
default:
return RLT_EXT_FAILED;
}
if (is_manual(tk)) {
- rv = manual_selection(tk, local, 1, reason);
+ rv = manual_selection(conf, tk, local, 1, reason);
} else {
- rv = new_election(tk, local, 1, reason);
+ rv = new_election(conf, tk, local, 1, reason);
}
return rv ? RLT_SYNC_FAIL : 0;
}
/** Try to get the ticket for the local site.
* */
-static int do_grant_ticket(struct ticket_config *tk, int options)
+static int do_grant_ticket(struct booth_config *conf, struct ticket_config *tk,
+ int options)
{
int rv;
tk_log_info("granting ticket");
if (tk->leader == local) {
return RLT_SUCCESS;
}
if (is_owned(tk)) {
if (is_manual(tk) && (options & OPT_IMMEDIATE)) {
/* -F flag has been used while granting a manual ticket.
* The ticket will be granted and may end up being granted
* on multiple sites */
tk_log_warn("manual ticket forced to be granted! be aware that "
"you may end up having two sites holding the same manual "
"ticket! revoke the ticket from the unnecessary site!");
} else {
return RLT_OVERGRANT;
}
}
set_future_time(&tk->delay_commit, tk->term_duration + tk->acquire_after);
if (options & OPT_IMMEDIATE) {
tk_log_warn("granting ticket immediately! If there are "
"unreachable sites, _hope_ you are sure that they don't "
"have the ticket!");
time_reset(&tk->delay_commit);
}
- rv = acquire_ticket(tk, OR_ADMIN);
+ rv = acquire_ticket(conf, tk, OR_ADMIN);
if (rv) {
time_reset(&tk->delay_commit);
return rv;
} else {
return RLT_MORE;
}
}
-static void start_revoke_ticket(struct ticket_config *tk)
+static void start_revoke_ticket(struct booth_config *conf, struct ticket_config *tk)
{
tk_log_info("revoking ticket");
save_committed_tkt(tk);
reset_ticket_and_set_no_leader(tk);
ticket_write(tk);
- ticket_broadcast(tk, OP_REVOKE, OP_ACK, RLT_SUCCESS, OR_ADMIN);
+ ticket_broadcast(conf, tk, OP_REVOKE, OP_ACK, RLT_SUCCESS, OR_ADMIN);
}
/** Ticket revoke.
* Only to be started from the leader. */
-static int do_revoke_ticket(struct ticket_config *tk)
+static int do_revoke_ticket(struct booth_config *conf, struct ticket_config *tk)
{
if (tk->acks_expected) {
tk_log_info("delay ticket revoke until the current operation finishes");
set_next_state(tk, ST_INIT);
return RLT_MORE;
} else {
- start_revoke_ticket(tk);
+ start_revoke_ticket(conf, tk);
return RLT_SUCCESS;
}
}
/* Sum the sites_where_granted entries of the ticket over all nodes in
 * conf and return the total. */
static int number_sites_marked_as_granted(struct booth_config *conf,
					  struct ticket_config *tk)
{
	/* the iteration macro needs a node variable; we only use the index */
	struct booth_site *site __attribute__((unused));
	int idx;
	int granted_cnt = 0;

	FOREACH_NODE(conf, idx, site) {
		granted_cnt += tk->sites_where_granted[idx];
	}

	return granted_cnt;
}
/* Build a textual listing of all tickets.
 *
 * The first pass emits one line per ticket (name, leader, and for owned
 * tickets the expiry time plus a "commit pending" note where it applies).
 * The second pass appends a warning for every ticket that is marked as
 * granted on more than one site.
 *
 * On success *pdata points to a strdup()ed string which the caller has
 * to free, and 0 is returned. Returns -ENOMEM on allocation failure. */
static int list_ticket(struct booth_config *conf, char **pdata)
{
	GString *out;
	struct ticket_config *tk;
	struct booth_site *site;
	char timeout_str[64];
	int i, site_index;
	time_t ts;

	out = g_string_sized_new(BUFSIZ);
	if (out == NULL) {
		return -ENOMEM;
	}

	FOREACH_TICKET(conf, i, tk) {
		char *pending_str = NULL;

		if (is_manual(tk) || !is_time_set(&tk->term_expires)) {
			/* Manual tickets don't have term_expires defined */
			strcpy(timeout_str, "INF");
		} else {
			ts = wall_ts(&tk->term_expires);
			strftime(timeout_str, sizeof(timeout_str), "%F %T",
				 localtime(&ts));
		}

		if (tk->leader == local && is_time_set(&tk->delay_commit) &&
		    !is_past(&tk->delay_commit)) {
			char until_str[64];

			ts = wall_ts(&tk->delay_commit);
			strftime(until_str, sizeof(until_str), "%F %T",
				 localtime(&ts));
			if (asprintf(&pending_str, " (commit pending until %s)",
				     until_str) < 0) {
				g_string_free(out, TRUE);
				return -ENOMEM;
			}
		}

		g_string_append_printf(out, "ticket: %s, leader: %s", tk->name,
				       ticket_leader_string(tk));

		if (is_owned(tk)) {
			g_string_append_printf(out, ", expires: %s", timeout_str);
			if (pending_str != NULL) {
				g_string_append(out, pending_str);
			}
		}

		if (is_manual(tk)) {
			g_string_append(out, " [manual mode]");
		}

		g_string_append(out, "\n");

		/* free(NULL) is a no-op, so no guard is needed */
		free(pending_str);
	}

	FOREACH_TICKET(conf, i, tk) {
		/* number of site names still to be printed for this ticket */
		int remaining = number_sites_marked_as_granted(conf, tk);

		if (remaining <= 1) {
			continue;
		}

		g_string_append_printf(out, "\nWARNING: The ticket %s is granted to multiple sites: ",
				       tk->name);

		FOREACH_NODE(conf, site_index, site) {
			if (tk->sites_where_granted[site_index] <= 0) {
				continue;
			}

			g_string_append(out, site_string(site));
			if (--remaining > 0) {
				g_string_append(out, ", ");
			}
		}

		g_string_append(out, ". Revoke the ticket from the faulty sites.\n");
	}

	*pdata = strdup(out->str);
	g_string_free(out, TRUE);

	return (*pdata == NULL) ? -ENOMEM : 0;
}
/* Give up ownership of the ticket: pass NULL to set_leader(), clear the
 * is_granted flag and refresh term_expires through get_time()
 * (presumably stamping it with the current time -- confirm in timer.h). */
void disown_ticket(struct ticket_config *tk)
{
set_leader(tk, NULL);
tk->is_granted = 0;
get_time(&tk->term_expires);
}
/* Reset the ticket: call ignore_ext_test(), disown the ticket, call
 * no_resends(), set the state to ST_INIT, clear next_state and forget
 * whom we voted for. */
void reset_ticket(struct ticket_config *tk)
{
ignore_ext_test(tk);
disown_ticket(tk);
no_resends(tk);
set_state(tk, ST_INIT);
/* 0 means "no pending next state" to ticket_cron()'s
 * if (tk->next_state) check */
set_next_state(tk, 0);
tk->voted_for = NULL;
}
/* Same as reset_ticket(), but mark the ticket as revoked from its leader
 * first and record no_leader as the leader afterwards. */
void reset_ticket_and_set_no_leader(struct ticket_config *tk)
{
/* must run before reset_ticket(), which disowns the ticket
 * (presumably the current leader is still needed here -- confirm) */
mark_ticket_as_revoked_from_leader(tk);
reset_ticket(tk);
/* assigned after reset_ticket(), so this value is what remains */
tk->leader = no_leader;
tk_log_debug("ticket leader set to no_leader");
}
/* Log why we are about to try to reacquire the ticket:
 *  - a warning if the term is unset or already in the past,
 *  - an error if the ticket is granted here while another site is
 *    recorded as leader, or a warning if nobody is recorded. */
static void log_reacquire_reason(struct ticket_config *tk)
{
	char buff[75];
	const char *where_granted = "granted here";
	int expired;

	expired = !is_time_set(&tk->term_expires) || is_past(&tk->term_expires);

	if (tk->leader != local) {
		snprintf(buff, sizeof(buff), "granted to %s",
			 site_string(tk->leader));
		where_granted = buff;
	}

	if (expired) {
		tk_log_warn("%s, but not valid anymore (will try to reacquire)",
			    where_granted);
	}

	/* the remaining messages only concern "granted here, led elsewhere" */
	if (!tk->is_granted || tk->leader == local) {
		return;
	}

	if (tk->leader && tk->leader != no_leader) {
		tk_log_error("granted here, but also %s, "
			     "that's really too bad (will try to reacquire)",
			     where_granted);
	} else {
		tk_log_warn("granted here, but we're "
			    "not recorded as the grantee (will try to reacquire)");
	}
}
/* Derive the ticket state (and the hint in next_state) from the ticket
 * data we currently hold.
 *
 * sender is the site the data came from; setup_ticket() passes NULL after
 * loading the ticket through pcmk_handler, which is logged as "from CIB". */
void update_ticket_state(struct ticket_config *tk, struct booth_site *sender)
{
	if (tk->state == ST_CANDIDATE) {
		tk_log_info("learned from %s about newer ticket, stopping elections",
			    site_string(sender));
		/* there could be rejects coming from others; don't log
		 * warnings unnecessarily */
		tk->expect_more_rejects = 1;
	}

	/* case 1: we are the leader or the ticket is granted here */
	if (tk->leader == local || tk->is_granted) {
		/* message from a live leader with valid ticket? */
		if (sender == tk->leader && term_time_left(tk)) {
			if (tk->is_granted) {
				tk_log_warn("ticket was granted here, "
					    "but it's live at %s (revoking here)",
					    site_string(sender));
			} else {
				tk_log_info("ticket live at %s", site_string(sender));
			}
			disown_ticket(tk);
			ticket_write(tk);
			set_state(tk, ST_FOLLOWER);
			set_next_state(tk, ST_FOLLOWER);
		} else {
			if (tk->state == ST_CANDIDATE) {
				set_state(tk, ST_FOLLOWER);
			}
			set_next_state(tk, ST_LEADER);
		}
		return;
	}

	/* case 2: nobody holds the ticket */
	if (!tk->leader || tk->leader == no_leader) {
		if (sender) {
			tk_log_info("ticket is not granted");
		} else {
			tk_log_info("ticket is not granted (from CIB)");
		}
		set_state(tk, ST_INIT);
		return;
	}

	/* case 3: some other site holds the ticket */
	if (sender) {
		tk_log_info("ticket granted to %s (says %s)",
			    site_string(tk->leader),
			    tk->leader == sender ? "they" : site_string(sender));
	} else {
		tk_log_info("ticket granted to %s (from CIB)",
			    site_string(tk->leader));
	}
	set_state(tk, ST_FOLLOWER);
	/* just make sure that we check the ticket soon */
	set_next_state(tk, ST_FOLLOWER);
}
int setup_ticket(struct booth_config *conf)
{
struct ticket_config *tk;
int i;
FOREACH_TICKET(conf, i, tk) {
reset_ticket(tk);
if (local->type == SITE) {
if (!pcmk_handler.load_ticket(conf, tk)) {
update_ticket_state(tk, NULL);
}
tk->update_cib = 1;
}
tk_log_info("broadcasting state query");
/* wait until all send their status (or the first
* timeout) */
tk->start_postpone = 1;
- ticket_broadcast(tk, OP_STATUS, OP_MY_INDEX, RLT_SUCCESS, 0);
+ ticket_broadcast(conf, tk, OP_STATUS, OP_MY_INDEX, RLT_SUCCESS, 0);
}
return 0;
}
int ticket_answer_list(struct booth_config *conf, int fd)
{
char *data = NULL;
int rv;
struct boothc_hdr_msg hdr;
rv = list_ticket(conf, &data);
if (rv < 0) {
goto out;
}
init_header(&hdr.header, CL_LIST, 0, 0, RLT_SUCCESS, 0, sizeof(hdr) + strlen(data));
- rv = send_header_plus(fd, &hdr, data, strlen(data));
+ rv = send_header_plus(conf, fd, &hdr, data, strlen(data));
out:
if (data != NULL) {
free(data);
}
return rv;
}
int process_client_request(struct booth_config *conf, struct client *req_client,
void *buf)
{
int rv, rc = 1;
struct ticket_config *tk;
int cmd;
struct boothc_ticket_msg omsg;
struct boothc_ticket_msg *msg;
msg = (struct boothc_ticket_msg *)buf;
cmd = ntohl(msg->header.cmd);
if (!check_ticket(conf, msg->ticket.id, &tk)) {
log_warn("client referenced unknown ticket %s", msg->ticket.id);
rv = RLT_INVALID_ARG;
goto reply_now;
}
/* Perform the initial check before granting
* an already granted non-manual ticket */
if (!is_manual(tk) && cmd == CMD_GRANT && is_owned(tk)) {
log_warn("client wants to grant an (already granted!) ticket %s",
msg->ticket.id);
rv = RLT_OVERGRANT;
goto reply_now;
}
if (cmd == CMD_REVOKE && !is_owned(tk)) {
log_info("client wants to revoke a free ticket %s", msg->ticket.id);
rv = RLT_TICKET_IDLE;
goto reply_now;
}
if (cmd == CMD_REVOKE && tk->leader != local) {
tk_log_info("not granted here, redirect to %s",
ticket_leader_string(tk));
rv = RLT_REDIRECT;
goto reply_now;
}
if (cmd == CMD_REVOKE) {
- rv = do_revoke_ticket(tk);
+ rv = do_revoke_ticket(conf, tk);
} else {
- rv = do_grant_ticket(tk, ntohl(msg->header.options));
+ rv = do_grant_ticket(conf, tk, ntohl(msg->header.options));
}
if (rv == RLT_MORE) {
/* client may receive further notifications, save the
* request for further processing */
add_req(tk, req_client, msg);
tk_log_debug("queue request %s for client %d",
state_to_string(cmd), req_client->fd);
rc = 0; /* we're not yet done with the message */
}
reply_now:
init_ticket_msg(&omsg, CL_RESULT, 0, rv, 0, tk);
- send_client_msg(req_client->fd, &omsg);
+ send_client_msg(conf, req_client->fd, &omsg);
return rc;
}
-int notify_client(struct ticket_config *tk, int client_fd,
- struct boothc_ticket_msg *msg)
+int notify_client(struct booth_config *conf, struct ticket_config *tk,
+ int client_fd, struct boothc_ticket_msg *msg)
{
struct boothc_ticket_msg omsg;
void (*deadfn) (int ci);
int rv, rc, ci;
int cmd, options;
struct client *req_client;
cmd = ntohl(msg->header.cmd);
options = ntohl(msg->header.options);
rv = tk->outcome;
ci = find_client_by_fd(client_fd);
if (ci < 0) {
tk_log_info("client %d (request %s) left before being notified",
client_fd, state_to_string(cmd));
return 0;
}
tk_log_debug("notifying client %d (request %s)",
client_fd, state_to_string(cmd));
init_ticket_msg(&omsg, CL_RESULT, 0, rv, 0, tk);
- rc = send_client_msg(client_fd, &omsg);
+ rc = send_client_msg(conf, client_fd, &omsg);
if (rc == 0 && (rv == RLT_MORE ||
(rv == RLT_CIB_PENDING && (options & OPT_WAIT_COMMIT)))) {
/* more to do here, keep the request */
return 1;
} else {
/* we sent a definite answer or there was a write error, drop
* the client */
if (rc) {
tk_log_debug("failed to notify client %d (request %s)",
client_fd, state_to_string(cmd));
} else {
tk_log_debug("client %d (request %s) got final notification",
client_fd, state_to_string(cmd));
}
req_client = clients + ci;
deadfn = req_client->deadfn;
if (deadfn) {
deadfn(ci);
}
return 0; /* we're done with this request */
}
}
-int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd,
- cmd_request_t expected_reply, cmd_result_t res,
- cmd_reason_t reason)
+int ticket_broadcast(struct booth_config *conf, struct ticket_config *tk,
+ cmd_request_t cmd, cmd_request_t expected_reply,
+ cmd_result_t res, cmd_reason_t reason)
{
struct boothc_ticket_msg msg;
init_ticket_msg(&msg, cmd, 0, res, reason, tk);
tk_log_debug("broadcasting '%s' (term=%d, valid=%d)",
state_to_string(cmd),
ntohl(msg.ticket.term),
msg_term_time(&msg));
tk->last_request = cmd;
if (expected_reply) {
expect_replies(tk, expected_reply);
}
ticket_activate_timeout(tk);
- return transport()->broadcast_auth(&msg, sendmsglen(&msg));
+ return transport()->broadcast_auth(conf, &msg, sendmsglen(&msg));
}
/* update the ticket on the leader, write it to the CIB, and
send out the update message to others with the new expiry
time
*/
-int leader_update_ticket(struct ticket_config *tk)
+int leader_update_ticket(struct booth_config *conf, struct ticket_config *tk)
{
int rv = 0, rv2;
timetype now;
if (tk->ticket_updated >= 2) {
return 0;
}
/* for manual tickets, we don't set time expiration */
if (!is_manual(tk) && tk->ticket_updated < 1) {
tk->ticket_updated = 1;
get_time(&now);
copy_time(&now, &tk->last_renewal);
set_future_time(&tk->term_expires, tk->term_duration);
- rv = ticket_broadcast(tk, OP_UPDATE, OP_ACK, RLT_SUCCESS, 0);
+ rv = ticket_broadcast(conf, tk, OP_UPDATE, OP_ACK, RLT_SUCCESS, 0);
}
if (tk->ticket_updated < 2) {
rv2 = ticket_write(tk);
switch(rv2) {
case 0:
tk->ticket_updated = 2;
tk->outcome = RLT_SUCCESS;
- foreach_tkt_req(tk, notify_client);
+ foreach_tkt_req(conf, tk, notify_client);
break;
case 1:
if (tk->outcome != RLT_CIB_PENDING) {
tk->outcome = RLT_CIB_PENDING;
- foreach_tkt_req(tk, notify_client);
+ foreach_tkt_req(conf, tk, notify_client);
}
break;
default:
break;
}
}
return rv;
}
/* Warn about every node whose bit is missing from tk->acks_received.
 * Nothing is logged once retry_number exceeds 1, so the list shows up
 * on the first retry only. */
static void log_lost_servers(struct booth_config *conf, struct ticket_config *tk)
{
	struct booth_site *site;
	int idx;

	if (tk->retry_number > 1) {
		return;
	}

	FOREACH_NODE(conf, idx, site) {
		if (!(tk->acks_received & site->bitmask)) {
			tk_log_warn("%s %s didn't acknowledge our %s, "
				    "will retry %d times",
				    (site->type == ARBITRATOR ? "arbitrator" : "site"),
				    site_string(site), state_to_string(tk->last_request),
				    tk->retries);
		}
	}
}
static void resend_msg(struct booth_config *conf, struct ticket_config *tk)
{
struct booth_site *n;
int i;
if (!(tk->acks_received ^ local->bitmask)) {
- ticket_broadcast(tk, tk->last_request, 0, RLT_SUCCESS, 0);
+ ticket_broadcast(conf, tk, tk->last_request, 0, RLT_SUCCESS, 0);
} else {
FOREACH_NODE(conf, i, n) {
if (tk->acks_received & n->bitmask) {
continue;
}
n->resend_cnt++;
tk_log_debug("resending %s to %s",
state_to_string(tk->last_request),
site_string(n));
- send_msg(tk->last_request, tk, n, NULL);
+ send_msg(conf, tk->last_request, tk, n, NULL);
}
ticket_activate_timeout(tk);
}
}
/* Account for one more retry of the last request and resend it.
 *
 * Once retry_number exceeds tk->retries we give up: resends are stopped
 * and the regular ticket wakeup is scheduled. Otherwise the situation is
 * logged (stepping down alone, too few answers, or which servers are
 * missing) and the message is sent again. */
static void handle_resends(struct booth_config *conf, struct ticket_config *tk)
{
	int ack_cnt;

	if (++tk->retry_number > tk->retries) {
		tk_log_info("giving up on sending retries");
		no_resends(tk);
		set_ticket_wakeup(tk);
		return;
	}

	if (tk->last_request == OP_VOTE_FOR) {
		/* try to reach some sites again if we just stepped down */
		tk_log_warn("no answers to our VtFr request to step down (try #%d), "
			    "we are alone",
			    tk->retry_number);
	} else if (majority_of_bits(tk, tk->acks_received)) {
		log_lost_servers(conf, tk);
	} else {
		/* minus one, as in the original count (presumably to
		 * exclude our own bit -- confirm) */
		ack_cnt = count_bits(tk->acks_received) - 1;
		if (!ack_cnt) {
			tk_log_warn("no answers to our request (try #%d), "
				    "we are alone",
				    tk->retry_number);
		} else {
			tk_log_warn("not enough answers to our request (try #%d): "
				    "only got %d answers",
				    tk->retry_number, ack_cnt);
		}
	}

	resend_msg(conf, tk);
}
/* Should processing of this ticket still be postponed?
 * Returns non-zero while start_postpone is set and the negated
 * time_left() of start_time is below the ticket timeout. */
static int postpone_ticket_processing(struct ticket_config *tk)
{
	extern timetype start_time;

	if (!tk->start_postpone) {
		return 0;
	}

	return -time_left(&start_time) < tk->timeout;
}
#define has_extprog_exited(tk) ((tk)->clu_test.progstate == EXTPROG_EXITED)
-static void process_next_state(struct ticket_config *tk)
+static void process_next_state(struct booth_config *conf, struct ticket_config *tk)
{
int rv;
switch(tk->next_state) {
case ST_LEADER:
if (has_extprog_exited(tk)) {
if (tk->state == ST_LEADER) {
break;
}
- rv = acquire_ticket(tk, OR_ADMIN);
+ rv = acquire_ticket(conf, tk, OR_ADMIN);
if (rv != 0) { /* external program failed */
tk->outcome = rv;
- foreach_tkt_req(tk, notify_client);
+ foreach_tkt_req(conf, tk, notify_client);
}
} else {
log_reacquire_reason(tk);
- acquire_ticket(tk, OR_REACQUIRE);
+ acquire_ticket(conf, tk, OR_REACQUIRE);
}
break;
case ST_INIT:
no_resends(tk);
- start_revoke_ticket(tk);
+ start_revoke_ticket(conf, tk);
tk->outcome = RLT_SUCCESS;
- foreach_tkt_req(tk, notify_client);
+ foreach_tkt_req(conf, tk, notify_client);
break;
/* wanting to be follower is not much of an ambition; no
* processing, just return; don't reset start_postpone until
* we got some replies to status */
case ST_FOLLOWER:
return;
default:
break;
}
tk->start_postpone = 0;
}
/* Handle a ticket whose term ran out (ticket_cron() calls this once
 * term_expires is in the past).
 *
 * The ticket is remembered (lost_leader, last committed copy), marked as
 * revoked, reset and put into ST_FOLLOWER. On a SITE the new state is
 * also written out and an election is scheduled with a reason that
 * depends on how the ticket got lost. */
static void ticket_lost(struct ticket_config *tk)
{
	int reason = OR_TKT_LOST;

	if (tk->leader != local) {
		tk_log_warn("lost at %s", site_string(tk->leader));
	} else if (is_ext_prog_running(tk)) {
		ext_prog_timeout(tk);
		reason = OR_LOCAL_FAIL;
	} else {
		tk_log_warn("lost majority (revoking locally)");
		reason = tk->election_reason ? tk->election_reason : OR_REACQUIRE;
	}

	tk->lost_leader = tk->leader;
	save_committed_tkt(tk);
	mark_ticket_as_revoked_from_leader(tk);
	reset_ticket(tk);
	set_state(tk, ST_FOLLOWER);

	if (local->type != SITE) {
		return;
	}

	ticket_write(tk);
	schedule_election(tk, reason);
}
static void next_action(struct booth_config *conf, struct ticket_config *tk)
{
int rv;
switch(tk->state) {
case ST_INIT:
/* init state, handle resends for ticket revoke */
/* and rebroadcast if stepping down */
/* try to acquire ticket on grant */
if (has_extprog_exited(tk)) {
- rv = acquire_ticket(tk, OR_ADMIN);
+ rv = acquire_ticket(conf, tk, OR_ADMIN);
if (rv != 0) { /* external program failed */
tk->outcome = rv;
- foreach_tkt_req(tk, notify_client);
+ foreach_tkt_req(conf, tk, notify_client);
}
} else {
if (tk->acks_expected) {
handle_resends(conf, tk);
}
}
break;
case ST_FOLLOWER:
if (!is_manual(tk)) {
/* leader/ticket lost? and we didn't vote yet */
tk_log_debug("leader: %s, voted_for: %s",
site_string(tk->leader),
site_string(tk->voted_for));
if (tk->leader) {
break;
}
if (!tk->voted_for || !tk->in_election) {
disown_ticket(tk);
- if (!new_election(tk, NULL, 1, OR_AGAIN)) {
+ if (!new_election(conf, tk, NULL, 1, OR_AGAIN)) {
ticket_activate_timeout(tk);
}
} else {
/* we should restart elections in case nothing
* happens in the meantime */
tk->in_election = 0;
ticket_activate_timeout(tk);
}
} else {
/* for manual tickets, also try to acquire ticket on grant
* in the Follower state (because we may end up having
* two Leaders) */
if (has_extprog_exited(tk)) {
- rv = acquire_ticket(tk, OR_ADMIN);
+ rv = acquire_ticket(conf, tk, OR_ADMIN);
if (rv != 0) { /* external program failed */
tk->outcome = rv;
- foreach_tkt_req(tk, notify_client);
+ foreach_tkt_req(conf, tk, notify_client);
}
} else {
/* Otherwise, just send ACKs if needed */
if (tk->acks_expected) {
handle_resends(conf, tk);
}
}
}
break;
case ST_CANDIDATE:
/* elections timed out? */
- elections_end(tk);
+ elections_end(conf, tk);
break;
case ST_LEADER:
/* timeout or ticket renewal? */
if (tk->acks_expected) {
handle_resends(conf, tk);
if (majority_of_bits(tk, tk->acks_received)) {
- leader_update_ticket(tk);
+ leader_update_ticket(conf, tk);
}
- } else if (!do_ext_prog(tk, 1)) {
+ } else if (!do_ext_prog(conf, tk, 1)) {
/* this is ticket renewal, run local test */
- ticket_broadcast(tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0);
+ ticket_broadcast(conf, tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0);
tk->ticket_updated = 0;
}
break;
default:
break;
}
}
static void ticket_cron(struct booth_config *conf, struct ticket_config *tk)
{
/* don't process the tickets too early after start */
if (postpone_ticket_processing(tk)) {
tk_log_debug("ticket processing postponed (start_postpone=%d)",
tk->start_postpone);
/* but run again soon */
ticket_activate_timeout(tk);
return;
}
/* no need for status resends, we hope we got at least one
* my_index back */
if (tk->acks_expected == OP_MY_INDEX) {
no_resends(tk);
}
/* after startup, we need to decide what to do based on the
* current ticket state; tk->next_state has a hint
* also used for revokes which had to be delayed
*/
if (tk->next_state) {
- process_next_state(tk);
+ process_next_state(conf, tk);
goto out;
}
/* Has an owner, has an expiry date, and expiry date in the past?
* For automatic tickets, losing the ticket must happen
* in _every_ state.
*/
if (!is_manual(tk) && is_owned(tk) && is_time_set(&tk->term_expires) &&
is_past(&tk->term_expires)) {
ticket_lost(tk);
goto out;
}
next_action(conf, tk);
out:
tk->next_state = 0;
if (!tk->in_election && tk->update_cib) {
ticket_write(tk);
}
}
/* Run ticket_cron() for every ticket that is due.
 *
 * A ticket is due when its external program has exited, or when next_cron
 * is unset or already in the past. If the cron run left next_cron
 * unchanged, a wakeup is scheduled with set_ticket_wakeup(). */
void process_tickets(struct booth_config *conf)
{
	struct ticket_config *tk;
	int i;

	FOREACH_TICKET(conf, i, tk) {
		timetype cron_before;
		int due;

		due = has_extprog_exited(tk) ||
		      !is_time_set(&tk->next_cron) ||
		      is_past(&tk->next_cron);
		if (!due) {
			continue;
		}

		tk_log_debug("ticket cron");

		/* remember next_cron to detect whether anybody changed it */
		copy_time(&tk->next_cron, &cron_before);
		ticket_cron(conf, tk);

		if (time_cmp(&cron_before, &tk->next_cron, ==)) {
			tk_log_debug("nobody set ticket wakeup");
			set_ticket_wakeup(tk);
		}
	}
}
/* Log state, term, leader and expiry time of every ticket at info level. */
void tickets_log_info(struct booth_config *conf)
{
	struct ticket_config *tk;
	int i;

	FOREACH_TICKET(conf, i, tk) {
		time_t expires = wall_ts(&tk->term_expires);

		/* %-24.24s cuts ctime()'s output after 24 characters */
		tk_log_info("state '%s' term %d leader %s expires %-24.24s",
			    state_to_string(tk->state),
			    tk->current_term,
			    ticket_leader_string(tk),
			    ctime(&expires));
	}
}
/* Record an acknowledgement carried by msg.
 *
 * The message counts as an ack only if it answers our last request and
 * its command is the expected reply (or OP_REJECTED is what we expect).
 * When everybody has replied, or a single OP_REQ_VOTE answers our
 * OP_VOTE_FOR, resends are stopped and the ticket wakeup is rescheduled.
 *
 * The leader argument is not used here. */
static void update_acks(struct ticket_config *tk, struct booth_site *sender,
			struct booth_site *leader, struct boothc_ticket_msg *msg)
{
	uint32_t cmd = ntohl(msg->header.cmd);
	uint32_t req = ntohl(msg->header.request);

	if (req != tk->last_request) {
		return;
	}
	if (tk->acks_expected != cmd && tk->acks_expected != OP_REJECTED) {
		return;
	}

	/* got an ack! */
	tk->acks_received |= sender->bitmask;

	if (!all_replied(tk) &&
	    /* we just stepped down, need only one site to start elections */
	    !(cmd == OP_REQ_VOTE && tk->last_request == OP_VOTE_FOR)) {
		return;
	}

	no_resends(tk);
	tk->start_postpone = 0;
	set_ticket_wakeup(tk);
}
/* read ticket message */
int ticket_recv(struct booth_config *conf, void *buf, struct booth_site *source)
{
struct boothc_ticket_msg *msg;
struct ticket_config *tk;
struct booth_site *leader;
uint32_t leader_u;
msg = (struct boothc_ticket_msg *)buf;
if (!check_ticket(conf, msg->ticket.id, &tk)) {
log_warn("got invalid ticket name %s from %s",
msg->ticket.id, site_string(source));
source->invalid_cnt++;
return -EINVAL;
}
leader_u = ntohl(msg->ticket.leader);
if (!find_site_by_id(conf, leader_u, &leader)) {
tk_log_error("message with unknown leader %u received", leader_u);
source->invalid_cnt++;
return -EINVAL;
}
update_acks(tk, source, leader, msg);
- return raft_answer(tk, source, leader, msg);
+ return raft_answer(conf, tk, source, leader, msg);
}
/* Debug-log how much time is left until the ticket's next_cron. */
static void log_next_wakeup(struct ticket_config *tk)
{
	/* kept in a local so time_left() is called exactly once */
	int left = time_left(&tk->next_cron);

	tk_log_debug("set ticket wakeup in " intfmt(left));
}
/* New vote round; §5.2 */
/* Push the ticket's next cron run back by a random interval, to delay
 * the next election start. The interval comes from rand_time() and is
 * bounded by min(1000, tk->timeout) (up to 1 second).
 */
void add_random_delay(struct ticket_config *tk)
{
	timetype delayed;

	interval_add(&tk->next_cron, rand_time(min(1000, tk->timeout)), &delayed);
	ticket_next_cron_at(tk, &delayed);

	if (ANYDEBUG) {
		log_next_wakeup(tk);
	}
}
/* Schedule the next cron run (tk->next_cron) for the ticket.
 *
 * Automatic tickets get a default wakeup of 3600 * TIME_RES (one hour),
 * which is then refined according to the ticket state; a pending
 * next_state forces an early wakeup. Manual tickets are simply woken up
 * every 60 * TIME_RES (one minute).
 */
void set_ticket_wakeup(struct ticket_config *tk)
{
	timetype near_future, tv, next_vote;

	set_future_time(&near_future, 10);

	if (!is_manual(tk)) {
		/* At least every hour, perhaps sooner (default) */
		tk_log_debug("ticket will be woken up after up to one hour");
		ticket_next_cron_in(tk, 3600*TIME_RES);

		switch (tk->state) {
		case ST_LEADER:
			assert(tk->leader == local);

			get_next_election_time(tk, &next_vote);

			/* If timestamp is unset or in the past, wakeup in
			 * near future */
			if (!is_time_set(&next_vote)) {
				tk_log_debug("next ts unset, wakeup soon");
				ticket_next_cron_at(tk, &near_future);
			} else if (is_past(&next_vote)) {
				int tdiff = time_left(&next_vote);
				tk_log_debug("next ts in the past " intfmt(tdiff));
				ticket_next_cron_at(tk, &near_future);
			} else {
				ticket_next_cron_at(tk, &next_vote);
			}
			break;

		case ST_CANDIDATE:
			assert(is_time_set(&tk->election_end));
			ticket_next_cron_at(tk, &tk->election_end);
			break;

		case ST_INIT:
		case ST_FOLLOWER:
			/* If there is (or should be) some owner, check on it later on.
			 * If no one is interested - don't care. */
			if (is_owned(tk)) {
				interval_add(&tk->term_expires, tk->acquire_after, &tv);
				ticket_next_cron_at(tk, &tv);
			}
			break;

		default:
			tk_log_error("unknown ticket state: %d", tk->state);
			break;
		}

		if (tk->next_state) {
			/* we need to do something soon here */
			if (!tk->acks_expected) {
				ticket_next_cron_at(tk, &near_future);
			} else {
				ticket_activate_timeout(tk);
			}
		}
	} else {
		/* At least every minute (60 * TIME_RES), to make sure that
		 * multi-leader situations will be solved promptly.
		 */
		tk_log_debug("manual ticket will be woken up after up to one minute");
		ticket_next_cron_in(tk, 60 * TIME_RES);

		/* For manual tickets, no earlier timeout could be set in a similar
		 * way as it is done in a switch above for automatic tickets.
		 * The reason is that term's timeout is INF and no Raft-based elections
		 * are performed.
		 */
	}

	if (ANYDEBUG) {
		log_next_wakeup(tk);
	}
}
/* Arrange for an election to start shortly, remembering why.
 * Does nothing unless the local node is a SITE. */
void schedule_election(struct ticket_config *tk, cmd_reason_t reason)
{
	if (local->type == SITE) {
		tk->election_reason = reason;
		get_time(&tk->next_cron);
		/* introduce a short delay before starting election */
		add_random_delay(tk);
	}
}
/* Returns 1 if the ticket is in manual mode, 0 otherwise. */
int is_manual(struct ticket_config *tk)
{
	return tk->mode == TICKET_MODE_MANUAL;
}
/* Given a state (in host byte order), return a human-readable (char*):
 * the bytes of the value in network byte order, followed by a NUL.
 *
 * The result lives in a rotating set of six static buffers, so that
 * multiple states can be printed in a single printf(); it is overwritten
 * again six calls later. */
char *state_to_string(uint32_t state_ho)
{
	union mu { cmd_request_t s; char c[5]; };
	static union mu cache[6] = { { 0 } };
	static int current = 0;
	union mu *slot;

	/* advance to the next buffer, wrapping around at the end */
	current = (current + 1) % (int)(sizeof(cache) / sizeof(cache[0]));
	slot = &cache[current];

	slot->s = htonl(state_ho);
	/* Shouldn't be necessary, union array is initialized with zeroes, and
	 * these bytes never get written. */
	slot->c[4] = 0;

	return slot->c;
}
-int send_reject(struct booth_site *dest, struct ticket_config *tk,
- cmd_result_t code, struct boothc_ticket_msg *in_msg)
+int send_reject(struct booth_config *conf, struct booth_site *dest,
+ struct ticket_config *tk, cmd_result_t code,
+ struct boothc_ticket_msg *in_msg)
{
int req = ntohl(in_msg->header.cmd);
struct boothc_ticket_msg msg;
tk_log_debug("sending reject to %s", site_string(dest));
init_ticket_msg(&msg, OP_REJECTED, req, code, 0, tk);
- return booth_udp_send_auth(dest, &msg, sendmsglen(&msg));
+ return booth_udp_send_auth(conf, dest, &msg, sendmsglen(&msg));
}
/* Build a message of type cmd for ticket tk (result RLT_SUCCESS) and send
 * it to dest over authenticated UDP. in_msg may be NULL; when given, its
 * command is passed on as the request being answered, otherwise 0 is used.
 * Returns the result of booth_udp_send_auth(). */
-int send_msg(int cmd, struct ticket_config *tk, struct booth_site *dest,
- struct boothc_ticket_msg *in_msg)
+int send_msg(struct booth_config *conf, int cmd, struct ticket_config *tk,
+ struct booth_site *dest, struct boothc_ticket_msg *in_msg)
{
int req = 0;
struct ticket_config *valid_tk = tk;
struct boothc_ticket_msg msg;
/* if we want to send the last valid ticket, then if we're in
* the ST_CANDIDATE state, the last valid ticket is in
* tk->last_valid_tk
*/
if (cmd == OP_MY_INDEX) {
if (tk->state == ST_CANDIDATE && tk->last_valid_tk) {
valid_tk = tk->last_valid_tk;
}
tk_log_info("sending status to %s", site_string(dest));
}
if (in_msg) {
req = ntohl(in_msg->header.cmd);
}
/* the message is filled from valid_tk, which may differ from tk (see above) */
init_ticket_msg(&msg, cmd, req, RLT_SUCCESS, 0, valid_tk);
- return booth_udp_send_auth(dest, &msg, sendmsglen(&msg));
+ return booth_udp_send_auth(conf, dest, &msg, sendmsglen(&msg));
}
diff --git a/src/ticket.h b/src/ticket.h
index 454ebaf..51e18b6 100644
--- a/src/ticket.h
+++ b/src/ticket.h
@@ -1,249 +1,284 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef _TICKET_H
#define _TICKET_H
#include <time.h>
#include <sys/time.h>
#include <math.h>
#include "timer.h"
#include "config.h"
#include "log.h"
extern int TIME_RES;
#define DEFAULT_TICKET_EXPIRY (600*TIME_RES)
#define DEFAULT_TICKET_TIMEOUT (5*TIME_RES)
#define DEFAULT_RETRIES 10
/* Loop header walking all tickets of config b_: i_ runs from 0 to
 * (b_)->ticket_count - 1 and t_ is set to (b_)->ticket + i_.
 * t_ is assigned before the bound is tested, so after a complete run it
 * points one element past the last ticket. */
#define FOREACH_TICKET(b_, i_, t_) \
for (i_ = 0; \
(t_ = (b_)->ticket + i_, i_ < (b_)->ticket_count); \
i_++)
/* Loop header walking all sites of config b_: i_ runs from 0 to
 * (b_)->site_count - 1 and n_ is set to (b_)->site + i_.
 * n_ is assigned before the bound is tested, so after a complete run it
 * points one element past the last site. */
#define FOREACH_NODE(b_, i_, n_) \
for (i_ = 0; \
(n_ = (b_)->site + i_, i_ < (b_)->site_count); \
i_++)
/* Same iteration as FOREACH_TICKET, but hard-wired to the global
 * booth_conf instead of taking the config as an argument. */
#define _FOREACH_TICKET(i_, t_) \
for (i_ = 0; \
(t_ = booth_conf->ticket + i_, i_ < booth_conf->ticket_count); \
i_++)
/* Same iteration as FOREACH_NODE, but hard-wired to the global
 * booth_conf instead of taking the config as an argument. */
#define _FOREACH_NODE(i_, n_) \
for (i_ = 0; \
(n_ = booth_conf->site + i_, i_ < booth_conf->site_count); \
i_++)
/* Make `who` the leader of ticket tk.
 * If who is NULL, the ticket is first marked as revoked from its current
 * leader; if the new leader is non-NULL, the ticket is marked as granted
 * to it. The arguments are expanded unparenthesized and more than once,
 * so pass plain identifiers without side effects. */
#define set_leader(tk, who) do { \
if (who == NULL) { \
mark_ticket_as_revoked_from_leader(tk); \
} \
\
tk->leader = who; \
tk_log_debug("ticket leader set to %s", ticket_leader_string(tk)); \
\
if (tk->leader) { \
mark_ticket_as_granted(tk, tk->leader); \
} \
} while(0)
/* For a manual ticket, flag the site `who` in tk->sites_where_granted
 * (only when who->index is non-negative). `who` is dereferenced without
 * a NULL check. Note that the debug message prints the ticket's leader,
 * not `who`. */
#define mark_ticket_as_granted(tk, who) do { \
if (is_manual(tk) && (who->index > -1)) { \
tk->sites_where_granted[who->index] = 1; \
tk_log_debug("manual ticket marked as granted to %s", ticket_leader_string(tk)); \
} \
} while(0)
/* For a manual ticket, clear the flag of site `who` in
 * tk->sites_where_granted. A NULL `who` or a negative who->index makes
 * this a no-op. */
#define mark_ticket_as_revoked(tk, who) do { \
if (is_manual(tk) && who && (who->index > -1)) { \
tk->sites_where_granted[who->index] = 0; \
tk_log_debug("manual ticket marked as revoked from %s", site_string(who)); \
} \
} while(0)
/* Apply mark_ticket_as_revoked() to the ticket's current leader, if the
 * ticket has one. */
#define mark_ticket_as_revoked_from_leader(tk) do { \
if (tk->leader) { \
mark_ticket_as_revoked(tk, tk->leader); \
} \
} while(0)
/* Log the transition from the ticket's current state to newst, then
 * store newst in tk->state. newst is expanded twice. */
#define set_state(tk, newst) do { \
tk_log_debug("state transition: %s -> %s", \
state_to_string(tk->state), state_to_string(newst)); \
tk->state = newst; \
} while(0)
/* Store newst in tk->next_state and log it; a zero newst is logged as
 * "next state reset". newst is expanded more than once. */
#define set_next_state(tk, newst) do { \
if (!(newst)) tk_log_debug("next state reset"); \
else tk_log_debug("next state set to %s", state_to_string(newst)); \
tk->next_state = newst; \
} while(0)
/* True when the ticket has a last_valid_tk whose current_term is greater
 * than `term`; false when there is no last_valid_tk at all. */
#define is_term_invalid(tk, term) \
((tk)->last_valid_tk && (tk)->last_valid_tk->current_term > (term))
void save_committed_tkt(struct ticket_config *tk);
void disown_ticket(struct ticket_config *tk);
/**
* @internal
* Like @find_ticket_by_name, but perform sanity checks on the found ticket
*
* @param[in,out] conf config object to refer to
* @param[in] ticket name of the ticket to search for
* @param[out] tc place the reference here when found
*
* @return 0 on failure, see @find_ticket_by_name otherwise
*/
int check_ticket(struct booth_config *conf, char *ticket, struct ticket_config **tc);
int grant_ticket(struct ticket_config *ticket);
int revoke_ticket(struct ticket_config *ticket);
/**
* @internal
* Second stage of incoming datagram handling (after authentication)
*
* @param[in,out] conf config object to refer to
* @param[in] buf raw message to act upon
* @param[in] source member originating this message
*
* @return 0 on success or negative value (-1 or -errno) on error
*/
int ticket_recv(struct booth_config *conf, void *buf, struct booth_site *source);
void reset_ticket(struct ticket_config *tk);
void reset_ticket_and_set_no_leader(struct ticket_config *tk);
void update_ticket_state(struct ticket_config *tk, struct booth_site *sender);
/**
* @internal
* Broadcast the initial state query
*
* @param[in,out] conf config object to use as a starting point
*
* @return 0 (for the time being)
*/
int setup_ticket(struct booth_config *conf);
int check_max_len_valid(const char *s, int max);
/**
* @internal
* Find a ticket based on a given name
*
* @param[in,out] conf config object to refer to
* @param[in] ticket name of the ticket to search for
* @param[out] found place the reference here when found
*
* @return see @list_ticket and @send_header_plus
*/
int find_ticket_by_name(struct booth_config *conf,
const char *ticket, struct ticket_config **found);
void set_ticket_wakeup(struct ticket_config *tk);
/**
* @internal
* Implementation of ticket listing
*
* @param[in,out] conf config object to refer to
* @param[in] fd file descriptor of the socket to respond to
*
* @return see @list_ticket and @send_header_plus
*/
int ticket_answer_list(struct booth_config *conf, int fd);
/**
* @internal
* Process request from the client (as opposed to the peer daemon)
*
* @param[in,out] conf config object to refer to
* @param[in] req_client client structure of the sender
* @param[in] buf client message
*
* @return 1 on success, or 0 when not yet done with the message
*/
int process_client_request(struct booth_config *conf, struct client *req_client,
void *buf);
int ticket_write(struct ticket_config *tk);
/**
* @internal
* Mainloop of booth ticket handling
*
* @param[in,out] conf config object to refer to
*/
void process_tickets(struct booth_config *conf);
/**
* @internal
* Log properties of all tickets
*
* @param[in,out] conf config object to refer to
*/
void tickets_log_info(struct booth_config *conf);
char *state_to_string(uint32_t state_ho);
-int send_reject(struct booth_site *dest, struct ticket_config *tk,
- cmd_result_t code, struct boothc_ticket_msg *in_msg);
-int send_msg (int cmd, struct ticket_config *tk,
- struct booth_site *dest, struct boothc_ticket_msg *in_msg);
-int notify_client(struct ticket_config *tk, int client_fd,
- struct boothc_ticket_msg *msg);
-int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd, cmd_request_t expected_reply, cmd_result_t res, cmd_reason_t reason);
-
-int leader_update_ticket(struct ticket_config *tk);
+
+/**
+ * @internal
+ * For a given ticket and recipient site, send a rejection
+ *
+ * @param[in,out] conf config object to refer to
+ * @param[in] dest site structure of the recipient
+ * @param[in] tk ticket at hand
+ * @param[in] code further detail for the rejection
+ * @param[in] in_msg message this is going to be a response to
+ */
+int send_reject(struct booth_config *conf, struct booth_site *dest,
+ struct ticket_config *tk, cmd_result_t code,
+ struct boothc_ticket_msg *in_msg);
+
+/**
+ * @internal
+ * For a given ticket, recipient site and possibly its message, send a response
+ *
+ * @param[in,out] conf config object to refer to
+ * @param[in] cmd what type of message is to be sent
+ * @param[in] dest site structure of the recipient
+ * @param[in] in_msg message this is going to be a response to
+ */
+int send_msg(struct booth_config *conf, int cmd, struct ticket_config *tk,
+ struct booth_site *dest, struct boothc_ticket_msg *in_msg);
+
+/**
+ * @internal
+ * Notify client at particular socket, regarding particular ticket
+ *
+ * @param[in,out] conf config object to refer to
+ * @param[in] tk ticket at hand
+ * @param[in] client_fd file descriptor of the socket to respond to
+ * @param[in] msg input message being responded to
+ */
+int notify_client(struct booth_config *conf, struct ticket_config *tk,
+ int client_fd, struct boothc_ticket_msg *msg);
+
+int ticket_broadcast(struct booth_config *conf, struct ticket_config *tk,
+ cmd_request_t cmd, cmd_request_t expected_reply,
+ cmd_result_t res, cmd_reason_t reason);
+
+int leader_update_ticket(struct booth_config *conf, struct ticket_config *tk);
void add_random_delay(struct ticket_config *tk);
void schedule_election(struct ticket_config *tk, cmd_reason_t reason);
int is_manual(struct ticket_config *tk);
int check_attr_prereq(struct ticket_config *tk, grant_type_e grant_type);
/* Set the ticket's next_cron from *when (via copy_time()). */
static inline void ticket_next_cron_at(struct ticket_config *tk, timetype *when)
{
copy_time(when, &tk->next_cron);
}
static inline void ticket_next_cron_in(struct ticket_config *tk, int interval)
{
timetype tv;
set_future_time(&tv, interval);
ticket_next_cron_at(tk, &tv);
}
static inline void ticket_activate_timeout(struct ticket_config *tk)
{
/* TODO: increase timeout when no answers */
tk_log_debug("activate ticket timeout in %d", tk->timeout);
ticket_next_cron_in(tk, tk->timeout);
}
#endif /* _TICKET_H */
diff --git a/src/transport.c b/src/transport.c
index 49217f6..63f3acc 100644
--- a/src/transport.c
+++ b/src/transport.c
@@ -1,1152 +1,1165 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "b_config.h"
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h> /* getnameinfo */
#include <poll.h>
#include <arpa/inet.h>
#include <asm/types.h>
#include <linux/rtnetlink.h>
#include <net/if.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h> /* getnameinfo */
#include "attr.h"
#include "auth.h"
#include "booth.h"
#include "config.h"
#include "inline-fn.h"
#include "log.h"
#include "ticket.h"
#include "transport.h"
#define BOOTH_IPADDR_LEN (sizeof(struct in6_addr))
#define NETLINK_BUFSIZE 16384
#define SOCKET_BUFFER_SIZE 160000
#define FRAME_SIZE_MAX 10000
struct booth_site *local = NULL;
/* function to be called when handling booth-group-internal messages;
* it's expected to return 0 to indicate success, negative integer
* to indicate silent (or possibly already complained about) error,
* or positive integer to indicate sender's ID that will then be
* emitted in the error log message together with the real source
* address if this is available */
static int (*deliver_fn) (struct booth_config *conf, void *msg, int msglen);
/* Index a chain of routing attributes by type.
 *
 * Walks the attributes starting at rta (len bytes in total) and stores a
 * pointer to each one in tb[type]. Attributes whose type exceeds max are
 * skipped; for a repeated type the last occurrence wins. tb must have
 * room for max + 1 entries and is not cleared here. */
static void parse_rtattr(struct rtattr *tb[],
			 int max, struct rtattr *rta, int len)
{
	for (; RTA_OK(rta, len); rta = RTA_NEXT(rta, len)) {
		if (rta->rta_type > max) {
			continue;
		}
		tb[rta->rta_type] = rta;
	}
}
/* Result of find_address(); the values are ordered so that a better match
 * compares greater. */
enum match_type {
NO_MATCH = 0,
/* at least the prefix bits agreed */
FUZZY_MATCH,
/* every address byte agreed */
EXACT_MATCH,
};
static int find_address(struct booth_config *conf,
unsigned char ipaddr[BOOTH_IPADDR_LEN],
int family, int prefixlen,
int fuzzy_allowed,
struct booth_site **me,
int *address_bits_matched)
{
int i;
struct booth_site *node;
int bytes, bits_left, mask;
unsigned char node_bits, ip_bits;
uint8_t *n_a;
int matched;
enum match_type did_match = NO_MATCH;
assert(conf != NULL);
bytes = prefixlen / 8;
bits_left = prefixlen % 8;
/* One bit left to check means ignore 7 lowest bits. */
mask = ~( (1 << (8 - bits_left)) -1);
FOREACH_NODE(conf, i, node) {
if (family != node->family)
continue;
n_a = node_to_addr_pointer(node);
for(matched = 0; matched < node->addrlen; matched++)
if (ipaddr[matched] != n_a[matched])
break;
if (matched == node->addrlen) {
*address_bits_matched = matched * 8;
*me = node;
did_match = EXACT_MATCH;
break;
}
if (!fuzzy_allowed)
continue;
/* Check prefix, whole bytes */
if (matched < bytes)
continue;
if (matched * 8 < *address_bits_matched)
continue;
node_bits = n_a[bytes];
ip_bits = ipaddr[bytes];
if (((node_bits ^ ip_bits) & mask) == 0) {
/* _At_least_ prefixlen bits matched. */
if (did_match < EXACT_MATCH) {
*address_bits_matched = prefixlen;
*me = node;
did_match = FUZZY_MATCH;
}
}
}
return did_match;
}
/* Work out which configured site this host is, for one address family.
 *
 * Requests a dump of the host's addresses of `family` over an rtnetlink
 * socket and feeds each address to find_address(). The site found is
 * flagged (->local = 1), stored in the global `local` and, if mep is
 * non-NULL, in *mep. When `local` is already set, no lookup is performed
 * and it is simply handed back.
 *
 * Returns 1 when the local site is known, 0 otherwise (no match or any
 * netlink failure). */
static int _find_myself(struct booth_config *conf, int family,
			struct booth_site **mep, int fuzzy_allowed)
{
	int fd;
	int rcvbuf_size = NETLINK_BUFSIZE;
	struct sockaddr_nl nladdr;
	struct booth_site *me;
	unsigned char ipaddr[BOOTH_IPADDR_LEN];
	static char rcvbuf[NETLINK_BUFSIZE];
	struct {
		struct nlmsghdr nlh;
		struct rtgenmsg g;
	} req;
	int address_bits_matched;

	if (local)
		goto found;

	me = NULL;
	address_bits_matched = 0;
	if (mep)
		*mep = NULL;

	fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE);
	if (fd < 0) {
		log_error("failed to create netlink socket");
		return 0;
	}

	/* SO_RCVBUF expects an int holding the wanted size, not the
	 * receive buffer itself; best effort, failure is ignored */
	(void)setsockopt(fd, SOL_SOCKET, SO_RCVBUF,
			 &rcvbuf_size, sizeof(rcvbuf_size));

	memset(&nladdr, 0, sizeof(nladdr));
	nladdr.nl_family = AF_NETLINK;

	memset(&req, 0, sizeof(req));
	req.nlh.nlmsg_len = sizeof(req);
	req.nlh.nlmsg_type = RTM_GETADDR;
	req.nlh.nlmsg_flags = NLM_F_ROOT|NLM_F_MATCH|NLM_F_REQUEST;
	req.nlh.nlmsg_pid = 0;
	req.nlh.nlmsg_seq = 1;
	req.g.rtgen_family = family;

	if (sendto(fd, (void *)&req, sizeof(req), 0,
		   (struct sockaddr*)&nladdr, sizeof(nladdr)) < 0) {
		close(fd);
		log_error("failed to send data to netlink socket");
		return 0;
	}

	while (1) {
		int status;
		struct nlmsghdr *h;
		struct iovec iov = { rcvbuf, sizeof(rcvbuf) };
		struct msghdr msg = {
			(void *)&nladdr, sizeof(nladdr),
			&iov, 1,
			NULL, 0,
			0
		};

		status = recvmsg(fd, &msg, 0);
		/* an interrupted call is simply repeated ... */
		if (status < 0 && errno == EINTR)
			continue;
		/* ... while any other error, or end of data, ends the
		 * lookup instead of looping on a failing socket */
		if (status <= 0) {
			close(fd);
			log_error("failed to recvmsg from netlink socket");
			return 0;
		}

		for (h = (struct nlmsghdr *)rcvbuf; NLMSG_OK(h, status); h = NLMSG_NEXT(h, status)) {
			if (h->nlmsg_type == NLMSG_DONE)
				goto out;

			if (h->nlmsg_type == NLMSG_ERROR) {
				close(fd);
				log_error("netlink socket recvmsg error");
				return 0;
			}

			if (h->nlmsg_type == RTM_NEWADDR) {
				struct ifaddrmsg *ifa = NLMSG_DATA(h);
				struct rtattr *tb[IFA_MAX+1];
				struct rtattr *addr_attr;
				size_t addr_len;
				int len = h->nlmsg_len
					- NLMSG_LENGTH(sizeof(*ifa));

				memset(tb, 0, sizeof(tb));
				parse_rtattr(tb, IFA_MAX, IFA_RTA(ifa), len);
				memset(ipaddr, 0, BOOTH_IPADDR_LEN);

				/* prefer IFA_LOCAL if it exists, for p-t-p
				 * interfaces, otherwise use IFA_ADDRESS */
				addr_attr = tb[IFA_LOCAL] ? tb[IFA_LOCAL]
							  : tb[IFA_ADDRESS];
				if (!addr_attr) {
					log_error("failed to copy netlink addr");
					close(fd);
					return 0;
				}

				/* copy only what the attribute carries (an
				 * IPv4 address is shorter than the buffer);
				 * the remainder of ipaddr stays zeroed */
				addr_len = RTA_PAYLOAD(addr_attr);
				if (addr_len > BOOTH_IPADDR_LEN)
					addr_len = BOOTH_IPADDR_LEN;
				memcpy(ipaddr, RTA_DATA(addr_attr), addr_len);

				/* Try to find the exact address or the address with subnet matching.
				 * The function find_address will be called for each address received
				 * from NLMSG_DATA above.
				 * The exact match will be preferred. If no exact match is found,
				 * the function find_address will try to return another, most similar
				 * address (with the longest possible number of same bytes). */
				if (ifa->ifa_prefixlen > address_bits_matched) {
					find_address(conf, ipaddr,
						     ifa->ifa_family, ifa->ifa_prefixlen,
						     fuzzy_allowed, &me, &address_bits_matched);

					if (me) {
						log_debug("found myself at %s (%d bits matched)",
							  site_string(me), address_bits_matched);
					}
				}
				/* If the previous NLMSG_DATA calls have already allowed us
				 * to find an address with address_bits_matched matching bits,
				 * then no other better non-exact address can be found.
				 * But we can still try to find an exact match, so let us
				 * call the function find_address with disabled searching of
				 * similar addresses (fuzzy_allowed == 0) */
				else if (ifa->ifa_prefixlen == address_bits_matched) {
					find_address(conf, ipaddr,
						     ifa->ifa_family, ifa->ifa_prefixlen,
						     0 /* fuzzy_allowed */, &me, &address_bits_matched);

					if (me) {
						log_debug("found myself at %s (exact match)",
							  site_string(me));
					}
				}
			}
		}
	}

out:
	close(fd);

	if (!me)
		return 0;

	me->local = 1;
	local = me;
found:
	if (mep)
		*mep = local;
	return 1;
}
/* Identify the local site, trying IPv6 addresses first and falling back
 * to IPv4. Returns 1 if found (reported through *mep when mep is
 * non-NULL), 0 otherwise. */
int find_myself(struct booth_config *conf, struct booth_site **mep,
		int fuzzy_allowed)
{
	if (_find_myself(conf, AF_INET6, mep, fuzzy_allowed)) {
		return 1;
	}
	return _find_myself(conf, AF_INET, mep, fuzzy_allowed) ? 1 : 0;
}
/** Checks the header fields for validity.
 * cf. init_header().
 *
 * Magic and version must match, and the announced length must be at
 * least the size of the header itself. A length that turns negative when
 * read as an int is rejected as well.
 *
 * For @len_incl_data < 0 the length is not compared against the amount
 * of data and 0 is returned on success.
 * Otherwise the announced length must equal @len_incl_data, which is
 * then returned.
 *
 * Return -EINVAL on any error. */
int check_boothc_header(struct boothc_header *h, int len_incl_data)
{
	int l;

	if (h->magic != htonl(BOOTHC_MAGIC)) {
		log_error("magic error %x", ntohl(h->magic));
		return -EINVAL;
	}
	if (h->version != htonl(BOOTHC_VERSION)) {
		log_error("version error %x", ntohl(h->version));
		return -EINVAL;
	}

	l = ntohl(h->length);
	/* Test the sign first: comparing a negative int directly with
	 * sizeof() converts it to a huge unsigned value, which would let
	 * it pass. */
	if (l < 0 || (size_t)l < sizeof(*h)) {
		log_error("length %d out of range", l);
		return -EINVAL;
	}

	if (len_incl_data < 0)
		return 0;

	if (l != len_incl_data) {
		log_error("length error - got %d, wanted %d",
			  len_incl_data, l);
		return -EINVAL;
	}

	return len_incl_data;
}
/* Read up to count bytes from fd into buf.
 *
 * Keeps reading until count bytes have arrived, retrying when
 * interrupted. If the descriptor would block, reading stops and the
 * bytes collected so far are reported.
 *
 * Returns the number of bytes read (possibly fewer than count), or -1 on
 * end of file or on any other read error. */
static int do_read(int fd, void *buf, size_t count)
{
	char *dst = buf;
	int done = 0;

	while (done < count) {
		int got = read(fd, dst + done, count - done);

		if (got == 0) {
			/* peer closed before everything arrived */
			return -1;
		}
		if (got == -1) {
			if (errno == EINTR) {
				continue;
			}
			if (errno == EWOULDBLOCK) {
				break;
			}
			return -1;
		}
		done += got;
	}

	return done;
}
/* Send count bytes from buf over fd, continuing after partial sends and
 * retrying when interrupted.
 *
 * Returns 0 once everything has been sent. If send() reports an error or
 * transfers nothing, that is logged and send()'s result (<= 0) is
 * returned. */
static int do_write(int fd, void *buf, size_t count)
{
	const char *src = buf;
	int sent, off = 0;

	for (;;) {
		sent = send(fd, src + off, count, MSG_NOSIGNAL);
		if (sent == -1 && errno == EINTR) {
			continue;
		}

		/* If we cannot write _any_ data, we'd be in a (potential) loop. */
		if (sent <= 0) {
			log_error("send failed: %s (%d)", strerror(errno), errno);
			return sent;
		}

		if (sent == count) {
			return 0;
		}

		/* partial send: move on to the unsent remainder */
		count -= sent;
		off += sent;
	}
}
/* Only used for client requests (tcp).
 *
 * Collects a client message into req_cl->msg, which is allocated (and
 * zeroed) on first use; req_cl->offset tracks how much has arrived. Until
 * a full header is present up to MAX_MSG_LEN bytes are accepted;
 * afterwards the length announced in the header (capped at MAX_MSG_LEN)
 * decides.
 *
 * Returns 0 when a complete message with a valid header is available,
 * 1 when more data is expected, -1 on error. */
int read_client(struct client *req_cl)
{
	struct boothc_header *header;
	char *msg = (char *)req_cl->msg;
	int len = MAX_MSG_LEN;
	int rv;

	if (!msg) {
		msg = malloc(MAX_MSG_LEN);
		if (!msg) {
			log_error("out of memory for client messages");
			return -1;
		}
		memset(msg, 0, MAX_MSG_LEN);
		req_cl->msg = (void *)msg;
	}
	header = (struct boothc_header *)msg;

	/* once the header is in, it tells how much to expect */
	if (req_cl->offset >= sizeof(*header)) {
		len = min(ntohl(header->length), MAX_MSG_LEN);
	}

	rv = do_read(req_cl->fd, msg + req_cl->offset, len - req_cl->offset);
	if (rv < 0) {
		if (errno == ECONNRESET)
			log_debug("client connection reset for fd %d", req_cl->fd);
		return -1;
	}
	req_cl->offset += rv;

	/* the header may have become complete with this read */
	if (req_cl->offset >= sizeof(*header)) {
		len = min(ntohl(header->length), MAX_MSG_LEN);
	}

	if (req_cl->offset < len) {
		/* client promised to send more */
		return 1;
	}

	return (check_boothc_header(header, len) < 0) ? -1 : 0;
}
/* Only used for client requests (tcp) */
/* Handle activity on client slot ci: read the pending message, check its
 * authentication and dispatch on the command in its header. Unless the
 * request is still in progress, the slot's deadfn callback (if set) is
 * invoked at the end. */
static void process_connection(struct booth_config *conf, int ci)
{
struct client *req_cl;
void *msg = NULL;
struct boothc_header *header;
struct boothc_hdr_msg err_reply;
cmd_result_t errc;
void (*deadfn) (int ci);
req_cl = clients + ci;
switch (read_client(req_cl)) {
case -1: /* error */
goto kill;
case 1: /* more to read */
return;
case 0:
/* we can process the request now */
msg = req_cl->msg;
}
header = (struct boothc_header *)msg;
/* no site is passed here: the sender is a client, not a peer */
if (check_auth(NULL, msg, ntohl(header->length))) {
errc = RLT_AUTH;
goto send_err;
}
/* For CMD_GRANT and CMD_REVOKE:
* Don't close connection immediately, but send
* result a second later? */
switch (ntohl(header->cmd)) {
case CMD_LIST:
ticket_answer_list(conf, req_cl->fd);
goto kill;
case CMD_PEERS:
- list_peers(req_cl->fd);
+ list_peers(conf, req_cl->fd);
goto kill;
case CMD_GRANT:
case CMD_REVOKE:
if (process_client_request(conf, req_cl, msg) == 1) {
goto kill; /* request processed definitely, close connection */
} else {
return;
}
case ATTR_LIST:
case ATTR_GET:
case ATTR_SET:
case ATTR_DEL:
if (process_attr_request(conf, req_cl, msg) == 1) {
goto kill; /* request processed definitely, close connection */
} else {
return;
}
default:
log_error("connection %d cmd %x unknown",
ci, ntohl(header->cmd));
errc = RLT_INVALID_ARG;
goto send_err;
}
/* every switch branch above leaves via goto or return */
assert(0);
return;
send_err:
/* answer with a bare header carrying the error code, then fall
 * through to the cleanup below */
init_header(&err_reply.header, CL_RESULT, 0, 0, errc, 0, sizeof(err_reply));
- send_client_msg(req_cl->fd, &err_reply);
+ send_client_msg(conf, req_cl->fd, &err_reply);
kill:
deadfn = req_cl->deadfn;
if(deadfn) {
deadfn(ci);
}
return;
}
/* Accept one pending connection on the listening socket of client slot
 * ci, switch it to TCP_NODELAY (best effort) and non-blocking mode, and
 * register it with process_connection() as its handler. */
static void process_tcp_listener(struct booth_config *conf, int ci)
{
	struct sockaddr peer;
	socklen_t peer_len = sizeof(struct sockaddr);
	int nodelay = 1;
	int conn, fl, slot;

	conn = accept(clients[ci].fd, &peer, &peer_len);
	if (conn < 0) {
		log_error("process_tcp_listener: accept error %d %d",
			  conn, errno);
		return;
	}

	(void)setsockopt(conn, IPPROTO_TCP, TCP_NODELAY,
			 (char *)&nodelay, sizeof(nodelay));

	fl = fcntl(conn, F_GETFL, 0);
	if (fcntl(conn, F_SETFL, fl | O_NONBLOCK) == -1) {
		log_error("process_tcp_listener: fcntl O_NONBLOCK error %d %d",
			  conn, errno);
		(void)close(conn);
		return;
	}

	slot = client_add(conn, clients[ci].transport,
			  process_connection, NULL);
	log_debug("client connection %d fd %d", slot, conn);
}
/* Create a TCP socket bound to the local site's address.
 *
 * Normal mode (test_only == 0): the socket is put into listening state
 * and its descriptor is returned; -1 is returned on any failure.
 *
 * Test mode (test_only != 0): only the bind is attempted; the socket is
 * closed again and the return value is 0 if the bind worked, or the errno
 * of the failed bind otherwise. Failure to create or configure the socket
 * still yields -1. */
int setup_tcp_listener(int test_only)
{
	int s, rv;
	int one = 1;

	s = socket(local->family, SOCK_STREAM, 0);
	if (s == -1) {
		log_error("failed to create tcp socket %s", strerror(errno));
		return s;
	}

	rv = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one));
	if (rv == -1) {
		close(s);
		log_error("failed to set the SO_REUSEADDR option");
		return rv;
	}

	/* bind() takes a generic struct sockaddr pointer, hence the cast
	 * (as done for the UDP socket and for connect) */
	rv = bind(s, (struct sockaddr *)&local->sa6, local->saddrlen);
	if (test_only) {
		rv = (rv == -1) ? errno : 0;
		close(s);
		return rv;
	}

	if (rv == -1) {
		close(s);
		log_error("failed to bind socket %s", strerror(errno));
		return rv;
	}

	rv = listen(s, 5);
	if (rv == -1) {
		close(s);
		log_error("failed to listen on socket %s", strerror(errno));
		return rv;
	}

	return s;
}
/* Bring up the TCP transport: set up the listening socket and register
 * it with process_tcp_listener() as its handler.
 * Returns 0 on success, -1 if the local id is unavailable, or the
 * negative result of setup_tcp_listener(). */
static int booth_tcp_init(void * unused __attribute__((unused)))
{
	int listen_fd;

	if (get_local_id() < 0) {
		return -1;
	}

	listen_fd = setup_tcp_listener(0);
	if (listen_fd >= 0) {
		client_add(listen_fd, booth_transport + TCP,
			   process_tcp_listener, NULL);
		return 0;
	}

	return listen_fd;
}
/* Connect sockfd to saptr, waiting at most `sec` seconds.
 *
 * The socket is switched to non-blocking mode for the attempt and its
 * original flags are restored on the paths that reach `done`. A `sec` of
 * 0 means wait without a time limit.
 *
 * Returns 0 on success, -1 on failure with errno set (ETIMEDOUT on
 * timeout). The socket is never closed here; that is left to the caller. */
static int connect_nonb(int sockfd, const struct sockaddr *saptr,
socklen_t salen, int sec)
{
int flags, n, error;
socklen_t len;
fd_set rset, wset;
struct timeval tval;
flags = fcntl(sockfd, F_GETFL, 0);
if (fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) == -1) {
log_error("fcntl: Can't set sockfd to nonblocking mode");
return -1;
}
error = 0;
/* EINPROGRESS just means the connect is still under way */
if ( (n = connect(sockfd, saptr, salen)) < 0)
if (errno != EINPROGRESS)
return -1;
if (n == 0)
goto done; /* connect completed immediately */
FD_ZERO(&rset);
FD_SET(sockfd, &rset);
wset = rset;
tval.tv_sec = sec;
tval.tv_usec = 0;
/* only a return of 0 (timeout) is handled here; any other result
 * falls through to the FD_ISSET checks below */
if (select(sockfd + 1, &rset, &wset, NULL, sec ? &tval : NULL) == 0) {
/* leave outside function to close */
/* timeout */
/* close(sockfd); */
errno = ETIMEDOUT;
return -1;
}
if (FD_ISSET(sockfd, &rset) || FD_ISSET(sockfd, &wset)) {
/* fetch the outcome of the connect attempt into `error` */
len = sizeof(error);
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0)
return -1; /* Solaris pending error */
} else {
log_error("select error: sockfd not set");
return -1;
}
done:
/* restore file status flags */
if (fcntl(sockfd, F_SETFL, flags) == -1) {
log_error("fcntl: Can't restore sockfd flags");
return -1;
}
if (error) {
/* leave outside function to close */
/* close(sockfd); */
errno = error;
return -1;
}
return 0;
}
/* Make sure a TCP connection to site `to` exists.
 * An already stored descriptor (to->tcp_fd >= STDERR_FILENO) is reused;
 * otherwise a socket is created and connected with a 10 second limit and
 * stored in to->tcp_fd.
 * Returns 1 on success, -1 on failure. */
-int booth_tcp_open(struct booth_site *to)
+static int booth_tcp_open(struct booth_site *to)
{
int s, rv;
if (to->tcp_fd >= STDERR_FILENO)
goto found;
s = socket(to->family, SOCK_STREAM, 0);
if (s == -1) {
log_error("cannot create socket of family %d", to->family);
return -1;
}
rv = connect_nonb(s, (struct sockaddr *)&to->sa6, to->saddrlen, 10);
if (rv == -1) {
if( errno == ETIMEDOUT)
log_error("connect to %s got a timeout", site_string(to));
else
log_error("connect to %s got an error: %s", site_string(to),
strerror(errno));
goto error;
}
to->tcp_fd = s;
found:
return 1;
error:
/* connect_nonb() leaves closing the socket to its caller */
if (s >= 0)
close(s);
return -1;
}
/* add_hmac(): when authentication is required, fill in the struct hmac
 * occupying the last sizeof(struct hmac) bytes of data, computed over the
 * bytes before it with conf's authkey. Returns 0 when nothing had to be
 * done (or no hash library is compiled in), otherwise the result of
 * calc_hmac().
 *
 * booth_tcp_send(): add the hmac to buf and, if that returned 0, write
 * the len bytes to to->tcp_fd. Returns the first non-zero result. */
-int booth_tcp_send(struct booth_site *to, void *buf, int len)
+/* data + (datalen-sizeof(struct hmac)) points to struct hmac
+ * i.e. struct hmac is always tacked on the payload
+ */
+static int add_hmac(struct booth_config *conf, void *data, int len)
+{
+ int rv = 0;
+#if HAVE_LIBGNUTLS || HAVE_LIBGCRYPT || HAVE_LIBMHASH
+ int payload_len;
+ struct hmac *hp;
+
+ if (!is_auth_req()) {
+ return 0;
+ }
+
+ payload_len = len - sizeof(struct hmac);
+ hp = (struct hmac *)((unsigned char *)data + payload_len);
+ hp->hid = htonl(BOOTH_HASH);
+ memset(hp->hash, 0, BOOTH_MAC_SIZE);
+ rv = calc_hmac(data, payload_len, BOOTH_HASH, hp->hash,
+ conf->authkey, conf->authkey_len);
+ if (rv < 0) {
+ log_error("internal error: cannot calculate mac");
+ }
+#endif
+ return rv;
+}
+
+static int booth_tcp_send(struct booth_config *conf, struct booth_site *to,
+ void *buf, int len)
{
int rv;
- rv = add_hmac(buf, len);
- if (!rv)
+ rv = add_hmac(conf, buf, len);
+ if (!rv) {
rv = do_write(to->tcp_fd, buf, len);
+ }
return rv;
}
/* Read up to len bytes from the TCP connection of site `from` into buf.
 * Returns what do_read() returns: the byte count, or a negative value on
 * failure (which is logged here). */
static int booth_tcp_recv(struct booth_site *from, void *buf, int len)
{
	/* Needs timeouts! */
	const int got = do_read(from->tcp_fd, buf, len);

	if (got < 0) {
		log_error("read failed (%d): %s", errno, strerror(errno));
	}
	return got;
}
/* Receive a message of up to len bytes from site `from`, where len
 * includes room for the trailing struct hmac.
 *
 * The payload (len minus the hmac size) is read first. When
 * authentication is required, the hmac is read next, right behind the
 * payload position, and the whole message is verified with check_auth().
 *
 * Returns the number of bytes received, a negative value if reading the
 * payload failed, or -1 if len cannot hold an hmac, the hmac is
 * incomplete, or authentication fails. */
static int booth_tcp_recv_auth(struct booth_site *from, void *buf, int len)
{
	int got, total;
	int payload_len;

	/* Needs timeouts! */

	/* A len below the hmac size would give a negative payload length,
	 * which the read path would treat as a huge unsigned count. */
	if (len < 0 || (size_t)len < sizeof(struct hmac)) {
		log_error("receive buffer too short for authentication data (size:%d)",
			  len);
		return -1;
	}

	payload_len = len - sizeof(struct hmac);
	got = booth_tcp_recv(from, buf, payload_len);
	if (got < 0) {
		return got;
	}
	total = got;

	if (is_auth_req()) {
		got = booth_tcp_recv(from, (unsigned char *)buf + payload_len,
				     sizeof(struct hmac));
		if (got != sizeof(struct hmac) || check_auth(from, buf, len)) {
			return -1;
		}
		total += got;
	}

	return total;
}
/* Drop the TCP connection to site `to`: close its descriptor if it is
 * above STDERR_FILENO and reset to->tcp_fd to -1. A NULL site is
 * accepted. Always returns 0. */
static int booth_tcp_close(struct booth_site *to)
{
	if (!to) {
		return 0;
	}

	if (to->tcp_fd > STDERR_FILENO) {
		close(to->tcp_fd);
	}
	to->tcp_fd = -1;

	return 0;
}
/* Exit hook of the TCP transport; does nothing and returns 0. */
static int booth_tcp_exit(void)
{
return 0;
}
/* Create the non-blocking UDP socket of the local site, bind it to the
 * site's address and request a receive buffer of SOCKET_BUFFER_SIZE.
 * On success the descriptor is stored in local->udp_fd and 0 is
 * returned; on any failure the socket is closed and -1 is returned. */
static int setup_udp_server(void)
{
	unsigned int recvbuf_size = SOCKET_BUFFER_SIZE;
	int reuse = 1;
	int fd;

	fd = socket(local->family, SOCK_DGRAM, 0);
	if (fd == -1) {
		log_error("failed to create UDP socket %s", strerror(errno));
		return -1;
	}

	if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1) {
		log_error("failed to set non-blocking operation "
			  "on UDP socket: %s", strerror(errno));
		goto fail;
	}

	if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
		       (char *)&reuse, sizeof(reuse)) == -1) {
		log_error("failed to set the SO_REUSEADDR option");
		goto fail;
	}

	if (bind(fd, (struct sockaddr *)&local->sa6, local->saddrlen) == -1) {
		log_error("failed to bind UDP socket to [%s]:%d: %s",
			  site_string(local), site_port(local),
			  strerror(errno));
		goto fail;
	}

	if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF,
		       &recvbuf_size, sizeof(recvbuf_size)) == -1) {
		log_error("failed to set recvbuf size");
		goto fail;
	}

	local->udp_fd = fd;
	return 0;

fail:
	close(fd);
	return -1;
}
/* Receive/process callback for UDP.
 *
 * Fetches one datagram from the socket of client slot ci without
 * blocking and passes it to the registered deliver_fn. A positive result
 * from deliver_fn is taken as the id of an unknown sender and logged,
 * together with the numeric source address if that can be resolved. */
static void process_recv(struct booth_config *conf, int ci)
{
	/* beware, the buffer needs to be large enough to accept
	 * a packet */
	char buffer[MAX_MSG_LEN];
	struct sockaddr_storage peer;
	socklen_t peer_len = sizeof(peer);
	int rv;

	rv = recvfrom(clients[ci].fd,
		      buffer, sizeof(buffer),
		      MSG_NOSIGNAL | MSG_DONTWAIT,
		      (struct sockaddr *)&peer, &peer_len);
	if (rv == -1) {
		return;
	}

	rv = deliver_fn(conf, (void *)buffer, rv);
	if (rv <= 0) {
		return;
	}

	/* the packet buffer is reused to hold the printable address */
	if (getnameinfo((struct sockaddr *)&peer, peer_len,
			buffer, sizeof(buffer), NULL, 0,
			NI_NUMERICHOST) == 0) {
		log_error("unknown sender: %08x (real: %s)", rv, buffer);
	} else {
		log_error("unknown sender: %08x", rv);
	}
}
/* Bring up the UDP transport: create the server socket, remember f as
 * the delivery callback for incoming messages and register the socket
 * with process_recv() as its handler.
 * Returns 0 on success or the negative result of setup_udp_server(). */
static int booth_udp_init(void *f)
{
	const int rv = setup_udp_server();

	if (rv >= 0) {
		deliver_fn = f;
		client_add(local->udp_fd,
			   booth_transport + UDP,
			   process_recv, NULL);
		return 0;
	}

	return rv;
}
/* Send len bytes of buf to site `to` from the local UDP socket.
 * to->sent_cnt is bumped on every call, to->sent_err_cnt on failure.
 * Returns 0 if the whole datagram went out, the negative sendto() result
 * on error, or -1 if fewer than len bytes were sent. */
-int booth_udp_send(struct booth_site *to, void *buf, int len)
+static int booth_udp_send(struct booth_config *conf, struct booth_site *to,
+ void *buf, int len)
{
int rv;
to->sent_cnt++;
rv = sendto(local->udp_fd, buf, len, MSG_NOSIGNAL,
(struct sockaddr *)&to->sa6, to->saddrlen);
if (rv == len) {
rv = 0;
} else if (rv < 0) {
to->sent_err_cnt++;
log_error("Cannot send to %s: %d %s",
site_string(to),
errno,
strerror(errno));
} else {
rv = -1;
to->sent_err_cnt++;
log_error("Packet sent to %s got truncated",
site_string(to));
}
return rv;
}
/* Add the hmac to buf, then send the datagram to site `to`.
 * Returns a negative value if the hmac could not be added, otherwise the
 * result of booth_udp_send(). */
-int booth_udp_send_auth(struct booth_site *to, void *buf, int len)
+int booth_udp_send_auth(struct booth_config *conf, struct booth_site *to,
+ void *buf, int len)
{
int rv;
- rv = add_hmac(buf, len);
- if (rv < 0)
+ rv = add_hmac(conf, buf, len);
+ if (rv < 0) {
return rv;
- return booth_udp_send(to, buf, len);
+ }
+
+ return booth_udp_send(conf, to, buf, len);
}
/* Add the hmac to buf once, then send the datagram to every configured
 * site except the local one.
 * Returns -1 if there is no config or it has no sites, a negative value
 * if the hmac could not be added, otherwise the first non-zero send
 * result (0 if every send succeeded). Sending continues after a failure. */
-static int booth_udp_broadcast_auth(void *buf, int len)
+static int booth_udp_broadcast_auth(struct booth_config *conf, void *buf, int len)
{
int i, rv, rvs;
struct booth_site *site;
-
- if (!booth_conf || !booth_conf->site_count)
+ if (!conf || !conf->site_count) {
return -1;
+ }
- rv = add_hmac(buf, len);
- if (rv < 0)
+ rv = add_hmac(conf, buf, len);
+ if (rv < 0) {
return rv;
+ }
rvs = 0;
- _FOREACH_NODE(i, site) {
+ FOREACH_NODE(conf, i, site) {
if (site != local) {
- rv = booth_udp_send(site, buf, len);
- if (!rvs)
+ rv = booth_udp_send(conf, site, buf, len);
+ if (!rvs) {
rvs = rv;
+ }
}
}
return rvs;
}
/* Exit hook of the UDP transport; does nothing and returns 0. */
static int booth_udp_exit(void)
{
return 0;
}
/* SCTP transport layer has not been developed yet */
/* Placeholder init hook: ignores its argument and returns 0. */
static int booth_sctp_init(void *f __attribute__((unused)))
{
return 0;
}
/* Placeholder send hook: ignores all arguments and returns 0. */
-static int booth_sctp_send(struct booth_site * to __attribute__((unused)),
+static int booth_sctp_send(struct booth_config *conf __attribute__((unused)),
+ struct booth_site *to __attribute__((unused)),
void *buf __attribute__((unused)),
int len __attribute__((unused)))
{
return 0;
}
/* Placeholder broadcast hook: ignores all arguments and returns 0. */
static int booth_sctp_broadcast(void *buf __attribute__((unused)),
int len __attribute__((unused)))
{
return 0;
}
/* No-op taking a site; used in the transport table for the open/close
 * hooks of transports that need no per-site connection. Returns 0. */
static int return_0_booth_site(struct booth_site *v __attribute((unused)))
{
return 0;
}
/* No-op without arguments; used in the transport table as an exit hook.
 * Returns 0. */
static int return_0(void)
{
return 0;
}
/* Table of transport implementations, indexed by transport id.
 * Hooks that an entry does not list are left zero-initialized. */
const struct booth_transport booth_transport[TRANSPORT_ENTRIES] = {
/* connection-oriented transport */
[TCP] = {
.name = "TCP",
.init = booth_tcp_init,
.open = booth_tcp_open,
.send = booth_tcp_send,
.recv = booth_tcp_recv,
.recv_auth = booth_tcp_recv_auth,
.close = booth_tcp_close,
.exit = booth_tcp_exit
},
/* datagram transport; open/close are no-ops */
[UDP] = {
.name = "UDP",
.init = booth_udp_init,
.open = return_0_booth_site,
.send = booth_udp_send,
.send_auth = booth_udp_send_auth,
.close = return_0_booth_site,
.broadcast_auth = booth_udp_broadcast_auth,
.exit = booth_udp_exit
},
/* placeholder: every SCTP hook is a stub returning 0 */
[SCTP] = {
.name = "SCTP",
.init = booth_sctp_init,
.open = return_0_booth_site,
.send = booth_sctp_send,
.broadcast = booth_sctp_broadcast,
.exit = return_0,
}
};
-/* data + (datalen-sizeof(struct hmac)) points to struct hmac
- * i.e. struct hmac is always tacked on the payload
- */
-int add_hmac(void *data, int len)
-{
- int rv = 0;
-#if HAVE_LIBGNUTLS || HAVE_LIBGCRYPT || HAVE_LIBMHASH
- int payload_len;
- struct hmac *hp;
-
- if (!is_auth_req())
- return 0;
-
- payload_len = len - sizeof(struct hmac);
- hp = (struct hmac *)((unsigned char *)data + payload_len);
- hp->hid = htonl(BOOTH_HASH);
- memset(hp->hash, 0, BOOTH_MAC_SIZE);
- rv = calc_hmac(data, payload_len, BOOTH_HASH, hp->hash,
- booth_conf->authkey, booth_conf->authkey_len);
- if (rv < 0) {
- log_error("internal error: cannot calculate mac");
- }
-#endif
- return rv;
-}
-
#if HAVE_LIBGNUTLS || HAVE_LIBGCRYPT || HAVE_LIBMHASH
/* TODO: we need some client identification for logging */
#define peer_string(p) (p ? site_string(p) : "client")
/* Verify the validity of the timestamp from the packet header.
 *
 * The timestamp needs to be either greater than the one already
 * recorded for the site or, and this is the only check done for
 * clients (from == NULL), not older than booth_conf->maxtimeskew
 * seconds.
 * On acceptance, the timestamp recorded for the site is updated, if
 * this packet is from a site.
 *
 * Returns 0 if the timestamp is accepted, -1 if the packet is too
 * short to hold a header or the timestamp is too old.
 */
static int verify_ts(struct booth_site *from, void *buf, int len)
{
	struct boothc_header *h;
	struct timeval tv, curr_tv, now;

	/* Rule out negative lengths before comparing against sizeof():
	 * a plain "len < sizeof(*h)" converts len to an unsigned type,
	 * so a negative value would look huge and slip through. */
	if (len < 0 || (unsigned int)len < sizeof(*h)) {
		log_error("%s: packet too short", peer_string(from));
		return -1;
	}

	h = (struct boothc_header *)buf;
	tv.tv_sec = ntohl(h->secs);
	tv.tv_usec = ntohl(h->usecs);

	if (from) {
		curr_tv.tv_sec = from->last_secs;
		curr_tv.tv_usec = from->last_usecs;
		if (timercmp(&tv, &curr_tv, >))
			goto accept;
		/* not newer than the previous packet; it may still be
		 * accepted by the time skew check below */
		log_warn("%s: packet timestamp older than previous one",
			site_string(from));
	}

	gettimeofday(&now, NULL);
	now.tv_sec -= booth_conf->maxtimeskew;
	if (timercmp(&tv, &now, >))
		goto accept;

	log_error("%s: packet timestamp older than %d seconds",
		peer_string(from), booth_conf->maxtimeskew);
	return -1;

accept:
	if (from) {
		from->last_secs = tv.tv_sec;
		from->last_usecs = tv.tv_usec;
	}
	return 0;
}
#endif
/* Authenticate a received packet.
 *
 * When built with one of the supported crypto libraries and
 * authentication is required (is_auth_req()), the struct hmac tacked
 * on the end of the buffer is verified against the payload, and then
 * the header timestamp is checked with verify_ts().
 *
 * from: sending site, or NULL (logged as "client")
 * buf:  packet, payload followed by struct hmac
 * len:  total length of buf
 *
 * Returns 0 if the packet is accepted or no authentication is
 * required/compiled in, -1 if the packet is too short to hold a
 * struct hmac, otherwise the non-zero result of the failed check.
 */
int check_auth(struct booth_site *from, void *buf, int len)
{
	int rv = 0;
#if HAVE_LIBGNUTLS || HAVE_LIBGCRYPT || HAVE_LIBMHASH
	int payload_len;
	struct hmac *hp;

	if (!is_auth_req())
		return 0;

	/* Subtract in signed arithmetic: "len - sizeof(...)" would be
	 * evaluated as an unsigned value and wrap around for short
	 * packets instead of going negative. */
	payload_len = len - (int)sizeof(struct hmac);
	if (payload_len < 0) {
		log_error("%s: failed to authenticate, packet too short (size:%d)",
			peer_string(from), len);
		return -1;
	}

	hp = (struct hmac *)((unsigned char *)buf + payload_len);
	rv = verify_hmac(buf, payload_len, ntohl(hp->hid), hp->hash,
		booth_conf->authkey, booth_conf->authkey_len);
	if (!rv) {
		/* the MAC is good, now make sure the packet is fresh */
		rv = verify_ts(from, buf, len);
	}
	if (rv != 0) {
		log_error("%s: failed to authenticate", peer_string(from));
	}
#endif
	return rv;
}
/* Add the HMAC to the buffer with add_hmac() and, only if that returned 0,
 * write the whole buffer to fd with do_write().
 * Returns the non-zero add_hmac() result, or else the do_write() result. */
-int send_data(int fd, void *data, int datalen)
+int send_data(struct booth_config *conf, int fd, void *data, int datalen)
{
int rv = 0;
- rv = add_hmac(data, datalen);
- if (!rv)
+ rv = add_hmac(conf, data, datalen);
+ if (!rv) {
rv = do_write(fd, data, datalen);
+ }
return rv;
}
/* Send the first sendmsglen(msg)-len bytes of msg through send_data(),
 * then, if that did not fail (rv >= 0) and len is non-zero, write len
 * bytes of data with a plain do_write().
 * Returns the send_data() result, or the do_write() result when the
 * second write was attempted. */
-int send_header_plus(int fd, struct boothc_hdr_msg *msg, void *data, int len)
+int send_header_plus(struct booth_config *conf, int fd,
+ struct boothc_hdr_msg *msg, void *data, int len)
{
int rv;
- rv = send_data(fd, msg, sendmsglen(msg)-len);
+ rv = send_data(conf, fd, msg, sendmsglen(msg)-len);
- if (rv >= 0 && len)
+ if (rv >= 0 && len) {
rv = do_write(fd, data, len);
+ }
return rv;
}
/* UDP message receiver (see also deliver_fn declaration's comment)
 *
 * Looks up the sending site by the id found in the header, updates its
 * receive statistics, validates the header and the authentication
 * data, and hands the message over to the attribute or the ticket
 * handler.
 *
 * Returns a positive number if the sender is not a known site, -1 if
 * the header check or the authentication failed, otherwise whatever
 * the attribute/ticket handler returned.
 */
int message_recv(struct booth_config *conf, void *msg, int msglen)
{
	struct boothc_header *header = (struct boothc_header *)msg;
	struct booth_site *source;
	uint32_t from = ntohl(header->from);

	if (!find_site_by_id(conf, from, &source)) {
		/* The caller knows the actual source address; pass it the
		 * (assuredly) positive number and let it do the reporting. */
		if (!from) {
			from = ~from;	/* avoid 0 (success) */
		}
		return from & (~0U >> 1);	/* avoid negative (error code) */
	}

	time(&source->last_recv);
	source->recv_cnt++;

	if (check_boothc_header(header, msglen) < 0) {
		log_error("message from %s receive error", site_string(source));
		source->recv_err_cnt++;
		return -1;
	}

	if (check_auth(source, msg, msglen)) {
		log_error("%s failed to authenticate", site_string(source));
		source->sec_cnt++;
		return -1;
	}

	if (!(ntohl(header->opts) & BOOTH_OPT_ATTR)) {
		return ticket_recv(conf, msg, source);
	}

	/* not used, clients send/retrieve attributes directly
	 * from sites
	 */
	return attr_recv(conf, msg, source);
}
diff --git a/src/transport.h b/src/transport.h
index 4f4d497..2784ce4 100644
--- a/src/transport.h
+++ b/src/transport.h
@@ -1,112 +1,146 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef _TRANSPORT_H
#define _TRANSPORT_H
#include "b_config.h"
#include "booth.h"
/* Transport identifiers.
 * Numbering starts at 1, so 0 never names a transport;
 * TRANSPORT_ENTRIES comes last and is used as the size of the
 * booth_transport[] array declared below. */
typedef enum {
TCP = 1,
UDP,
SCTP,
TRANSPORT_ENTRIES,
} transport_layer_t;
/* Action identifiers, numbered consecutively from 0x50.
 * NOTE(review): presumably the role/mode the program was started in
 * (arbitrator, site, client, ...) -- confirm against the users of
 * this type. */
typedef enum {
ARBITRATOR = 0x50,
SITE,
CLIENT,
DAEMON,
STATUS,
GEOSTORE,
} action_t;
/* when allocating space for messages
*/
#define MAX_MSG_LEN 1024
/* Set of operations (function pointers) making up one transport;
 * the per-transport instances are kept in the booth_transport[] array. */
struct booth_transport {
const char *name;
int (*init) (void *);
int (*open) (struct booth_site *);
- int (*send) (struct booth_site *, void *, int);
- int (*send_auth) (struct booth_site *, void *, int);
+ int (*send) (struct booth_config *, struct booth_site *, void *, int);
+ int (*send_auth) (struct booth_config *, struct booth_site *, void *, int);
int (*recv) (struct booth_site *, void *, int);
int (*recv_auth) (struct booth_site *, void *, int);
int (*broadcast) (void *, int);
- int (*broadcast_auth) (void *, int);
+ int (*broadcast_auth) (struct booth_config *, void *, int);
int (*close) (struct booth_site *);
int (*exit) (void);
};
extern const struct booth_transport booth_transport[TRANSPORT_ENTRIES];
/**
* @internal
* Attempts to pick identity of self from config-tracked enumeration of sites
*
* @param[in,out] conf config object to refer to
* @param[out] me when self-discovery successful, site pointer is stored here
* @param[in] fuzzy_allowed whether it's OK to approximate the match
*
* @return 0 on success or negative value (-1 or -errno) on error
*/
int find_myself(struct booth_config *conf, struct booth_site **me,
int fuzzy_allowed);
int read_client(struct client *req_cl);
int check_boothc_header(struct boothc_header *data, int len_incl_data);
int setup_tcp_listener(int test_only);
-int booth_udp_send(struct booth_site *to, void *buf, int len);
-int booth_udp_send_auth(struct booth_site *to, void *buf, int len);
-int booth_tcp_open(struct booth_site *to);
-int booth_tcp_send(struct booth_site *to, void *buf, int len);
+/**
+ * @internal
+ * Send data, with authentication added
+ *
+ * @param[in,out] conf config object to refer to
+ * @param[in] to site structure of the recipient
+ * @param[in] buf message itself
+ * @param[in] len length of #buf
+ *
+ * @return see @add_hmac and @booth_udp_send
+ */
+int booth_udp_send_auth(struct booth_config *conf, struct booth_site *to,
+ void *buf, int len);
/**
* @internal
* First stage of incoming datagram handling (authentication)
*
* @param[in,out] conf config object to refer to
* @param[in] msg raw message to act upon
* @param[in] msglen length of #msg
*
* @return 0 on success or negative value (-1 or -errno) on error
*/
int message_recv(struct booth_config *conf, void *msg, int msglen);
/* Return a pointer to the raw address stored in the site structure:
 * the IPv4 address for AF_INET, the IPv6 address for AF_INET6, and
 * NULL for any other address family. */
inline static void * node_to_addr_pointer(struct booth_site *node) {
	if (node->family == AF_INET) {
		return &node->sa4.sin_addr;
	}
	if (node->family == AF_INET6) {
		return &node->sa6.sin6_addr;
	}
	return NULL;
}
-int send_data(int fd, void *data, int datalen);
-int send_header_plus(int fd, struct boothc_hdr_msg *hdr, void *data, int len);
-#define send_client_msg(fd, msg) send_data(fd, msg, sendmsglen(msg))
+/**
+ * @internal
+ * Send data, with authentication added
+ *
+ * @param[in,out] conf config object to refer to
+ * @param[in] fd descriptor of the socket to respond to
+ * @param[in] data message itself
+ * @param[in] datalen length of #data
+ *
+ * @return 0 on success or negative value (-1 or -errno) on error
+ */
+int send_data(struct booth_config *conf, int fd, void *data, int datalen);
+
+/**
+ * @internal
+ * Send message header (with authentication added), followed by extra data
+ *
+ * @param[in,out] conf config object to refer to
+ * @param[in] fd descriptor of the socket to respond to
+ * @param[in] hdr message header
+ * @param[in] data message itself
+ * @param[in] len length of @data
+ *
+ * @return see #send_data and #do_write
+ */
+int send_header_plus(struct booth_config *conf, int fd,
+ struct boothc_hdr_msg *hdr, void *data, int len);
+
+#define send_client_msg(conf, fd, msg) send_data(conf, fd, msg, sendmsglen(msg))
-int add_hmac(void *data, int len);
int check_auth(struct booth_site *from, void *buf, int len);
#endif /* _TRANSPORT_H */

File Metadata

Mime Type
text/x-diff
Expires
Thu, Jun 26, 6:06 PM (1 d, 3 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1959334
Default Alt Text
(181 KB)

Event Timeline