Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/src/request.c b/src/request.c
index 2354490..9c9ee8e 100644
--- a/src/request.c
+++ b/src/request.c
@@ -1,84 +1,85 @@
/*
* Copyright (C) 2015 Dejan Muhamedagic <dejan@hello-penguin.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include <errno.h>
#include <stdio.h>
#include <assert.h>
#include <glib.h>
#include "booth.h"
#include "ticket.h"
#include "request.h"
#include "log.h"
static GList *req_l = NULL;
static int req_id_cnt;
/* add request to the queue; it is up to the caller to manage
* memory for the three parameters
*/
void *add_req(
struct ticket_config *tk,
struct client *req_client,
struct boothc_ticket_msg *msg)
{
struct request *rp;
rp = g_new(struct request, 1);
if (!rp)
return NULL;
rp->id = req_id_cnt++;
rp->tk = tk;
rp->client_fd = req_client->fd;
rp->msg = msg;
req_l = g_list_append(req_l, rp);
return rp;
}
+/* XXX UNUSED */
int get_req_id(const void *rp)
{
if (!rp)
return -1;
return ((struct request *)rp)->id;
}
static void del_req(GList *lp)
{
if (!lp)
return;
req_l = g_list_delete_link(req_l, lp);
}
void foreach_tkt_req(struct booth_config *conf_ptr, struct ticket_config *tk,
req_fp f)
{
GList *lp, *next;
struct request *rp;
lp = g_list_first(req_l);
while (lp) {
next = g_list_next(lp);
rp = (struct request *)lp->data;
if (rp->tk == tk &&
(f)(conf_ptr, rp->tk, rp->client_fd, rp->msg) == 0) {
log_debug("remove request for client %d", rp->client_fd);
del_req(lp); /* don't need this request anymore */
}
lp = next;
}
}
diff --git a/src/request.h b/src/request.h
index aee8985..bfddec2 100644
--- a/src/request.h
+++ b/src/request.h
@@ -1,67 +1,69 @@
/*
* Copyright (C) 2015 Dejan Muhamedagic <dejan@hello-penguin.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef _REQUEST_H
#define _REQUEST_H
#include "booth.h"
#include "config.h"
/* Requests are coming from clients and get queued in a
* round-robin queue (fixed size)
*
* This is one way to make the server more responsive and less
* dependent on misbehaving clients. The requests are queued and
* later served from the server loop.
*/
struct request {
/** Request ID */
int id;
/** The ticket. */
struct ticket_config *tk;
/** The client which sent the request */
int client_fd;
/** The message containing the request */
void *msg;
};
typedef int (*req_fp)(struct booth_config *conf_ptr, struct ticket_config *,
int, struct boothc_ticket_msg *);
void *add_req(struct ticket_config *tk, struct client *req_client,
struct boothc_ticket_msg *msg);
/**
* @internal
* Handle all pendign requests for given ticket using function @p f
*
* @param[inout] conf_ptr config object to refer to
* @param[in] tk ticket at hand
* @param[in] f handling function
*
* @return 1 on success, 0 when not done with the message, yet
*/
void foreach_tkt_req(struct booth_config *conf_ptr, struct ticket_config *tk,
req_fp f);
+
+/* XXX UNUSED */
int get_req_id(const void *rp);
#endif /* _REQUEST_H */
diff --git a/src/ticket.c b/src/ticket.c
index cfc2826..ed2e808 100644
--- a/src/ticket.c
+++ b/src/ticket.c
@@ -1,1517 +1,1518 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include <inttypes.h>
#include <stdio.h>
#include <assert.h>
#include <time.h>
#include "b_config.h"
#ifndef RANGE2RANDOM_GLIB
#include <clplumbing/cl_random.h>
#else
#include "alt/range2random_glib.h"
#endif
#include "ticket.h"
#include "config.h"
#include "pacemaker.h"
#include "inline-fn.h"
#include "log.h"
#include "booth.h"
#include "raft.h"
#include "handler.h"
#include "request.h"
#include "manual.h"
#define TK_LINE 256
extern int TIME_RES;
/* Untrusted input, must fit (incl. \0) in a buffer of max chars. */
int check_max_len_valid(const char *s, int max)
{
int i;
for(i=0; i<max; i++)
if (s[i] == 0)
return 1;
return 0;
}
int find_ticket_by_name(struct booth_config *conf_ptr,
const char *ticket, struct ticket_config **found)
{
struct ticket_config *tk;
int i;
assert(conf_ptr != NULL);
if (found)
*found = NULL;
FOREACH_TICKET(conf_ptr, i, tk) {
if (!strncmp(tk->name, ticket, sizeof(tk->name))) {
if (found)
*found = tk;
return 1;
}
}
return 0;
}
int check_ticket(struct booth_config *conf_ptr, const char *ticket,
struct ticket_config **found)
{
if (found)
*found = NULL;
if (conf_ptr == NULL)
return 0;
if (!check_max_len_valid(ticket, sizeof(conf_ptr->ticket[0].name)))
return 0;
return find_ticket_by_name(conf_ptr, ticket, found);
}
/* XXX UNUSED */
int check_site(struct booth_config *conf_ptr, const char *site,
int *is_local)
{
struct booth_site *node;
if (!check_max_len_valid(site, sizeof(node->addr_string)))
return 0;
if (find_site_by_name(conf_ptr, site, &node, 0)) {
*is_local = node->local;
return 1;
}
return 0;
}
/* is it safe to commit the grant?
* if we didn't hear from all sites on the initial grant, we may
* need to delay the commit
*
* TODO: investigate possibility to devise from history whether a
* missing site could be holding a ticket or not
*/
static int ticket_dangerous(struct booth_config *conf_ptr,
struct ticket_config *tk)
{
int tdiff;
/* we may be invoked often, don't spam the log unnecessarily
*/
static int no_log_delay_msg;
assert(conf_ptr != NULL);
assert(conf_ptr->local != NULL);
if (!is_time_set(&tk->delay_commit))
return 0;
if (is_past(&tk->delay_commit) || all_sites_replied(conf_ptr, tk)) {
if (tk->leader == conf_ptr->local) {
tk_log_info("%s, committing to CIB",
is_past(&tk->delay_commit) ?
"ticket delay expired" : "all sites replied");
}
time_reset(&tk->delay_commit);
no_log_delay_msg = 0;
return 0;
}
tdiff = time_left(&tk->delay_commit);
tk_log_debug("delay ticket commit for another " intfmt(tdiff));
if (!no_log_delay_msg) {
tk_log_info("delaying ticket commit to CIB for " intfmt(tdiff));
tk_log_info("(or all sites are reached)");
no_log_delay_msg = 1;
}
return 1;
}
int ticket_write(struct booth_config *conf_ptr, struct ticket_config *tk)
{
assert(conf_ptr != NULL);
assert(conf_ptr->local != NULL);
if (conf_ptr->local->type != SITE)
return -EINVAL;
if (ticket_dangerous(conf_ptr, tk))
return 1;
if (tk->leader == conf_ptr->local) {
if (tk->state != ST_LEADER) {
tk_log_info("ticket state not yet consistent, "
"delaying ticket grant to CIB");
return 1;
}
conf_ptr->ticket_handler->grant_ticket(tk);
} else {
conf_ptr->ticket_handler->revoke_ticket(tk);
}
tk->update_cib = 0;
return 0;
}
void save_committed_tkt(struct ticket_config *tk)
{
if (!tk->last_valid_tk) {
tk->last_valid_tk = malloc(sizeof(struct ticket_config));
if (!tk->last_valid_tk) {
log_error("out of memory");
return;
}
}
memcpy(tk->last_valid_tk, tk, sizeof(struct ticket_config));
}
static void ext_prog_failed(struct booth_config *conf_ptr,
struct ticket_config *tk, int start_election)
{
assert(conf_ptr != NULL);
assert(conf_ptr->local != NULL);
if (!is_manual(tk)) {
/* Give it to somebody else.
* Just send a VOTE_FOR message, so the
* others can start elections. */
if (leader_and_valid(tk, conf_ptr->local)) {
save_committed_tkt(tk);
reset_ticket(tk);
ticket_write(conf_ptr, tk);
if (start_election) {
ticket_broadcast(conf_ptr, tk, OP_VOTE_FOR,
OP_REQ_VOTE, RLT_SUCCESS,
OR_LOCAL_FAIL);
}
}
} else {
/* There is not much we can do now because
* the manual ticket cannot be relocated.
* Just warn the user. */
if (tk->leader == conf_ptr->local) {
save_committed_tkt(tk);
reset_ticket(tk);
ticket_write(conf_ptr, tk);
log_error("external test failed on the specified machine, cannot acquire a manual ticket");
}
}
}
#define attr_found(geo_ap, ap) \
((geo_ap) && !strcmp((geo_ap)->val, (ap)->attr_val))
int check_attr_prereq(struct ticket_config *tk, grant_type_e grant_type)
{
GList *el;
struct attr_prereq *ap;
struct geo_attr *geo_ap;
for (el = g_list_first(tk->attr_prereqs); el; el = g_list_next(el))
{
ap = (struct attr_prereq *)el->data;
if (ap->grant_type != grant_type)
continue;
geo_ap = (struct geo_attr *)g_hash_table_lookup(tk->attr, ap->attr_name);
switch(ap->op) {
case ATTR_OP_EQ:
if (!attr_found(geo_ap, ap))
goto fail;
break;
case ATTR_OP_NE:
if (attr_found(geo_ap, ap))
goto fail;
break;
default:
break;
}
}
return 0;
fail:
tk_log_warn("'%s' attr-prereq failed", ap->attr_name);
return 1;
}
/* do we need to run the external program?
* or we already done that and waiting for the outcome
* or program exited and we can collect the status
* return codes
* 0: no program defined
* RUNCMD_MORE: program forked, results later
* != 0: executing program failed (or some other failure)
*/
static int do_ext_prog(struct booth_config *conf_ptr,
struct ticket_config *tk, int start_election)
{
int rv = 0;
if (!tk_test.path)
return 0;
switch(tk_test.progstate) {
case EXTPROG_IDLE:
rv = run_handler(conf_ptr, tk);
if (rv == RUNCMD_ERR) {
tk_log_warn("couldn't run external test, not allowed to acquire ticket");
ext_prog_failed(conf_ptr, tk, start_election);
}
break;
case EXTPROG_RUNNING:
/* should never get here, but just in case */
rv = RUNCMD_MORE;
break;
case EXTPROG_EXITED:
rv = tk_test_exit_status(tk);
if (rv) {
ext_prog_failed(conf_ptr, tk, start_election);
}
break;
case EXTPROG_IGNORE:
/* nothing to do here */
break;
}
return rv;
}
/* Try to acquire a ticket
* Could be manual grant or after start (if the ticket is granted
* and still valid in the CIB)
* If the external program needs to run, this is run twice, once
* to start the program, and then to get the result and start
* elections.
*/
static int acquire_ticket(struct booth_config *conf_ptr,
struct ticket_config *tk, cmd_reason_t reason)
{
int rv;
assert(conf_ptr != NULL);
assert(conf_ptr->local != NULL);
if (reason == OR_ADMIN && check_attr_prereq(tk, GRANT_MANUAL))
return RLT_ATTR_PREREQ;
switch(do_ext_prog(conf_ptr, tk, 0)) {
case 0:
/* everything fine */
break;
case RUNCMD_MORE:
/* need to wait for the outcome before starting elections */
return 0;
default:
return RLT_EXT_FAILED;
}
if (is_manual(tk)) {
rv = manual_selection(conf_ptr, tk, conf_ptr->local, 1, reason);
} else {
rv = new_election(conf_ptr, tk, conf_ptr->local, 1, reason);
}
return rv ? RLT_SYNC_FAIL : 0;
}
/** Try to get the ticket for the local site.
* */
static int do_grant_ticket(struct booth_config *conf_ptr,
struct ticket_config *tk, int options)
{
int rv;
assert(conf_ptr != NULL);
assert(conf_ptr->local != NULL);
tk_log_info("granting ticket");
if (tk->leader == conf_ptr->local)
return RLT_SUCCESS;
if (is_owned(tk)) {
if (is_manual(tk) && (options & OPT_IMMEDIATE)) {
/* -F flag has been used while granting a manual ticket.
* The ticket will be granted and may end up being granted
* on multiple sites */
tk_log_warn("manual ticket forced to be granted! be aware that "
"you may end up having two sites holding the same manual "
"ticket! revoke the ticket from the unnecessary site!");
} else {
return RLT_OVERGRANT;
}
}
set_future_time(&tk->delay_commit, tk->term_duration + tk->acquire_after);
if (options & OPT_IMMEDIATE) {
tk_log_warn("granting ticket immediately! If there are "
"unreachable sites, _hope_ you are sure that they don't "
"have the ticket!");
time_reset(&tk->delay_commit);
}
rv = acquire_ticket(conf_ptr, tk, OR_ADMIN);
if (rv) {
time_reset(&tk->delay_commit);
return rv;
} else {
return RLT_MORE;
}
}
static void start_revoke_ticket(struct booth_config *conf_ptr,
struct ticket_config *tk)
{
tk_log_info("revoking ticket");
save_committed_tkt(tk);
reset_ticket_and_set_no_leader(tk);
ticket_write(conf_ptr, tk);
ticket_broadcast(conf_ptr, tk, OP_REVOKE, OP_ACK, RLT_SUCCESS, OR_ADMIN);
}
/** Ticket revoke.
* Only to be started from the leader. */
static int do_revoke_ticket(struct booth_config *conf_ptr,
struct ticket_config *tk)
{
if (tk->acks_expected) {
tk_log_info("delay ticket revoke until the current operation finishes");
set_next_state(tk, ST_INIT);
return RLT_MORE;
} else {
start_revoke_ticket(conf_ptr, tk);
return RLT_SUCCESS;
}
}
static int number_sites_marked_as_granted(struct booth_config *conf_ptr,
struct ticket_config *tk)
{
struct booth_site *ignored __attribute__((unused));
int i, result = 0;
assert(conf_ptr != NULL);
FOREACH_NODE(conf_ptr, i, ignored) {
result += tk->sites_where_granted[i];
}
return result;
}
static int list_ticket(struct booth_config *conf_ptr, char **pdata,
unsigned int *len)
{
struct ticket_config *tk;
struct booth_site *site;
char timeout_str[64];
char pending_str[64];
char *data, *cp;
int i, alloc, site_index;
time_t ts;
int multiple_grant_warning_length = 0;
assert(conf_ptr != NULL);
assert(conf_ptr->local != NULL);
*pdata = NULL;
*len = 0;
alloc = conf_ptr->ticket_count * (BOOTH_NAME_LEN * 2 + 128 + 16);
FOREACH_TICKET(conf_ptr, i, tk) {
multiple_grant_warning_length = \
number_sites_marked_as_granted(conf_ptr, tk);
if (multiple_grant_warning_length > 1) {
// 164: 55 + 45 + 2*number_of_multiple_sites + some margin
alloc += 164 + BOOTH_NAME_LEN * (1+multiple_grant_warning_length);
}
}
data = malloc(alloc);
if (!data)
return -ENOMEM;
cp = data;
FOREACH_TICKET(conf_ptr, i, tk) {
if ((!is_manual(tk)) && is_time_set(&tk->term_expires)) {
/* Manual tickets doesn't have term_expires defined */
ts = wall_ts(&tk->term_expires);
strftime(timeout_str, sizeof(timeout_str), "%F %T",
localtime(&ts));
} else
strcpy(timeout_str, "INF");
if (tk->leader == conf_ptr->local
&& is_time_set(&tk->delay_commit)
&& !is_past(&tk->delay_commit)) {
ts = wall_ts(&tk->delay_commit);
strcpy(pending_str, " (commit pending until ");
strftime(pending_str + strlen(" (commit pending until "),
sizeof(pending_str) - strlen(" (commit pending until ") - 1,
"%F %T", localtime(&ts));
strcat(pending_str, ")");
} else
*pending_str = '\0';
cp += snprintf(cp,
alloc - (cp - data),
"ticket: %s, leader: %s",
tk->name,
ticket_leader_string(tk));
if (is_owned(tk)) {
cp += snprintf(cp,
alloc - (cp - data),
", expires: %s%s",
timeout_str,
pending_str);
}
if (is_manual(tk)) {
cp += snprintf(cp,
alloc - (cp - data),
" [manual mode]");
}
cp += snprintf(cp, alloc - (cp - data), "\n");
if (alloc - (cp - data) <= 0) {
free(data);
return -ENOMEM;
}
}
FOREACH_TICKET(conf_ptr, i, tk) {
multiple_grant_warning_length = \
number_sites_marked_as_granted(conf_ptr, tk);
if (multiple_grant_warning_length > 1) {
cp += snprintf(cp,
alloc - (cp - data),
"\nWARNING: The ticket %s is granted to multiple sites: ", // ~55 characters
tk->name);
FOREACH_NODE(conf_ptr, site_index, site) {
if (tk->sites_where_granted[site_index] > 0) {
cp += snprintf(cp,
alloc - (cp - data),
"%s",
site_string(site));
if (--multiple_grant_warning_length > 0) {
cp += snprintf(cp,
alloc - (cp - data),
", ");
}
}
}
cp += snprintf(cp,
alloc - (cp - data),
". Revoke the ticket from the faulty sites.\n"); // ~45 characters
}
}
*pdata = data;
*len = cp - data;
return 0;
}
void disown_ticket(struct ticket_config *tk)
{
set_leader(tk, NULL);
tk->is_granted = 0;
get_time(&tk->term_expires);
}
+/* XXX UNUSED */
int disown_if_expired(struct ticket_config *tk)
{
if (is_past(&tk->term_expires) ||
!tk->leader) {
disown_ticket(tk);
return 1;
}
return 0;
}
void reset_ticket(struct ticket_config *tk)
{
ignore_ext_test(tk);
disown_ticket(tk);
no_resends(tk);
set_state(tk, ST_INIT);
set_next_state(tk, 0);
tk->voted_for = NULL;
}
void reset_ticket_and_set_no_leader(struct ticket_config *tk)
{
mark_ticket_as_revoked_from_leader(tk);
reset_ticket(tk);
tk->leader = no_leader;
tk_log_debug("ticket leader set to no_leader");
}
static void log_reacquire_reason(struct booth_config *conf_ptr,
struct ticket_config *tk)
{
int valid;
const char *where_granted = "\0";
char buff[75];
assert(conf_ptr != NULL);
assert(conf_ptr->local != NULL);
valid = is_time_set(&tk->term_expires) && !is_past(&tk->term_expires);
if (tk->leader == conf_ptr->local) {
where_granted = "granted here";
} else {
snprintf(buff, sizeof(buff), "granted to %s",
site_string(tk->leader));
where_granted = buff;
}
if (!valid) {
tk_log_warn("%s, but not valid "
"anymore (will try to reacquire)", where_granted);
}
if (tk->is_granted && tk->leader != conf_ptr->local) {
if (tk->leader && tk->leader != no_leader) {
tk_log_error("granted here, but also %s, "
"that's really too bad (will try to reacquire)",
where_granted);
} else {
tk_log_warn("granted here, but we're "
"not recorded as the grantee (will try to reacquire)");
}
}
}
void update_ticket_state(struct booth_config *conf_ptr,
struct ticket_config *tk, struct booth_site *sender)
{
assert(conf_ptr != NULL);
assert(conf_ptr->local != NULL);
if (tk->state == ST_CANDIDATE) {
tk_log_info("learned from %s about "
"newer ticket, stopping elections",
site_string(sender));
/* there could be rejects coming from others; don't log
* warnings unnecessarily */
tk->expect_more_rejects = 1;
}
if (tk->leader == conf_ptr->local || tk->is_granted) {
/* message from a live leader with valid ticket? */
if (sender == tk->leader && term_time_left(tk)) {
if (tk->is_granted) {
tk_log_warn("ticket was granted here, "
"but it's live at %s (revoking here)",
site_string(sender));
} else {
tk_log_info("ticket live at %s",
site_string(sender));
}
disown_ticket(tk);
ticket_write(conf_ptr, tk);
set_state(tk, ST_FOLLOWER);
set_next_state(tk, ST_FOLLOWER);
} else {
if (tk->state == ST_CANDIDATE) {
set_state(tk, ST_FOLLOWER);
}
set_next_state(tk, ST_LEADER);
}
} else {
if (!tk->leader || tk->leader == no_leader) {
if (sender)
tk_log_info("ticket is not granted");
else
tk_log_info("ticket is not granted (from CIB)");
set_state(tk, ST_INIT);
} else {
if (sender)
tk_log_info("ticket granted to %s (says %s)",
site_string(tk->leader),
tk->leader == sender ? "they" : site_string(sender));
else
tk_log_info("ticket granted to %s (from CIB)",
site_string(tk->leader));
set_state(tk, ST_FOLLOWER);
/* just make sure that we check the ticket soon */
set_next_state(tk, ST_FOLLOWER);
}
}
}
int setup_ticket(struct booth_config *conf_ptr)
{
struct ticket_config *tk;
int i;
assert(conf_ptr != NULL);
assert(conf_ptr->local != NULL);
FOREACH_TICKET(conf_ptr, i, tk) {
reset_ticket(tk);
if (conf_ptr->local->type == SITE) {
if (!conf_ptr->ticket_handler->load_ticket(conf_ptr, tk)) {
update_ticket_state(conf_ptr, tk, NULL);
}
tk->update_cib = 1;
}
tk_log_info("broadcasting state query");
/* wait until all send their status (or the first
* timeout) */
tk->start_postpone = 1;
ticket_broadcast(conf_ptr, tk, OP_STATUS, OP_MY_INDEX,
RLT_SUCCESS, 0);
}
return 0;
}
int ticket_answer_list(struct booth_config *conf_ptr, int fd)
{
char *data;
int rv;
unsigned int olen;
struct boothc_hdr_msg hdr;
rv = list_ticket(conf_ptr, &data, &olen);
if (rv < 0)
goto out;
init_header(conf_ptr, &hdr.header, CL_LIST, 0, 0, RLT_SUCCESS, 0,
sizeof(hdr) + olen);
rv = send_header_plus(conf_ptr, fd, &hdr, data, olen);
out:
if (data)
free(data);
return rv;
}
int process_client_request(struct booth_config *conf_ptr,
struct client *req_client, void *buf)
{
int rv, rc = 1;
struct ticket_config *tk;
int cmd;
struct boothc_ticket_msg omsg;
struct boothc_ticket_msg *msg;
assert(conf_ptr != NULL);
assert(conf_ptr->local != NULL);
msg = (struct boothc_ticket_msg *)buf;
cmd = ntohl(msg->header.cmd);
if (!check_ticket(conf_ptr, msg->ticket.id, &tk)) {
log_warn("client referenced unknown ticket %s",
msg->ticket.id);
rv = RLT_INVALID_ARG;
goto reply_now;
}
/* Perform the initial check before granting
* an already granted non-manual ticket */
if ((!is_manual(tk) && (cmd == CMD_GRANT) && is_owned(tk))) {
log_warn("client wants to grant an (already granted!) ticket %s",
msg->ticket.id);
rv = RLT_OVERGRANT;
goto reply_now;
}
if ((cmd == CMD_REVOKE) && !is_owned(tk)) {
log_info("client wants to revoke a free ticket %s",
msg->ticket.id);
rv = RLT_TICKET_IDLE;
goto reply_now;
}
if ((cmd == CMD_REVOKE) && tk->leader != conf_ptr->local) {
tk_log_info("not granted here, redirect to %s",
ticket_leader_string(tk));
rv = RLT_REDIRECT;
goto reply_now;
}
if (cmd == CMD_REVOKE)
rv = do_revoke_ticket(conf_ptr, tk);
else
rv = do_grant_ticket(conf_ptr, tk, ntohl(msg->header.options));
if (rv == RLT_MORE) {
/* client may receive further notifications, save the
* request for further processing */
add_req(tk, req_client, msg);
tk_log_debug("queue request %s for client %d",
state_to_string(cmd), req_client->fd);
rc = 0; /* we're not yet done with the message */
}
reply_now:
init_ticket_msg(conf_ptr, &omsg, CL_RESULT, 0, rv, 0, tk);
send_client_msg(conf_ptr, req_client->fd, &omsg);
return rc;
}
int notify_client(struct booth_config *conf_ptr, struct ticket_config *tk,
int client_fd, struct boothc_ticket_msg *msg)
{
struct boothc_ticket_msg omsg;
void (*deadfn) (int ci);
int rv, rc, ci;
int cmd, options;
struct client *req_client;
cmd = ntohl(msg->header.cmd);
options = ntohl(msg->header.options);
rv = tk->outcome;
ci = find_client_by_fd(client_fd);
if (ci < 0) {
tk_log_info("client %d (request %s) left before being notified",
client_fd, state_to_string(cmd));
return 0;
}
tk_log_debug("notifying client %d (request %s)",
client_fd, state_to_string(cmd));
init_ticket_msg(conf_ptr, &omsg, CL_RESULT, 0, rv, 0, tk);
rc = send_client_msg(conf_ptr, client_fd, &omsg);
if (rc == 0 && ((rv == RLT_MORE) ||
(rv == RLT_CIB_PENDING && (options & OPT_WAIT_COMMIT)))) {
/* more to do here, keep the request */
return 1;
} else {
/* we sent a definite answer or there was a write error, drop
* the client */
if (rc) {
tk_log_debug("failed to notify client %d (request %s)",
client_fd, state_to_string(cmd));
} else {
tk_log_debug("client %d (request %s) got final notification",
client_fd, state_to_string(cmd));
}
req_client = clients + ci;
deadfn = req_client->deadfn;
if(deadfn) {
deadfn(ci);
}
return 0; /* we're done with this request */
}
}
int ticket_broadcast(struct booth_config *conf_ptr,
struct ticket_config *tk, cmd_request_t cmd,
cmd_request_t expected_reply, cmd_result_t res,
cmd_reason_t reason)
{
struct boothc_ticket_msg msg;
assert(conf_ptr != NULL);
init_ticket_msg(conf_ptr, &msg, cmd, 0, res, reason, tk);
tk_log_debug("broadcasting '%s' (term=%d, valid=%d)",
state_to_string(cmd),
ntohl(msg.ticket.term),
msg_term_time(&msg));
tk->last_request = cmd;
if (expected_reply) {
expect_replies(tk, expected_reply, conf_ptr->local);
}
ticket_activate_timeout(tk);
return transport(conf_ptr)->broadcast_auth(conf_ptr, &msg,
sendmsglen(&msg));
}
/* update the ticket on the leader, write it to the CIB, and
send out the update message to others with the new expiry
time
*/
int leader_update_ticket(struct booth_config *conf_ptr,
struct ticket_config *tk)
{
int rv = 0, rv2;
timetype now;
if (tk->ticket_updated >= 2)
return 0;
/* for manual tickets, we don't set time expiration */
if (!is_manual(tk)) {
if (tk->ticket_updated < 1) {
tk->ticket_updated = 1;
get_time(&now);
copy_time(&now, &tk->last_renewal);
set_future_time(&tk->term_expires, tk->term_duration);
rv = ticket_broadcast(conf_ptr, tk, OP_UPDATE, OP_ACK,
RLT_SUCCESS, 0);
}
}
if (tk->ticket_updated < 2) {
rv2 = ticket_write(conf_ptr, tk);
switch(rv2) {
case 0:
tk->ticket_updated = 2;
tk->outcome = RLT_SUCCESS;
foreach_tkt_req(conf_ptr, tk, notify_client);
break;
case 1:
if (tk->outcome != RLT_CIB_PENDING) {
tk->outcome = RLT_CIB_PENDING;
foreach_tkt_req(conf_ptr, tk, notify_client);
}
break;
default:
break;
}
}
return rv;
}
static void log_lost_servers(struct booth_config *conf_ptr,
struct ticket_config *tk)
{
struct booth_site *n;
int i;
assert(conf_ptr != NULL);
if (tk->retry_number > 1)
/* log those that we couldn't reach, but do
* that only on the first retry
*/
return;
FOREACH_NODE(conf_ptr, i, n) {
if (!(tk->acks_received & n->bitmask)) {
tk_log_warn("%s %s didn't acknowledge our %s, "
"will retry %d times",
(n->type == ARBITRATOR ? "arbitrator" : "site"),
site_string(n),
state_to_string(tk->last_request),
tk->retries);
}
}
}
static void resend_msg(struct booth_config *conf_ptr,
struct ticket_config *tk)
{
struct booth_site *n;
int i;
assert(conf_ptr != NULL);
assert(conf_ptr->local != NULL);
if (!(tk->acks_received ^ conf_ptr->local->bitmask)) {
ticket_broadcast(conf_ptr, tk, tk->last_request, 0,
RLT_SUCCESS, 0);
} else {
FOREACH_NODE(conf_ptr, i, n) {
if (!(tk->acks_received & n->bitmask)) {
n->resend_cnt++;
tk_log_debug("resending %s to %s",
state_to_string(tk->last_request),
site_string(n)
);
send_msg(conf_ptr, tk->last_request, tk, n,
NULL);
}
}
ticket_activate_timeout(tk);
}
}
static void handle_resends(struct booth_config *conf_ptr,
struct ticket_config *tk)
{
int ack_cnt;
if (++tk->retry_number > tk->retries) {
tk_log_info("giving up on sending retries");
no_resends(tk);
set_ticket_wakeup(conf_ptr, tk);
return;
}
/* try to reach some sites again if we just stepped down */
if (tk->last_request == OP_VOTE_FOR) {
tk_log_warn("no answers to our VtFr request to step down (try #%d), "
"we are alone",
tk->retry_number);
goto just_resend;
}
if (!majority_of_bits(conf_ptr, tk, tk->acks_received)) {
ack_cnt = count_bits(tk->acks_received) - 1;
if (!ack_cnt) {
tk_log_warn("no answers to our request (try #%d), "
"we are alone",
tk->retry_number);
} else {
tk_log_warn("not enough answers to our request (try #%d): "
"only got %d answers",
tk->retry_number,
ack_cnt);
}
} else {
log_lost_servers(conf_ptr, tk);
}
just_resend:
resend_msg(conf_ptr, tk);
}
-int postpone_ticket_processing(struct ticket_config *tk)
+static int postpone_ticket_processing(struct ticket_config *tk)
{
extern timetype start_time;
return tk->start_postpone &&
(-time_left(&start_time) < tk->timeout);
}
#define has_extprog_exited(tk) ((tk)->clu_test.progstate == EXTPROG_EXITED)
static void process_next_state(struct booth_config *conf_ptr,
struct ticket_config *tk)
{
int rv;
switch(tk->next_state) {
case ST_LEADER:
if (has_extprog_exited(tk)) {
if (tk->state != ST_LEADER) {
rv = acquire_ticket(conf_ptr, tk, OR_ADMIN);
if (rv != 0) { /* external program failed */
tk->outcome = rv;
foreach_tkt_req(conf_ptr, tk, notify_client);
}
}
} else {
log_reacquire_reason(conf_ptr, tk);
acquire_ticket(conf_ptr, tk, OR_REACQUIRE);
}
break;
case ST_INIT:
no_resends(tk);
start_revoke_ticket(conf_ptr, tk);
tk->outcome = RLT_SUCCESS;
foreach_tkt_req(conf_ptr, tk, notify_client);
break;
/* wanting to be follower is not much of an ambition; no
* processing, just return; don't reset start_postpone until
* we got some replies to status */
case ST_FOLLOWER:
return;
default:
break;
}
tk->start_postpone = 0;
}
static void ticket_lost(struct booth_config *conf_ptr,
struct ticket_config *tk)
{
int reason = OR_TKT_LOST;
assert(conf_ptr != NULL);
assert(conf_ptr->local != NULL);
if (tk->leader != conf_ptr->local) {
tk_log_warn("lost at %s", site_string(tk->leader));
} else {
if (is_ext_prog_running(tk)) {
ext_prog_timeout(tk);
reason = OR_LOCAL_FAIL;
} else {
tk_log_warn("lost majority (revoking locally)");
reason = tk->election_reason ? tk->election_reason : OR_REACQUIRE;
}
}
tk->lost_leader = tk->leader;
save_committed_tkt(tk);
mark_ticket_as_revoked_from_leader(tk);
reset_ticket(tk);
set_state(tk, ST_FOLLOWER);
if (conf_ptr->local->type == SITE) {
ticket_write(conf_ptr, tk);
schedule_election(conf_ptr, tk, reason);
}
}
static void next_action(struct booth_config *conf_ptr,
struct ticket_config *tk)
{
int rv;
switch(tk->state) {
case ST_INIT:
/* init state, handle resends for ticket revoke */
/* and rebroadcast if stepping down */
/* try to acquire ticket on grant */
if (has_extprog_exited(tk)) {
rv = acquire_ticket(conf_ptr, tk, OR_ADMIN);
if (rv != 0) { /* external program failed */
tk->outcome = rv;
foreach_tkt_req(conf_ptr, tk, notify_client);
}
} else {
if (tk->acks_expected) {
handle_resends(conf_ptr, tk);
}
}
break;
case ST_FOLLOWER:
if (!is_manual(tk)) {
/* leader/ticket lost? and we didn't vote yet */
tk_log_debug("leader: %s, voted_for: %s",
site_string(tk->leader),
site_string(tk->voted_for));
if (!tk->leader) {
if (!tk->voted_for || !tk->in_election) {
disown_ticket(tk);
if (!new_election(conf_ptr, tk, NULL,
1, OR_AGAIN)) {
ticket_activate_timeout(tk);
}
} else {
/* we should restart elections in case nothing
* happens in the meantime */
tk->in_election = 0;
ticket_activate_timeout(tk);
}
}
} else {
/* for manual tickets, also try to acquire ticket on grant
* in the Follower state (because we may end up having
* two Leaders) */
if (has_extprog_exited(tk)) {
rv = acquire_ticket(conf_ptr, tk, OR_ADMIN);
if (rv != 0) { /* external program failed */
tk->outcome = rv;
foreach_tkt_req(conf_ptr, tk,
notify_client);
}
} else {
/* Otherwise, just send ACKs if needed */
if (tk->acks_expected) {
handle_resends(conf_ptr, tk);
}
}
}
break;
case ST_CANDIDATE:
/* elections timed out? */
elections_end(conf_ptr, tk);
break;
case ST_LEADER:
/* timeout or ticket renewal? */
if (tk->acks_expected) {
handle_resends(conf_ptr, tk);
if (majority_of_bits(conf_ptr, tk, tk->acks_received)) {
leader_update_ticket(conf_ptr, tk);
}
} else {
/* this is ticket renewal, run local test */
if (!do_ext_prog(conf_ptr, tk, 1)) {
ticket_broadcast(conf_ptr, tk, OP_HEARTBEAT,
OP_ACK, RLT_SUCCESS, 0);
tk->ticket_updated = 0;
}
}
break;
default:
break;
}
}
static void ticket_cron(struct booth_config *conf_ptr,
struct ticket_config *tk)
{
/* don't process the tickets too early after start */
if (postpone_ticket_processing(tk)) {
tk_log_debug("ticket processing postponed (start_postpone=%d)",
tk->start_postpone);
/* but run again soon */
ticket_activate_timeout(tk);
return;
}
/* no need for status resends, we hope we got at least one
* my_index back */
if (tk->acks_expected == OP_MY_INDEX) {
no_resends(tk);
}
/* after startup, we need to decide what to do based on the
* current ticket state; tk->next_state has a hint
* also used for revokes which had to be delayed
*/
if (tk->next_state) {
process_next_state(conf_ptr, tk);
goto out;
}
/* Has an owner, has an expiry date, and expiry date in the past?
* For automatic tickets, losing the ticket must happen
* in _every_ state.
*/
if ((!is_manual(tk)) &&
is_owned(tk) && is_time_set(&tk->term_expires)
&& is_past(&tk->term_expires)) {
ticket_lost(conf_ptr, tk);
goto out;
}
next_action(conf_ptr, tk);
out:
tk->next_state = 0;
if (!tk->in_election && tk->update_cib)
ticket_write(conf_ptr, tk);
}
void process_tickets(struct booth_config *conf_ptr)
{
struct ticket_config *tk;
int i;
timetype last_cron;
assert(conf_ptr != NULL);
FOREACH_TICKET(conf_ptr, i, tk) {
if (!has_extprog_exited(tk) &&
is_time_set(&tk->next_cron) && !is_past(&tk->next_cron))
continue;
tk_log_debug("ticket cron");
copy_time(&tk->next_cron, &last_cron);
ticket_cron(conf_ptr, tk);
if (time_cmp(&last_cron, &tk->next_cron, ==)) {
tk_log_debug("nobody set ticket wakeup");
set_ticket_wakeup(conf_ptr, tk);
}
}
}
void tickets_log_info(struct booth_config *conf_ptr)
{
struct ticket_config *tk;
int i;
time_t ts;
assert(conf_ptr != NULL);
FOREACH_TICKET(conf_ptr, i, tk) {
ts = wall_ts(&tk->term_expires);
tk_log_info("state '%s' "
"term %d "
"leader %s "
"expires %-24.24s",
state_to_string(tk->state),
tk->current_term,
ticket_leader_string(tk),
ctime(&ts));
}
}
static void update_acks(struct booth_config *conf_ptr, struct ticket_config *tk,
struct booth_site *sender, struct booth_site *leader,
struct boothc_ticket_msg *msg)
{
uint32_t cmd;
uint32_t req;
cmd = ntohl(msg->header.cmd);
req = ntohl(msg->header.request);
if (req != tk->last_request ||
(tk->acks_expected != cmd &&
tk->acks_expected != OP_REJECTED))
return;
/* got an ack! */
tk->acks_received |= sender->bitmask;
if (all_replied(conf_ptr, tk) ||
/* we just stepped down, need only one site to start
* elections */
(cmd == OP_REQ_VOTE && tk->last_request == OP_VOTE_FOR)) {
no_resends(tk);
tk->start_postpone = 0;
set_ticket_wakeup(conf_ptr, tk);
}
}
/* read ticket message */
int ticket_recv(struct booth_config *conf_ptr, void *buf,
struct booth_site *source)
{
struct boothc_ticket_msg *msg;
struct ticket_config *tk;
struct booth_site *leader;
uint32_t leader_u;
msg = (struct boothc_ticket_msg *)buf;
if (!check_ticket(conf_ptr, msg->ticket.id, &tk)) {
log_warn("got invalid ticket name %s from %s",
msg->ticket.id, site_string(source));
source->invalid_cnt++;
return -EINVAL;
}
leader_u = ntohl(msg->ticket.leader);
if (!find_site_by_id(conf_ptr, leader_u, &leader)) {
tk_log_error("message with unknown leader %u received", leader_u);
source->invalid_cnt++;
return -EINVAL;
}
update_acks(conf_ptr, tk, source, leader, msg);
return raft_answer(conf_ptr, tk, source, leader, msg);
}
static void log_next_wakeup(struct ticket_config *tk)
{
int left;
left = time_left(&tk->next_cron);
tk_log_debug("set ticket wakeup in " intfmt(left));
}
/* New vote round; §5.2 */
/* delay the next election start for some random time
* (up to 1 second)
*/
void add_random_delay(struct ticket_config *tk)
{
timetype tv;
interval_add(&tk->next_cron, rand_time(min(1000, tk->timeout)), &tv);
ticket_next_cron_at(tk, &tv);
if (ANYDEBUG) {
log_next_wakeup(tk);
}
}
void set_ticket_wakeup(struct booth_config *conf_ptr, struct ticket_config *tk)
{
timetype near_future, tv, next_vote;
set_future_time(&near_future, 10);
if (!is_manual(tk)) {
/* At least every hour, perhaps sooner (default) */
tk_log_debug("ticket will be woken up after up to one hour");
ticket_next_cron_in(tk, 3600*TIME_RES);
switch (tk->state) {
case ST_LEADER:
assert(tk->leader == conf_ptr->local);
get_next_election_time(tk, &next_vote, conf_ptr->local);
/* If timestamp is in the past, wakeup in
* near future */
if (!is_time_set(&next_vote)) {
tk_log_debug("next ts unset, wakeup soon");
ticket_next_cron_at(tk, &near_future);
} else if (is_past(&next_vote)) {
int tdiff = time_left(&next_vote);
tk_log_debug("next ts in the past " intfmt(tdiff));
ticket_next_cron_at(tk, &near_future);
} else {
ticket_next_cron_at(tk, &next_vote);
}
break;
case ST_CANDIDATE:
assert(is_time_set(&tk->election_end));
ticket_next_cron_at(tk, &tk->election_end);
break;
case ST_INIT:
case ST_FOLLOWER:
/* If there is (or should be) some owner, check on it later on.
* If no one is interested - don't care. */
if (is_owned(tk)) {
interval_add(&tk->term_expires, tk->acquire_after, &tv);
ticket_next_cron_at(tk, &tv);
}
break;
default:
tk_log_error("unknown ticket state: %d", tk->state);
}
if (tk->next_state) {
/* we need to do something soon here */
if (!tk->acks_expected) {
ticket_next_cron_at(tk, &near_future);
} else {
ticket_activate_timeout(tk);
}
}
} else {
/* At least six minutes, to make sure that multi-leader situations
* will be solved promptly.
*/
tk_log_debug("manual ticket will be woken up after up to six minutes");
ticket_next_cron_in(tk, 60*TIME_RES);
/* For manual tickets, no earlier timeout could be set in a similar
* way as it is done in a switch above for automatic tickets.
* The reason is that term's timeout is INF and no Raft-based elections
* are performed.
*/
}
if (ANYDEBUG) {
log_next_wakeup(tk);
}
}
void schedule_election(struct booth_config *conf_ptr, struct ticket_config *tk,
cmd_reason_t reason)
{
assert(conf_ptr != NULL);
assert(conf_ptr->local != NULL);
if (conf_ptr->local->type != SITE)
return;
tk->election_reason = reason;
get_time(&tk->next_cron);
/* introduce a short delay before starting election */
add_random_delay(tk);
}
int is_manual(struct ticket_config *tk)
{
return (tk->mode == TICKET_MODE_MANUAL) ? 1 : 0;
}
/* Given a state (in host byte order), return a human-readable (char*).
* An array is used so that multiple states can be printed in a single printf(). */
char *state_to_string(uint32_t state_ho)
{
union mu { cmd_request_t s; char c[5]; };
static union mu cache[6] = { { 0 } }, *cur;
static int current = 0;
current ++;
if (current >= sizeof(cache)/sizeof(cache[0]))
current = 0;
cur = cache + current;
cur->s = htonl(state_ho);
/* Shouldn't be necessary, union array is initialized with zeroes, and
* these bytes never get written. */
cur->c[4] = 0;
return cur->c;
}
int send_reject(struct booth_config *conf_ptr, struct booth_site *dest,
struct ticket_config *tk, cmd_result_t code,
struct boothc_ticket_msg *in_msg)
{
int req = ntohl(in_msg->header.cmd);
struct boothc_ticket_msg msg;
tk_log_debug("sending reject to %s",
site_string(dest));
init_ticket_msg(conf_ptr, &msg, OP_REJECTED, req, code, 0, tk);
return booth_udp_send_auth(conf_ptr, dest, &msg, sendmsglen(&msg));
}
int send_msg(struct booth_config *conf_ptr, int cmd, struct ticket_config *tk,
struct booth_site *dest, struct boothc_ticket_msg *in_msg)
{
int req = 0;
struct ticket_config *valid_tk = tk;
struct boothc_ticket_msg msg;
/* if we want to send the last valid ticket, then if we're in
* the ST_CANDIDATE state, the last valid ticket is in
* tk->last_valid_tk
*/
if (cmd == OP_MY_INDEX) {
if (tk->state == ST_CANDIDATE && tk->last_valid_tk) {
valid_tk = tk->last_valid_tk;
}
tk_log_info("sending status to %s",
site_string(dest));
}
if (in_msg)
req = ntohl(in_msg->header.cmd);
init_ticket_msg(conf_ptr, &msg, cmd, req, RLT_SUCCESS, 0, valid_tk);
return booth_udp_send_auth(conf_ptr, dest, &msg, sendmsglen(&msg));
}
diff --git a/src/ticket.h b/src/ticket.h
index 7258888..496f4b3 100644
--- a/src/ticket.h
+++ b/src/ticket.h
@@ -1,354 +1,351 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef _TICKET_H
#define _TICKET_H
#include <time.h>
#include <sys/time.h>
#include <math.h>
#include "timer.h"
#include "config.h"
#include "log.h"
extern int TIME_RES;
#define DEFAULT_TICKET_EXPIRY (600*TIME_RES)
#define DEFAULT_TICKET_TIMEOUT (5*TIME_RES)
#define DEFAULT_RETRIES 10
#define FOREACH_TICKET(b_, i_, t_) \
for (i_ = 0; \
(t_ = (b_)->ticket + i_, i_ < (b_)->ticket_count); \
i_++)
#define FOREACH_NODE(b_, i_, n_) \
for (i_ = 0; \
(n_ = (b_)->site + i_, i_ < (b_)->site_count); \
i_++)
#define set_leader(tk, who) do { \
if (who == NULL) { \
mark_ticket_as_revoked_from_leader(tk); \
} \
\
tk->leader = who; \
tk_log_debug("ticket leader set to %s", ticket_leader_string(tk)); \
\
if (tk->leader) { \
mark_ticket_as_granted(tk, tk->leader); \
} \
} while(0)
#define mark_ticket_as_granted(tk, who) do { \
if (is_manual(tk) && (who->index > -1)) { \
tk->sites_where_granted[who->index] = 1; \
tk_log_debug("manual ticket marked as granted to %s", ticket_leader_string(tk)); \
} \
} while(0)
#define mark_ticket_as_revoked(tk, who) do { \
if (is_manual(tk) && who && (who->index > -1)) { \
tk->sites_where_granted[who->index] = 0; \
tk_log_debug("manual ticket marked as revoked from %s", site_string(who)); \
} \
} while(0)
#define mark_ticket_as_revoked_from_leader(tk) do { \
if (tk->leader) { \
mark_ticket_as_revoked(tk, tk->leader); \
} \
} while(0)
#define set_state(tk, newst) do { \
tk_log_debug("state transition: %s -> %s", \
state_to_string(tk->state), state_to_string(newst)); \
tk->state = newst; \
} while(0)
#define set_next_state(tk, newst) do { \
if (!(newst)) tk_log_debug("next state reset"); \
else tk_log_debug("next state set to %s", state_to_string(newst)); \
tk->next_state = newst; \
} while(0)
#define is_term_invalid(tk, term) \
((tk)->last_valid_tk && (tk)->last_valid_tk->current_term > (term))
void save_committed_tkt(struct ticket_config *tk);
void disown_ticket(struct ticket_config *tk);
+
+/* XXX UNUSED */
int disown_if_expired(struct ticket_config *tk);
/**
* @internal
* Pick a ticket structure based on given name, with some apriori sanity checks
*
* @param[inout] conf_ptr config object to refer to
* @param[in] ticket name of the ticket to search for
* @param[out] found place the reference here when found
*
* @return 0 on failure, see @find_ticket_by_name otherwise
*/
int check_ticket(struct booth_config *conf_ptr, const char *ticket,
struct ticket_config **tc);
/**
* @internal
* Check whether given site is valid
*
* @param[inout] conf_ptr config object to refer to
* @param[in] site which member to look for
* @param[out] is_local store whether the member is local on success
*
* @note XXX UNUSED
*
* @return 1 on success (found and valid), 0 otherwise
*/
int check_site(struct booth_config *conf_ptr, const char *site,
int *local);
-int grant_ticket(struct ticket_config *ticket);
-int revoke_ticket(struct ticket_config *ticket);
-
/**
* @internal
* Second stage of incoming datagram handling (after authentication)
*
* @param[inout] conf_ptr config object to refer to
* @param[in] buf raw message to act upon
* @param[in] source member originating this message
*
* @return 0 on success or negative value (-1 or -errno) on error
*/
int ticket_recv(struct booth_config *conf_ptr, void *buf,
struct booth_site *source);
void reset_ticket(struct ticket_config *tk);
void reset_ticket_and_set_no_leader(struct ticket_config *tk);
/**
* @internal
* Based on the current state and circumstances, make a state transition
*
* @param[inout] conf_ptr config object to refer to
* @param[in] tk ticket at hand
* @param[in] sender site structure of the sender
*/
void update_ticket_state(struct booth_config *conf_ptr,
struct ticket_config *tk, struct booth_site *sender);
/**
* @internal
* Initial "consult local pacemaker and booth peers" inquiries
*
* @param[inout] conf_ptr config object to use as a starting point
*
* @return 0 (for the time being)
*/
int setup_ticket(struct booth_config *conf_ptr);
int check_max_len_valid(const char *s, int max);
/**
* @internal
* Pick a ticket structure based on given name
*
* @param[inout] conf_ptr config object to refer to
* @param[in] ticket name of the ticket to search for
* @param[out] found place the reference here when found
*
* @return see @list_ticket and @send_header_plus
*/
int find_ticket_by_name(struct booth_config *conf_ptr,
const char *ticket, struct ticket_config **found);
/**
* @internal
* Apply the next step with the ticket if possible.
*
* @param[inout] conf_ptr config object to refer to
* @param[in] tk ticket at hand
*/
void set_ticket_wakeup(struct booth_config *conf_ptr,
struct ticket_config *tk);
-int postpone_ticket_processing(struct ticket_config *tk);
-
/**
* @internal
* Implementation of the ticket listing
*
* @param[inout] conf_ptr config object to refer to
* @param[in] file descriptor of the socket to respond to
*
* @return see @list_ticket and @send_header_plus
*/
int ticket_answer_list(struct booth_config *conf_ptr, int fd);
/**
* @internal
* Process request from the client (as opposed to peer daemon)
*
* @param[inout] conf_ptr config object to refer to
* @param[in] req_client client structure of the sender
* @param[in] buf message itself
*
* @return 1 on success, 0 when not done with the message, yet
*/
int process_client_request(struct booth_config *conf_ptr,
struct client *req_client, void *buf);
/**
* @internal
* Cause the ticket storage backend to persist the ticket
*
* @param[inout] conf_ptr config object to refer to
* @param[in] tk ticket at hand
*
* @return 0 on success, 1 when not carried out for being dangerous
*/
int ticket_write(struct booth_config *conf_ptr,
struct ticket_config *tk);
/**
* @internal
* Mainloop of booth ticket handling
*
* @param[inout] conf_ptr config object to refer to
*/
void process_tickets(struct booth_config *conf_ptr);
/**
* @internal
* For each ticket, log some notable properties
*
* @param[inout] conf_ptr config object to refer to
*/
void tickets_log_info(struct booth_config *conf_ptr);
char *state_to_string(uint32_t state_ho);
/**
* @internal
* For a given ticket and recipient site, send a rejection
*
* @param[inout] conf_ptr config object to refer to
* @param[in] dest site structure of the recipient
* @param[in] tk ticket at hand
* @param[in] code further detail for the rejection
* @param[in] in_msg message this is going to be a response to
*/
int send_reject(struct booth_config *conf_ptr, struct booth_site *dest,
struct ticket_config *tk, cmd_result_t code,
struct boothc_ticket_msg *in_msg);
/**
* @internal
* For a given ticket, recipient site and possibly its message, send a response
*
* @param[inout] conf_ptr config object to refer to
* @param[in] cmd what type of message is to be sent
* @param[in] dest site structure of the recipient
* @param[in] in_msg message this is going to be a response to
*/
int send_msg(struct booth_config *conf_ptr, int cmd, struct ticket_config *tk,
struct booth_site *dest, struct boothc_ticket_msg *in_msg);
/**
* @internal
* Notify client at particular socket, regarding particular ticket
*
* @param[inout] conf_ptr config object to refer to
* @param[in] tk ticket at hand
* @param[in] fd file descriptor of the socket to respond to
* @param[in] msg input message being responded to
*/
int notify_client(struct booth_config *conf_ptr, struct ticket_config *tk,
int client_fd, struct boothc_ticket_msg *msg);
/**
* @internal
* Broadcast the current state of the ticket as seen from local perspective
*
* @param[inout] conf_ptr config object to refer to
* @param[in] tk ticket at hand
* @param[in] cmd what type of message is to be sent
* @param[in] expected_reply what to expect in response
* @param[in] res may carry further detail with cmd == OP_REJECTED
* @param[in] reason trigger of this broadcast
*/
int ticket_broadcast(struct booth_config *conf_ptr,
struct ticket_config *tk, cmd_request_t cmd,
cmd_request_t expected_reply, cmd_result_t res,
cmd_reason_t reason);
/**
* @internal
* Update the ticket (+broadcast to that effect) and/or write it to the backend
*
* @param[inout] conf_ptr config object to refer to
* @param[in] tk ticket at hand
*
* @return 0 or see #ticket_broadcast
*/
int leader_update_ticket(struct booth_config *conf_ptr,
struct ticket_config *tk);
void add_random_delay(struct ticket_config *tk);
/**
* @internal
* Make it so the nearest ticket swipe will start election
*
* @param[inout] conf_ptr config object to refer to
* @param[in] tk ticket at hand
* @param[in] reason explains why new election is conducted
*/
void schedule_election(struct booth_config *conf_ptr, struct ticket_config *tk,
cmd_reason_t reason);
int is_manual(struct ticket_config *tk);
int check_attr_prereq(struct ticket_config *tk, grant_type_e grant_type);
static inline void ticket_next_cron_at(struct ticket_config *tk, timetype *when)
{
copy_time(when, &tk->next_cron);
}
static inline void ticket_next_cron_in(struct ticket_config *tk, int interval)
{
timetype tv;
set_future_time(&tv, interval);
ticket_next_cron_at(tk, &tv);
}
static inline void ticket_activate_timeout(struct ticket_config *tk)
{
/* TODO: increase timeout when no answers */
tk_log_debug("activate ticket timeout in %d", tk->timeout);
ticket_next_cron_in(tk, tk->timeout);
}
#endif /* _TICKET_H */
diff --git a/src/transport.c b/src/transport.c
index f7685b8..158ec51 100644
--- a/src/transport.c
+++ b/src/transport.c
@@ -1,1153 +1,1151 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h> /* getnameinfo */
#include <poll.h>
#include <arpa/inet.h>
#include <asm/types.h>
#include <linux/rtnetlink.h>
#include <net/if.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h> /* getnameinfo */
#include "b_config.h"
#include "config.h"
#include "transport.h"
#include "attr.h"
#include "auth.h"
#include "booth.h"
#include "inline-fn.h"
#include "log.h"
#include "ticket.h"
#define BOOTH_IPADDR_LEN (sizeof(struct in6_addr))
#define NETLINK_BUFSIZE 16384
#define SOCKET_BUFFER_SIZE 160000
#define FRAME_SIZE_MAX 10000
/* function to be called when handling booth-group-internal messages;
* it's expected to return 0 to indicate success, negative integer
* to indicate silent (or possibly already complained about) error,
* or positive integer to indicate sender's ID that will then be
* emitted in the error log message together with the real source
* address if this is available */
static int (*deliver_fn) (void *msg, int msglen);
static void parse_rtattr(struct rtattr *tb[],
int max, struct rtattr *rta, int len)
{
while (RTA_OK(rta, len)) {
if (rta->rta_type <= max)
tb[rta->rta_type] = rta;
rta = RTA_NEXT(rta,len);
}
}
enum match_type {
NO_MATCH = 0,
FUZZY_MATCH,
EXACT_MATCH,
};
static int find_address(struct booth_config *conf_ptr,
unsigned char ipaddr[BOOTH_IPADDR_LEN],
int family, int prefixlen, int fuzzy_allowed,
struct booth_site **me, int *address_bits_matched)
{
int i;
struct booth_site *node;
int bytes, bits_left, mask;
unsigned char node_bits, ip_bits;
uint8_t *n_a;
int matched;
enum match_type did_match = NO_MATCH;
assert(conf_ptr != NULL);
bytes = prefixlen / 8;
bits_left = prefixlen % 8;
/* One bit left to check means ignore 7 lowest bits. */
mask = ~( (1 << (8 - bits_left)) -1);
FOREACH_NODE(conf_ptr, i, node) {
if (family != node->family)
continue;
n_a = node_to_addr_pointer(node);
for(matched = 0; matched < node->addrlen; matched++)
if (ipaddr[matched] != n_a[matched])
break;
if (matched == node->addrlen) {
*address_bits_matched = matched * 8;
*me = node;
did_match = EXACT_MATCH;
break;
}
if (!fuzzy_allowed)
continue;
/* Check prefix, whole bytes */
if (matched < bytes)
continue;
if (matched * 8 < *address_bits_matched)
continue;
node_bits = n_a[bytes];
ip_bits = ipaddr[bytes];
if (((node_bits ^ ip_bits) & mask) == 0) {
/* _At_least_ prefixlen bits matched. */
if (did_match < EXACT_MATCH) {
*address_bits_matched = prefixlen;
*me = node;
did_match = FUZZY_MATCH;
}
}
}
return did_match;
}
static int _find_myself(struct booth_config *conf_ptr, int family,
int fuzzy_allowed)
{
int fd;
struct sockaddr_nl nladdr;
struct booth_site *me = NULL;
unsigned char ipaddr[BOOTH_IPADDR_LEN];
static char rcvbuf[NETLINK_BUFSIZE];
struct {
struct nlmsghdr nlh;
struct rtgenmsg g;
} req;
int address_bits_matched;
assert(conf_ptr != NULL);
if (conf_ptr->local != NULL)
goto found;
address_bits_matched = 0;
fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE);
if (fd < 0) {
log_error("failed to create netlink socket");
return 0;
}
setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf));
memset(&nladdr, 0, sizeof(nladdr));
nladdr.nl_family = AF_NETLINK;
memset(&req, 0, sizeof(req));
req.nlh.nlmsg_len = sizeof(req);
req.nlh.nlmsg_type = RTM_GETADDR;
req.nlh.nlmsg_flags = NLM_F_ROOT|NLM_F_MATCH|NLM_F_REQUEST;
req.nlh.nlmsg_pid = 0;
req.nlh.nlmsg_seq = 1;
req.g.rtgen_family = family;
if (sendto(fd, (void *)&req, sizeof(req), 0,
(struct sockaddr*)&nladdr, sizeof(nladdr)) < 0) {
close(fd);
log_error("failed to send data to netlink socket");
return 0;
}
while (1) {
int status;
struct nlmsghdr *h;
struct iovec iov = { rcvbuf, sizeof(rcvbuf) };
struct msghdr msg = {
(void *)&nladdr, sizeof(nladdr),
&iov, 1,
NULL, 0,
0
};
status = recvmsg(fd, &msg, 0);
if (!status) {
close(fd);
log_error("failed to recvmsg from netlink socket");
return 0;
}
h = (struct nlmsghdr *)rcvbuf;
if (h->nlmsg_type == NLMSG_DONE)
break;
if (h->nlmsg_type == NLMSG_ERROR) {
close(fd);
log_error("netlink socket recvmsg error");
return 0;
}
while (NLMSG_OK(h, status)) {
if (h->nlmsg_type == RTM_NEWADDR) {
struct ifaddrmsg *ifa = NLMSG_DATA(h);
struct rtattr *tb[IFA_MAX+1];
int len = h->nlmsg_len
- NLMSG_LENGTH(sizeof(*ifa));
memset(tb, 0, sizeof(tb));
parse_rtattr(tb, IFA_MAX, IFA_RTA(ifa), len);
memset(ipaddr, 0, BOOTH_IPADDR_LEN);
/* prefer IFA_LOCAL if it exists, for p-t-p
* interfaces, otherwise use IFA_ADDRESS */
if (tb[IFA_LOCAL]) {
memcpy(ipaddr, RTA_DATA(tb[IFA_LOCAL]),
BOOTH_IPADDR_LEN);
} else {
memcpy(ipaddr, RTA_DATA(tb[IFA_ADDRESS]),
BOOTH_IPADDR_LEN);
}
/* Try to find the exact address or the address with subnet matching.
* The function find_address will be called for each address received
* from NLMSG_DATA above.
* The exact match will be preferred. If no exact match is found,
* the function find_address will try to return another, most similar
* address (with the longest possible number of same bytes). */
if (ifa->ifa_prefixlen > address_bits_matched) {
find_address(conf_ptr, ipaddr,
ifa->ifa_family, ifa->ifa_prefixlen,
fuzzy_allowed, &me, &address_bits_matched);
if (me != NULL) {
log_debug("found myself at %s (%d bits matched)",
site_string(me), address_bits_matched);
}
}
/* If the previous NLMSG_DATA calls have already allowed us
* to find an address with address_bits_matched matching bits,
* then no other better non-exact address can be found.
* But we can still try to find an exact match, so let us
* call the function find_address with disabled searching of
* similar addresses (fuzzy_allowed == 0) */
else if (ifa->ifa_prefixlen == address_bits_matched) {
find_address(conf_ptr, ipaddr,
ifa->ifa_family, ifa->ifa_prefixlen,
0 /* fuzzy_allowed */, &me, &address_bits_matched);
if (me != NULL) {
log_debug("found myself at %s (exact match)",
site_string(me));
}
}
}
h = NLMSG_NEXT(h, status);
}
}
close(fd);
if (me == NULL)
return 0;
me->local = 1;
conf_ptr->local = me;
found:
return 1;
}
int find_myself(struct booth_config *conf_ptr, int fuzzy_allowed)
{
return _find_myself(conf_ptr, AF_INET6, fuzzy_allowed)
|| _find_myself(conf_ptr, AF_INET, fuzzy_allowed);
}
/** Checks the header fields for validity.
* cf. init_header().
* For @len_incl_data < 0 the length is not checked.
* Return <0 if error, else bytes read. */
int check_boothc_header(struct boothc_header *h, int len_incl_data)
{
int l;
if (h->magic != htonl(BOOTHC_MAGIC)) {
log_error("magic error %x", ntohl(h->magic));
return -EINVAL;
}
if (h->version != htonl(BOOTHC_VERSION)) {
log_error("version error %x", ntohl(h->version));
return -EINVAL;
}
l = ntohl(h->length);
if (l < sizeof(*h)) {
log_error("length %d out of range", l);
return -EINVAL;
}
if (len_incl_data < 0)
return 0;
if (l != len_incl_data) {
log_error("length error - got %d, wanted %d",
len_incl_data, l);
return -EINVAL;
}
return len_incl_data;
}
static int do_read(int fd, void *buf, size_t count)
{
int rv, off = 0;
while (off < count) {
rv = read(fd, (char *)buf + off, count - off);
if (rv == 0)
return -1;
if (rv == -1 && errno == EINTR)
continue;
if (rv == -1 && errno == EWOULDBLOCK)
break;
if (rv == -1)
return -1;
off += rv;
}
return off;
}
static int do_write(int fd, void *buf, size_t count)
{
int rv, off = 0;
retry:
rv = send(fd, (char *)buf + off, count, MSG_NOSIGNAL);
if (rv == -1 && errno == EINTR)
goto retry;
/* If we cannot write _any_ data, we'd be in an (potential) loop. */
if (rv <= 0) {
log_error("send failed: %s (%d)", strerror(errno), errno);
return rv;
}
if (rv != count) {
count -= rv;
off += rv;
goto retry;
}
return 0;
}
-
/* Only used for client requests (tcp) */
-int read_client(struct client *req_cl)
+static int read_client(struct client *req_cl)
{
char *msg;
struct boothc_header *header;
int rv, fd;
int len = MAX_MSG_LEN;
if (!req_cl->msg) {
msg = malloc(MAX_MSG_LEN);
if (!msg) {
log_error("out of memory for client messages");
return -1;
}
req_cl->msg = (void *)msg;
} else {
msg = (char *)req_cl->msg;
}
header = (struct boothc_header *)msg;
/* update len if we read enough */
if (req_cl->offset >= sizeof(*header)) {
len = min(ntohl(header->length), MAX_MSG_LEN);
}
fd = req_cl->fd;
rv = do_read(fd, msg+req_cl->offset, len-req_cl->offset);
if (rv < 0) {
if (errno == ECONNRESET)
log_debug("client connection reset for fd %d", fd);
return -1;
}
req_cl->offset += rv;
/* update len if we read enough */
if (req_cl->offset >= sizeof(*header)) {
len = min(ntohl(header->length), MAX_MSG_LEN);
}
if (req_cl->offset < len) {
/* client promised to send more */
return 1;
}
if (check_boothc_header(header, len) < 0) {
return -1;
}
return 0;
}
-
/* Only used for client requests (tcp) */
static void process_connection(struct booth_config *conf_ptr, int ci)
{
struct client *req_cl;
void *msg = NULL;
struct boothc_header *header;
struct boothc_hdr_msg err_reply;
cmd_result_t errc;
void (*deadfn) (int ci);
req_cl = clients + ci;
switch (read_client(req_cl)) {
case -1: /* error */
goto kill;
case 1: /* more to read */
return;
case 0:
/* we can process the request now */
msg = req_cl->msg;
}
header = (struct boothc_header *)msg;
if (check_auth(conf_ptr, NULL, msg, ntohl(header->length))) {
errc = RLT_AUTH;
goto send_err;
}
/* For CMD_GRANT and CMD_REVOKE:
* Don't close connection immediately, but send
* result a second later? */
switch (ntohl(header->cmd)) {
case CMD_LIST:
ticket_answer_list(conf_ptr, req_cl->fd);
goto kill;
case CMD_PEERS:
list_peers(conf_ptr, req_cl->fd);
goto kill;
case CMD_GRANT:
case CMD_REVOKE:
if (process_client_request(conf_ptr, req_cl, msg) == 1)
goto kill; /* request processed definitely, close connection */
else
return;
case ATTR_LIST:
case ATTR_GET:
case ATTR_SET:
case ATTR_DEL:
if (process_attr_request(conf_ptr, req_cl, msg) == 1)
goto kill; /* request processed definitely, close connection */
else
return;
default:
log_error("connection %d cmd %x unknown",
ci, ntohl(header->cmd));
errc = RLT_INVALID_ARG;
goto send_err;
}
assert(0);
return;
send_err:
init_header(conf_ptr, &err_reply.header, CL_RESULT, 0, 0, errc, 0,
sizeof(err_reply));
send_client_msg(conf_ptr, req_cl->fd, &err_reply);
kill:
deadfn = req_cl->deadfn;
if(deadfn) {
deadfn(ci);
}
return;
}
static void process_tcp_listener(struct booth_config *conf_ptr, int ci)
{
int fd, i, flags, one = 1;
socklen_t addrlen = sizeof(struct sockaddr);
struct sockaddr addr;
fd = accept(clients[ci].fd, &addr, &addrlen);
if (fd < 0) {
log_error("process_tcp_listener: accept error %d %d",
fd, errno);
return;
}
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof(one));
flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
i = client_add(fd, clients[ci].transport,
process_connection, NULL);
log_debug("client connection %d fd %d", i, fd);
}
int setup_tcp_listener(struct booth_site *local, int test_only)
{
int s, rv;
int one = 1;
assert(local != NULL);
s = socket(local->family, SOCK_STREAM, 0);
if (s == -1) {
log_error("failed to create tcp socket %s", strerror(errno));
return s;
}
rv = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one));
if (rv == -1) {
close(s);
log_error("failed to set the SO_REUSEADDR option");
return rv;
}
rv = bind(s, &local->sa6, local->saddrlen);
if (test_only) {
rv = (rv == -1) ? errno : 0;
close(s);
return rv;
}
if (rv == -1) {
close(s);
log_error("failed to bind socket %s", strerror(errno));
return rv;
}
rv = listen(s, 5);
if (rv == -1) {
close(s);
log_error("failed to listen on socket %s", strerror(errno));
return rv;
}
return s;
}
static int booth_tcp_init(struct booth_config *conf_ptr,
void * unused __attribute__((unused)))
{
int rv;
assert(conf_ptr != NULL && conf_ptr->transport != NULL);
if (get_local_id(conf_ptr) < 0)
return -1;
rv = setup_tcp_listener(conf_ptr->local, 0);
if (rv < 0)
return rv;
client_add(rv, *conf_ptr->transport + TCP, process_tcp_listener, NULL);
return 0;
}
static int connect_nonb(int sockfd, const struct sockaddr *saptr,
socklen_t salen, int sec)
{
int flags, n, error;
socklen_t len;
fd_set rset, wset;
struct timeval tval;
flags = fcntl(sockfd, F_GETFL, 0);
fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
error = 0;
if ( (n = connect(sockfd, saptr, salen)) < 0)
if (errno != EINPROGRESS)
return -1;
if (n == 0)
goto done; /* connect completed immediately */
FD_ZERO(&rset);
FD_SET(sockfd, &rset);
wset = rset;
tval.tv_sec = sec;
tval.tv_usec = 0;
if ((n = select(sockfd + 1, &rset, &wset, NULL,
sec ? &tval : NULL)) == 0) {
/* leave outside function to close */
/* timeout */
/* close(sockfd); */
errno = ETIMEDOUT;
return -1;
}
if (FD_ISSET(sockfd, &rset) || FD_ISSET(sockfd, &wset)) {
len = sizeof(error);
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0)
return -1; /* Solaris pending error */
} else {
log_error("select error: sockfd not set");
return -1;
}
done:
fcntl(sockfd, F_SETFL, flags); /* restore file status flags */
if (error) {
/* leave outside function to close */
/* close(sockfd); */
errno = error;
return -1;
}
return 0;
}
static int booth_tcp_open(struct booth_site *to)
{
int s, rv;
if (to->tcp_fd >= STDERR_FILENO)
goto found;
s = socket(to->family, SOCK_STREAM, 0);
if (s == -1) {
log_error("cannot create socket of family %d", to->family);
return -1;
}
rv = connect_nonb(s, (struct sockaddr *)&to->sa6, to->saddrlen, 10);
if (rv == -1) {
if( errno == ETIMEDOUT)
log_error("connect to %s got a timeout", site_string(to));
else
log_error("connect to %s got an error: %s", site_string(to),
strerror(errno));
goto error;
}
to->tcp_fd = s;
found:
return 1;
error:
if (s >= 0)
close(s);
return -1;
}
/* data + (datalen-sizeof(struct hmac)) points to struct hmac
* i.e. struct hmac is always tacked on the payload
*/
static int add_hmac(struct booth_config *conf_ptr, void *data, int len)
{
int rv = 0;
#if HAVE_LIBGCRYPT || HAVE_LIBMHASH
int payload_len;
struct hmac *hp;
assert(conf_ptr != NULL);
if (!is_auth_req(conf_ptr))
return 0;
payload_len = len - sizeof(struct hmac);
hp = (struct hmac *)((unsigned char *)data + payload_len);
hp->hid = htonl(BOOTH_HASH);
memset(hp->hash, 0, BOOTH_MAC_SIZE);
rv = calc_hmac(data, payload_len, BOOTH_HASH, hp->hash,
conf_ptr->authkey, conf_ptr->authkey_len);
if (rv < 0) {
log_error("internal error: cannot calculate mac");
}
#endif
return rv;
}
static int booth_tcp_send(struct booth_config *conf_ptr,
struct booth_site *to, void *buf, int len)
{
int rv;
rv = add_hmac(conf_ptr, buf, len);
if (!rv)
rv = do_write(to->tcp_fd, buf, len);
return rv;
}
static int booth_tcp_recv(struct booth_site *from, void *buf, int len)
{
int got;
/* Needs timeouts! */
got = do_read(from->tcp_fd, buf, len);
if (got < 0) {
log_error("read failed (%d): %s", errno, strerror(errno));
return got;
}
return got;
}
static int booth_tcp_recv_auth(struct booth_config *conf_ptr,
struct booth_site *from, void *buf, int len)
{
int got, total;
int payload_len;
/* Needs timeouts! */
payload_len = len - sizeof(struct hmac);
got = booth_tcp_recv(from, buf, payload_len);
if (got < 0) {
return got;
}
total = got;
if (is_auth_req(conf_ptr)) {
got = booth_tcp_recv(from, (unsigned char *)buf+payload_len, sizeof(struct hmac));
if (got != sizeof(struct hmac)
|| check_auth(conf_ptr, from, buf, len)) {
return -1;
}
total += got;
}
return total;
}
static int booth_tcp_close(struct booth_site *to)
{
if (to) {
if (to->tcp_fd > STDERR_FILENO)
close(to->tcp_fd);
to->tcp_fd = -1;
}
return 0;
}
static int booth_tcp_exit(void)
{
return 0;
}
static int setup_udp_server(struct booth_site *local)
{
int rv, fd;
int one = 1;
unsigned int recvbuf_size;
assert(local != NULL);
fd = socket(local->family, SOCK_DGRAM, 0);
if (fd == -1) {
log_error("failed to create UDP socket %s", strerror(errno));
goto ex;
}
rv = fcntl(fd, F_SETFL, O_NONBLOCK);
if (rv == -1) {
log_error("failed to set non-blocking operation "
"on UDP socket: %s", strerror(errno));
goto ex;
}
rv = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one));
if (rv == -1) {
log_error("failed to set the SO_REUSEADDR option");
goto ex;
}
rv = bind(fd, (struct sockaddr *)&local->sa6, local->saddrlen);
if (rv == -1) {
log_error("failed to bind UDP socket to [%s]:%d: %s",
site_string(local), site_port(local),
strerror(errno));
goto ex;
}
recvbuf_size = SOCKET_BUFFER_SIZE;
rv = setsockopt(fd, SOL_SOCKET, SO_RCVBUF,
&recvbuf_size, sizeof(recvbuf_size));
if (rv == -1) {
log_error("failed to set recvbuf size");
goto ex;
}
local->udp_fd = fd;
return 0;
ex:
if (fd >= 0)
close(fd);
return -1;
}
/* Receive/process callback for UDP */
static void process_recv(struct booth_config *conf_ptr, int ci)
{
struct sockaddr_storage sa;
int rv;
socklen_t sa_len;
/* beware, the buffer needs to be large enough to accept
* a packet */
char buffer[MAX_MSG_LEN];
/* Used for unit tests */
struct boothc_ticket_msg *msg;
sa_len = sizeof(sa);
msg = (void*)buffer;
rv = recvfrom(clients[ci].fd,
buffer, sizeof(buffer),
MSG_NOSIGNAL | MSG_DONTWAIT,
(struct sockaddr *)&sa, &sa_len);
if (rv == -1)
return;
rv = deliver_fn((void*)msg, rv);
if (rv > 0) {
if (getnameinfo((struct sockaddr *)&sa, sa_len,
buffer, sizeof(buffer), NULL, 0,
NI_NUMERICHOST) == 0)
log_error("unknown sender: %08x (real: %s)", rv, buffer);
else
log_error("unknown sender: %08x", rv);
}
}
static int booth_udp_init(struct booth_config *conf_ptr, void *f)
{
int rv;
assert(conf_ptr != NULL && conf_ptr->transport != NULL);
assert(conf_ptr->local != NULL);
rv = setup_udp_server(conf_ptr->local);
if (rv < 0)
return rv;
deliver_fn = f;
client_add(conf_ptr->local->udp_fd, *conf_ptr->transport + UDP,
process_recv, NULL);
return 0;
}
static int booth_udp_send(struct booth_config *conf_ptr, struct booth_site *to,
void *buf, int len)
{
int rv;
assert(conf_ptr != NULL);
assert(conf_ptr->local != NULL);
to->sent_cnt++;
rv = sendto(conf_ptr->local->udp_fd, buf, len, MSG_NOSIGNAL,
(struct sockaddr *)&to->sa6, to->saddrlen);
if (rv == len) {
rv = 0;
} else if (rv < 0) {
to->sent_err_cnt++;
log_error("Cannot send to %s: %d %s",
site_string(to),
errno,
strerror(errno));
} else {
rv = -1;
to->sent_err_cnt++;
log_error("Packet sent to %s got truncated",
site_string(to));
}
return rv;
}
int booth_udp_send_auth(struct booth_config *conf_ptr,
struct booth_site *to, void *buf, int len)
{
int rv;
rv = add_hmac(conf_ptr, buf, len);
if (rv < 0)
return rv;
return booth_udp_send(conf_ptr, to, buf, len);
}
static int booth_udp_broadcast_auth(struct booth_config *conf_ptr,
void *buf, int len)
{
int i, rv, rvs;
struct booth_site *site;
assert(conf_ptr != NULL);
assert(conf_ptr->local != NULL);
if (conf_ptr == NULL || !conf_ptr->site_count)
return -1;
rv = add_hmac(conf_ptr, buf, len);
if (rv < 0)
return rv;
rvs = 0;
FOREACH_NODE(conf_ptr, i, site) {
if (site != conf_ptr->local) {
rv = booth_udp_send(conf_ptr, site, buf, len);
if (!rvs)
rvs = rv;
}
}
return rvs;
}
static int booth_udp_exit(void)
{
return 0;
}
/* SCTP transport layer has not been developed yet */
static int booth_sctp_init(struct booth_config *conf_ptr __attribute__((unused)),
void *f __attribute__((unused)))
{
return 0;
}
static int booth_sctp_send(struct booth_config *conf_ptr __attribute__((unused)),
struct booth_site * to __attribute__((unused)),
void *buf __attribute__((unused)),
int len __attribute__((unused)))
{
return 0;
}
static int booth_sctp_broadcast(void *buf __attribute__((unused)),
int len __attribute__((unused)))
{
return 0;
}
static int return_0_booth_site(struct booth_site *v __attribute((unused)))
{
return 0;
}
static int return_0(void)
{
return 0;
}
/* semi-hidden, only main.c to have a knowledge about this */
const booth_transport_table_t booth__transport = {
[TCP] = {
.name = "TCP",
.init = booth_tcp_init,
.open = booth_tcp_open,
.send = booth_tcp_send,
.recv = booth_tcp_recv,
.recv_auth = booth_tcp_recv_auth,
.close = booth_tcp_close,
.exit = booth_tcp_exit
},
[UDP] = {
.name = "UDP",
.init = booth_udp_init,
.open = return_0_booth_site,
.send = booth_udp_send,
.send_auth = booth_udp_send_auth,
.close = return_0_booth_site,
.broadcast_auth = booth_udp_broadcast_auth,
.exit = booth_udp_exit
},
[SCTP] = {
.name = "SCTP",
.init = booth_sctp_init,
.open = return_0_booth_site,
.send = booth_sctp_send,
.broadcast = booth_sctp_broadcast,
.exit = return_0,
}
};
#if HAVE_LIBGCRYPT || HAVE_LIBMHASH
/* TODO: we need some client identification for logging */
#define peer_string(p) (p ? site_string(p) : "client")
/* verify the validity of timestamp from the header
* the timestamp needs to be either greater than the one already
* recorded for the site or, and this is checked for clients,
* not to be older than conf_ptr->maxtimeskew
* update the timestamp for the site, if this packet is from a
* site
*/
static int verify_ts(struct booth_config *conf_ptr, struct booth_site *from,
void *buf, int len)
{
struct boothc_header *h;
struct timeval tv, curr_tv, now;
assert(conf_ptr != NULL);
if (len < sizeof(*h)) {
log_error("%s: packet too short", peer_string(from));
return -1;
}
h = (struct boothc_header *)buf;
tv.tv_sec = ntohl(h->secs);
tv.tv_usec = ntohl(h->usecs);
if (from) {
curr_tv.tv_sec = from->last_secs;
curr_tv.tv_usec = from->last_usecs;
if (timercmp(&tv, &curr_tv, >))
goto accept;
log_warn("%s: packet timestamp older than previous one",
site_string(from));
}
gettimeofday(&now, NULL);
now.tv_sec -= conf_ptr->maxtimeskew;
if (timercmp(&tv, &now, >))
goto accept;
log_error("%s: packet timestamp older than %d seconds",
peer_string(from), conf_ptr->maxtimeskew);
return -1;
accept:
if (from) {
from->last_secs = tv.tv_sec;
from->last_usecs = tv.tv_usec;
}
return 0;
}
#endif
int check_auth(struct booth_config *conf_ptr, struct booth_site *from,
void *buf, int len)
{
int rv = 0;
#if HAVE_LIBGCRYPT || HAVE_LIBMHASH
int payload_len;
struct hmac *hp;
assert(conf_ptr != NULL);
if (!is_auth_req(conf_ptr))
return 0;
payload_len = len - sizeof(struct hmac);
if (payload_len < 0) {
log_error("%s: failed to authenticate, packet too short (size:%d)",
peer_string(from), len);
return -1;
}
hp = (struct hmac *)((unsigned char *)buf + payload_len);
rv = verify_hmac(buf, payload_len, ntohl(hp->hid), hp->hash,
conf_ptr->authkey, conf_ptr->authkey_len);
if (!rv) {
rv = verify_ts(conf_ptr, from, buf, len);
}
if (rv != 0) {
log_error("%s: failed to authenticate", peer_string(from));
}
#endif
return rv;
}
int send_data(struct booth_config *conf_ptr, int fd, void *data, int datalen)
{
int rv = 0;
rv = add_hmac(conf_ptr, data, datalen);
if (!rv)
rv = do_write(fd, data, datalen);
return rv;
}
int send_header_plus(struct booth_config *conf_ptr, int fd,
struct boothc_hdr_msg *msg, void *data, int len)
{
int rv;
rv = send_data(conf_ptr, fd, msg, sendmsglen(msg)-len);
if (rv >= 0 && len)
rv = do_write(fd, data, len);
return rv;
}
/* UDP message receiver (see also deliver_fn declaration's comment) */
int message_recv(struct booth_config *conf_ptr, void *msg, int msglen)
{
uint32_t from;
struct boothc_header *header;
struct booth_site *source;
header = (struct boothc_header *)msg;
from = ntohl(header->from);
if (!find_site_by_id(conf_ptr, from, &source)) {
/* caller knows the actual source address, pass
the (assuredly) positive number and let it report */
from = from ? from : ~from; /* avoid 0 (success) */
return from & (~0U >> 1); /* avoid negative (error code} */
}
time(&source->last_recv);
source->recv_cnt++;
if (check_boothc_header(header, msglen) < 0) {
log_error("message from %s receive error", site_string(source));
source->recv_err_cnt++;
return -1;
}
if (check_auth(conf_ptr, source, msg, msglen)) {
log_error("%s failed to authenticate", site_string(source));
source->sec_cnt++;
return -1;
}
if (ntohl(header->opts) & BOOTH_OPT_ATTR) {
/* not used, clients send/retrieve attributes directly
* from sites
*/
return attr_recv(conf_ptr, msg, source);
} else {
return ticket_recv(conf_ptr, msg, source);
}
}
diff --git a/src/transport.h b/src/transport.h
index 7df8d6c..4350736 100644
--- a/src/transport.h
+++ b/src/transport.h
@@ -1,165 +1,164 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef _TRANSPORT_H
#define _TRANSPORT_H
#include "b_config.h"
#include "booth.h"
typedef enum {
TCP = 1,
UDP,
SCTP,
TRANSPORT_ENTRIES,
} transport_layer_t;
typedef enum {
ARBITRATOR = 0x50,
SITE,
CLIENT,
DAEMON,
STATUS,
GEOSTORE,
} action_t;
/* when allocating space for messages
*/
#define MAX_MSG_LEN 1024
struct booth_transport {
const char *name;
int (*init) (struct booth_config *, void *);
int (*open) (struct booth_site *);
int (*send) (struct booth_config *, struct booth_site *, void *, int);
int (*send_auth) (struct booth_config *, struct booth_site *, void *, int);
int (*recv) (struct booth_site *, void *, int);
int (*recv_auth) (struct booth_config *, struct booth_site *, void *, int);
int (*broadcast) (void *, int);
int (*broadcast_auth) (struct booth_config *, void *, int);
int (*close) (struct booth_site *);
int (*exit) (void);
};
typedef struct booth_transport booth_transport_table_t[TRANSPORT_ENTRIES];
/**
* @internal
* Attempts to pick identity of self from config-tracked enumeration of sites
*
* @param[inout] conf_ptr config object to refer to
* @param[in] fuzzy_allowed whether it's OK to approximate the match
*
* @return 0 on success or negative value (-1 or -errno) on error
*/
int find_myself(struct booth_config *conf_ptr, int fuzzy_allowed);
-int read_client(struct client *req_cl);
int check_boothc_header(struct boothc_header *data, int len_incl_data);
/**
* @internal
* Setup the TCP listener/server
*
* @param[in] local thix verysite
* @param[in] test_only whether to just check if binding is clear
*
* @return 0 on success or -1 or errno on error
*/
int setup_tcp_listener(struct booth_site *local, int test_only);
/**
* @internal
* Send data, with authentication added
*
* @param[inout] conf_ptr config object to refer to
* @param[in] to site structure of the recipient
* @param[in] buf message itself
* @param[in] len lenght of #buf
*
* @return see @add_hmac and @booth_udp_send
*/
int booth_udp_send_auth(struct booth_config *conf_ptr, struct booth_site *to,
void *buf, int len);
/**
* @internal
* First stage of incoming datagram handling (authentication)
*
* @param[inout] conf_ptr config object to refer to
* @param[in] msg raw message to act upon
* @param[in] msglen lenght of #msg
*
* @return 0 on success or negative value (-1 or -errno) on error
*/
int message_recv(struct booth_config *conf_ptr, void *msg, int msglen);
inline static void * node_to_addr_pointer(struct booth_site *node) {
switch (node->family) {
case AF_INET: return &node->sa4.sin_addr;
case AF_INET6: return &node->sa6.sin6_addr;
}
return NULL;
}
/**
* @internal
* Send data, with authentication added
*
* @param[inout] conf_ptr config object to refer to
* @param[in] fd descriptor of the socket to respond to
* @param[in] data message itself
* @param[in] datalen lenght of #data
*
* @return 0 on success or negative value (-1 or -errno) on error
*/
int send_data(struct booth_config *conf_ptr, int fd, void *data, int datalen);
/**
* @internal
* First stage of incoming datagram handling (authentication)
*
* @param[inout] conf_ptr config object to refer to
* @param[in] fd descriptor of the socket to respond to
* @param[in] hdr message header
* @param[in] data message itself
* @param[in] len lengh of @data
*
* @return see #send_data and #do_write
*/
int send_header_plus(struct booth_config *conf_ptr, int fd,
struct boothc_hdr_msg *hdr, void *data, int len);
#define send_client_msg(bc, fd, msg) send_data(bc, fd, msg, sendmsglen(msg))
/**
* @internal
* First stage of incoming datagram handling (authentication)
*
* @param[inout] conf_ptr config object to refer to
* @param[in] from site structure of the sender
* @param[in] buf message to check
* @param[in] len lengh of @buf
*
* @return see #send_data and #do_write
*/
int check_auth(struct booth_config *conf_ptr, struct booth_site *from,
void *buf, int len);
#endif /* _TRANSPORT_H */

File Metadata

Mime Type
text/x-diff
Expires
Mon, Feb 24, 5:46 AM (1 d, 20 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1463982
Default Alt Text
(86 KB)

Event Timeline