Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F2823018
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
86 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/src/booth.h b/src/booth.h
index 51b667e..2cb9616 100644
--- a/src/booth.h
+++ b/src/booth.h
@@ -1,281 +1,282 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _BOOTH_H
#define _BOOTH_H
#include <stdint.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#define BOOTH_RUN_DIR "/var/run/booth/"
#define BOOTH_LOG_DIR "/var/log"
#define BOOTH_LOGFILE_NAME "booth.log"
#define BOOTH_DEFAULT_CONF_DIR "/etc/booth/"
#define BOOTH_DEFAULT_CONF_NAME "booth"
#define BOOTH_DEFAULT_CONF_EXT ".conf"
#define BOOTH_DEFAULT_CONF \
BOOTH_DEFAULT_CONF_DIR BOOTH_DEFAULT_CONF_NAME BOOTH_DEFAULT_CONF_EXT
#define DAEMON_NAME "boothd"
#define BOOTH_PATH_LEN 127
#define BOOTH_DEFAULT_PORT 9929
/* TODO: remove */
#define BOOTH_PROTO_FAMILY AF_INET
#define BOOTHC_MAGIC 0x5F1BA08C
#define BOOTHC_VERSION 0x00010002
/** Timeout value for poll().
* Determines frequency of periodic jobs, eg. when send-retries are done.
* See process_tickets(). */
#define POLL_TIMEOUT 1000
/** @{ */
/** The on-network data structures and constants. */
#define BOOTH_NAME_LEN 64
#define CHAR2CONST(a,b,c,d) ((a << 24) | (b << 16) | (c << 8) | d)
/* Says that the ticket shouldn't be active anywhere.
* NONE wouldn't be specific enough. */
#define NO_ONE ((uint32_t)-1)
/* Says that another one should recover. */
#define TICKET_LOST CHAR2CONST('L', 'O', 'S', 'T')
typedef unsigned char boothc_site [BOOTH_NAME_LEN];
typedef unsigned char boothc_ticket[BOOTH_NAME_LEN];
struct boothc_header {
/** Authentication data; not used now. */
uint32_t iv;
uint32_t auth1;
uint32_t auth2;
/** BOOTHC_MAGIC */
uint32_t magic;
/** BOOTHC_VERSION */
uint32_t version;
/** Packet source; site_id. See add_site(). */
uint32_t from;
/** Length including header */
uint32_t length;
/** The command respectively protocol state. See cmd_request_t. */
uint32_t cmd;
/** Command options. */
uint32_t options;
/** The reason for this RPC. */
uint32_t reason;
/** Result of operation. 0 == OK */
uint32_t result;
char data[0];
} __attribute__((packed));
struct ticket_msg {
/** Ticket name. */
boothc_ticket id;
/** Current leader. May be NO_ONE. See add_site().
* For a OP_REQ_VOTE this is */
uint32_t leader;
/** Current term. */
uint32_t term;
uint32_t term_valid_for;
/* Perhaps we need to send a status along, too - like
* starting, running, stopping, error, ...? */
uint32_t leader_commit; // TODO: NEEDED?
} __attribute__((packed));
struct boothc_ticket_msg {
struct boothc_header header;
struct ticket_msg ticket;
} __attribute__((packed));
typedef enum {
/* 0x43 = "C"ommands */
CMD_LIST = CHAR2CONST('C', 'L', 's', 't'),
CMD_GRANT = CHAR2CONST('C', 'G', 'n', 't'),
CMD_REVOKE = CHAR2CONST('C', 'R', 'v', 'k'),
/* Replies */
CMR_GENERAL = CHAR2CONST('G', 'n', 'l', 'R'), // Increase distance to CMR_GRANT
CMR_LIST = CHAR2CONST('R', 'L', 's', 't'),
CMR_GRANT = CHAR2CONST('R', 'G', 'n', 't'),
CMR_REVOKE = CHAR2CONST('R', 'R', 'v', 'k'),
/* get status from another server */
OP_STATUS = CHAR2CONST('S', 't', 'a', 't'),
OP_MY_INDEX = CHAR2CONST('M', 'I', 'd', 'x'), /* reply to status */
/* Raft */
OP_REQ_VOTE = CHAR2CONST('R', 'V', 'o', 't'), /* start election */
OP_VOTE_FOR = CHAR2CONST('V', 't', 'F', 'r'), /* reply to REQ_VOTE */
OP_HEARTBEAT= CHAR2CONST('H', 'r', 't', 'B'), /* Heartbeat */
OP_UPDATE = CHAR2CONST('U', 'p', 'd', 'E'), /* Update ticket */
OP_REVOKE = CHAR2CONST('R', 'e', 'v', 'k'), /* Revoke ticket */
OP_REJECTED = CHAR2CONST('R', 'J', 'C', '!'),
} cmd_request_t;
typedef enum {
/* for compatibility with other functions */
RLT_SUCCESS = 0,
RLT_ASYNC = CHAR2CONST('A', 's', 'y', 'n'),
RLT_SYNC_SUCC = CHAR2CONST('S', 'c', 'c', 's'),
RLT_SYNC_FAIL = CHAR2CONST('F', 'a', 'i', 'l'),
RLT_INVALID_ARG = CHAR2CONST('I', 'A', 'r', 'g'),
RLT_EXT_FAILED = CHAR2CONST('X', 'P', 'r', 'g'),
+ RLT_TICKET_IDLE = CHAR2CONST('T', 'i', 'd', 'l'),
RLT_OVERGRANT = CHAR2CONST('O', 'v', 'e', 'r'),
RLT_PROBABLY_SUCCESS = CHAR2CONST('S', 'u', 'c', '?'),
RLT_BUSY = CHAR2CONST('B', 'u', 's', 'y'),
RLT_TERM_OUTDATED = CHAR2CONST('T', 'O', 'd', 't'),
RLT_TERM_STILL_VALID = CHAR2CONST('T', 'V', 'l', 'd'),
RLT_YOU_OUTDATED = CHAR2CONST('O', 'u', 't', 'd'),
RLT_REDIRECT = CHAR2CONST('R', 'e', 'd', 'r'),
} cmd_result_t;
typedef enum {
/* for compatibility with other functions */
OR_JUST_SO = 0,
OR_AGAIN = CHAR2CONST('A', 'a', 'a', 'a'),
OR_TKT_LOST = CHAR2CONST('T', 'L', 's', 't'),
OR_REACQUIRE = CHAR2CONST('R', 'a', 'c', 'q'),
OR_ADMIN = CHAR2CONST('A', 'd', 'm', 'n'),
OR_LOCAL_FAIL = CHAR2CONST('L', 'o', 'c', 'F'),
OR_STEPDOWN = CHAR2CONST('S', 'p', 'd', 'n'),
OR_SPLIT = CHAR2CONST('S', 'p', 'l', 't'),
} cmd_reason_t;
/* bitwise command options, currently used only for immediate
* grant */
typedef enum {
OPT_IMMEDIATE = 1,
} cmd_options_t;
/** @} */
/** @{ */
struct booth_site {
/** Calculated ID. See add_site(). */
int site_id;
int type;
int local;
/** Roles, like ACCEPTOR, PROPOSER, or LEARNER. Not really used ATM. */
int role;
char addr_string[BOOTH_NAME_LEN];
int tcp_fd;
int udp_fd;
/* 0-based, used for indexing into per-ticket weights */
int index;
uint64_t bitmask;
unsigned short family;
union {
struct sockaddr_in sa4;
struct sockaddr_in6 sa6;
};
int saddrlen;
int addrlen;
} __attribute__((packed));
extern struct booth_site *local;
extern struct booth_site * no_leader;
/** @} */
struct booth_transport;
struct client {
int fd;
const struct booth_transport *transport;
void (*workfn)(int);
void (*deadfn)(int);
};
extern struct client *clients;
extern struct pollfd *pollfds;
int client_add(int fd, const struct booth_transport *tpt,
void (*workfn)(int ci), void (*deadfn)(int ci));
int do_read(int fd, void *buf, size_t count);
int do_write(int fd, void *buf, size_t count);
void process_connection(int ci);
void safe_copy(char *dest, char *value, size_t buflen, const char *description);
struct command_line {
int type; /* ACT_ */
int op; /* OP_ */
int options; /* OPT_ */
char configfile[BOOTH_PATH_LEN];
char lockfile[BOOTH_PATH_LEN];
char site[BOOTH_NAME_LEN];
struct boothc_ticket_msg msg;
};
extern struct command_line cl;
/* http://gcc.gnu.org/onlinedocs/gcc/Typeof.html */
#define min(a__,b__) \
({ typeof (a__) _a = (a__); \
typeof (b__) _b = (b__); \
_a < _b ? _a : _b; })
#define max(a__,b__) \
({ typeof (a__) _a = (a__); \
typeof (b__) _b = (b__); \
_a > _b ? _a : _b; })
#endif /* _BOOTH_H */
diff --git a/src/config.h b/src/config.h
index d90b2d0..e531833 100644
--- a/src/config.h
+++ b/src/config.h
@@ -1,207 +1,210 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _CONFIG_H
#define _CONFIG_H
#include <stdint.h>
#include "booth.h"
#include "raft.h"
#include "transport.h"
/** @{ */
/** Definitions for in-RAM data. */
#define MAX_NODES 16
#define TICKET_ALLOC 16
struct ticket_config {
/** \name Configuration items.
* @{ */
/** Name of ticket. */
boothc_ticket name;
/** How many seconds a term lasts (if not refreshed). */
int term_duration;
/** Network related timeouts. */
int timeout;
/** Retries before giving up. */
int retries;
/** If >0, time to wait for a site to get fenced.
* The ticket may be acquired after that timespan by
* another site. */
int acquire_after; /* TODO: needed? */
/* Program to ask whether it makes sense to
* acquire the ticket */
char *ext_verifier;
/** Node weights. */
int weight[MAX_NODES];
/** @} */
/** \name Runtime values.
* @{ */
/** Current state. */
server_state_e state;
/** When something has to be done */
struct timeval next_cron;
/** Current leader. This is effectively the log[] in Raft. */
struct booth_site *leader;
/** Is the ticket granted? */
int is_granted;
/** Timestamp of leadership expiration */
time_t term_expires;
/** End of election period */
time_t election_end;
struct booth_site *voted_for;
/** Who the various sites vote for.
* NO_OWNER = no vote yet. */
struct booth_site *votes_for[MAX_NODES];
/* bitmap */
uint64_t votes_received;
/** Last voting round that was seen. */
uint32_t current_term;
/** Do ticket updates whenever we get enough heartbeats.
* But do that only once.
* This is reset to 0 whenever we broadcast heartbeat and set
* to 1 once enough acks are received.
*/
uint32_t ticket_updated;
/** @} */
/** */
uint32_t commit_index;
/** */
uint32_t last_applied;
uint32_t next_index[MAX_NODES];
uint32_t match_index[MAX_NODES];
/* if it is potentially dangerous to grant the ticket
* immediately, then this is set to some point in time,
* usually (now + term_duration + acquire_after)
*/
time_t delay_grant;
/* if we expect some acks, then set this to the id of
* the RPC which others will send us; it is cleared once all
* replies were received
*/
uint32_t acks_expected;
/* bitmask of servers which sent acks
*/
uint64_t acks_received;
/* timestamp of the request, currently unused */
time_t req_sent_at;
+ /* don't log warnings unnecessarily
+ */
+ int expect_more_rejects;
/** \name Needed while proposals are being done.
* @{ */
/** Whom to vote for the next time.
* Needed to push a ticket to someone else. */
#if 0
/** Bitmap of sites that acknowledge that state. */
uint64_t proposal_acknowledges;
/** When an incompletely acknowledged proposal gets done.
* If all peers agree, that happens sooner.
* See switch_state_to(). */
struct timeval proposal_switch;
/** Timestamp of proposal expiration. */
time_t proposal_expires;
#endif
/** Number of send retries left.
* Used on the new owner.
* Starts at 0, counts up. */
int retry_number;
/** @} */
};
struct booth_config {
char name[BOOTH_NAME_LEN];
transport_layer_t proto;
uint16_t port;
/** Stores the OR of sites bitmasks. */
uint64_t sites_bits;
/** Stores the OR of all members' bitmasks. */
uint64_t all_bits;
char site_user[BOOTH_NAME_LEN];
char site_group[BOOTH_NAME_LEN];
char arb_user[BOOTH_NAME_LEN];
char arb_group[BOOTH_NAME_LEN];
uid_t uid;
gid_t gid;
int site_count;
struct booth_site site[MAX_NODES];
int ticket_count;
int ticket_allocated;
struct ticket_config *ticket;
};
extern struct booth_config *booth_conf;
int read_config(const char *path);
int check_config(int type);
int find_site_by_name(unsigned char *site, struct booth_site **node, int any_type);
int find_site_by_id(uint32_t site_id, struct booth_site **node);
const char *type_to_string(int type);
#include <stdio.h>
#define R(tk_) do { if (ANYDEBUG) printf("## %12s:%3d state %s, %d:%d, " \
"leader %s, exp %s", __FILE__, __LINE__, \
state_to_string(tk_->state), tk_->current_term, \
tk_->commit_index, site_string(tk_->leader), ctime(&tk_->term_expires)); } while(0)
#endif /* _CONFIG_H */
diff --git a/src/handler.c b/src/handler.c
index b51fca8..885997c 100644
--- a/src/handler.c
+++ b/src/handler.c
@@ -1,70 +1,70 @@
/*
* Copyright (C) 2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include <inttypes.h>
#include <stdio.h>
#include <assert.h>
#include <time.h>
#include "ticket.h"
#include "config.h"
#include "inline-fn.h"
#include "log.h"
#include "pacemaker.h"
#include "booth.h"
#include "handler.h"
/** Runs an external handler.
* See eg. 'before-acquire-handler'.
* TODO: timeout, async operation?. */
int run_handler(struct ticket_config *tk,
const char *cmd, int synchronous)
{
int rv;
char expires[16];
if (!cmd)
return 0;
assert(synchronous);
sprintf(expires, "%" PRId64, (int64_t)(tk->term_expires));
rv = setenv("BOOTH_TICKET", tk->name, 1) ||
setenv("BOOTH_LOCAL", local->addr_string, 1) ||
setenv("BOOTH_CONF_NAME", booth_conf->name, 1) ||
setenv("BOOTH_CONF_PATH", cl.configfile, 1) ||
setenv("BOOTH_TICKET_EXPIRES", expires, 1);
if (rv) {
- log_error("Cannot set environment: %d", errno);
+ log_error("Cannot set environment: %s", strerror(errno));
} else {
rv = system(cmd);
if (rv)
- log_warn("handler \"%s\" exited with error %s",
+ tk_log_warn("handler \"%s\" exited with error %s",
cmd, interpret_rv(rv));
else
- log_info("handler \"%s\" exited with success", cmd);
+ tk_log_debug("handler \"%s\" exited with success", cmd);
}
return rv;
}
diff --git a/src/log.h b/src/log.h
index 4ac60fe..74e662a 100644
--- a/src/log.h
+++ b/src/log.h
@@ -1,32 +1,46 @@
/*
* Copyright (C) 2010-2011 Red Hat, Inc. All rights reserved.
* (This code is borrowed from the sanlock project which is hosted on
* fedorahosted.org.)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _LOG_H
#define _LOG_H
#include <heartbeat/glue_config.h>
#include <clplumbing/cl_log.h>
-#define log_debug(fmt, args...) do { if (ANYDEBUG) cl_log(LOG_DEBUG, fmt, ##args); } while (0)
+#define log_debug(fmt, args...) do { \
+ if (ANYDEBUG) cl_log(LOG_DEBUG, fmt, ##args); } \
+ while (0)
#define log_info(fmt, args...) cl_log(LOG_INFO, fmt, ##args)
#define log_warn(fmt, args...) cl_log(LOG_WARNING, fmt, ##args)
#define log_error(fmt, args...) cl_log(LOG_ERR, fmt, ##args)
+/* all tk_* macros prepend "%(tk->name): " (the caller needs to
+ * have the ticket named tk!)
+ */
+#define tk_cl_log(sev, fmt, args...) cl_log(sev, "%s: " fmt, tk->name, ##args)
+
+#define tk_log_debug(fmt, args...) do { \
+ if (ANYDEBUG) tk_cl_log(LOG_DEBUG, fmt, ##args); } \
+ while (0)
+#define tk_log_info(fmt, args...) tk_cl_log(LOG_INFO, fmt, ##args)
+#define tk_log_warn(fmt, args...) tk_cl_log(LOG_WARNING, fmt, ##args)
+#define tk_log_error(fmt, args...) tk_cl_log(LOG_ERR, fmt, ##args)
+
#endif /* _LOG_H */
diff --git a/src/main.c b/src/main.c
index b7076a0..cf186d1 100644
--- a/src/main.c
+++ b/src/main.c
@@ -1,1335 +1,1340 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sched.h>
#include <errno.h>
#include <limits.h>
#include <sys/file.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/poll.h>
#include <pacemaker/crm/services.h>
#include <clplumbing/setproctitle.h>
#include <fcntl.h>
#include <string.h>
#include <assert.h>
#include <error.h>
#include <sys/ioctl.h>
#include <termios.h>
#include <signal.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include "log.h"
#include "booth.h"
#include "config.h"
#include "transport.h"
#include "inline-fn.h"
#include "pacemaker.h"
#include "ticket.h"
#define RELEASE_VERSION "1.0"
#define CLIENT_NALLOC 32
int daemonize = 0;
/** Structure for "clients".
* Filehandles with incoming data get registered here (and in pollfds),
* along with their callbacks.
* Because these can be reallocated with every new fd, addressing
* happens _only_ by their numeric index. */
struct client *clients = NULL;
struct pollfd *pollfds = NULL;
static int client_maxi;
static int client_size = 0;
static const struct booth_site _no_leader = {
.addr_string = "none",
.site_id = NO_ONE,
};
struct booth_site *no_leader = (struct booth_site*)& _no_leader;
typedef enum
{
BOOTHD_STARTED=0,
BOOTHD_STARTING
} BOOTH_DAEMON_STATE;
int poll_timeout = POLL_TIMEOUT;
struct booth_config *booth_conf;
struct command_line cl;
int do_read(int fd, void *buf, size_t count)
{
int rv, off = 0;
while (off < count) {
rv = read(fd, (char *)buf + off, count - off);
if (rv == 0)
return -1;
if (rv == -1 && errno == EINTR)
continue;
if (rv == -1)
return -1;
off += rv;
}
return 0;
}
int do_write(int fd, void *buf, size_t count)
{
int rv, off = 0;
retry:
rv = write(fd, (char *)buf + off, count);
if (rv == -1 && errno == EINTR)
goto retry;
/* If we cannot write _any_ data, we'd be in an (potential) loop. */
if (rv <= 0) {
log_error("write failed: %s (%d)", strerror(errno), errno);
return rv;
}
if (rv != count) {
count -= rv;
off += rv;
goto retry;
}
return 0;
}
static void client_alloc(void)
{
int i;
if (!clients) {
clients = malloc(CLIENT_NALLOC * sizeof(struct client));
pollfds = malloc(CLIENT_NALLOC * sizeof(struct pollfd));
} else {
clients = realloc(clients, (client_size + CLIENT_NALLOC) *
sizeof(struct client));
pollfds = realloc(pollfds, (client_size + CLIENT_NALLOC) *
sizeof(struct pollfd));
}
if (!clients || !pollfds) {
log_error("can't alloc for client array");
exit(1);
}
for (i = client_size; i < client_size + CLIENT_NALLOC; i++) {
clients[i].workfn = NULL;
clients[i].deadfn = NULL;
clients[i].fd = -1;
pollfds[i].fd = -1;
pollfds[i].revents = 0;
}
client_size += CLIENT_NALLOC;
}
static void client_dead(int ci)
{
if (clients[ci].fd != -1)
close(clients[ci].fd);
clients[ci].fd = -1;
clients[ci].workfn = NULL;
pollfds[ci].fd = -1;
}
int client_add(int fd, const struct booth_transport *tpt,
void (*workfn)(int ci),
void (*deadfn)(int ci))
{
int i;
struct client *c;
if (client_size + 2 >= client_maxi ) {
client_alloc();
}
for (i = 0; i < client_size; i++) {
c = clients + i;
if (c->fd != -1)
continue;
c->workfn = workfn;
if (deadfn)
c->deadfn = deadfn;
else
c->deadfn = client_dead;
c->transport = tpt;
c->fd = fd;
pollfds[i].fd = fd;
pollfds[i].events = POLLIN;
if (i > client_maxi)
client_maxi = i;
return i;
}
assert(!"no client");
}
/* Only used for client requests, TCP ???*/
void process_connection(int ci)
{
struct boothc_ticket_msg msg;
int rv, len, expr, fd;
void (*deadfn) (int ci);
fd = clients[ci].fd;
rv = do_read(fd, &msg.header, sizeof(msg.header));
if (rv < 0) {
if (errno == ECONNRESET)
log_debug("client %d connection reset for fd %d",
ci, clients[ci].fd);
goto kill;
}
if (check_boothc_header(&msg.header, -1) < 0)
goto kill;
/* Basic sanity checks already done. */
len = ntohl(msg.header.length);
if (len) {
if (len != sizeof(msg)) {
bad_len:
log_error("got wrong length %u", len);
return;
}
expr = len - sizeof(msg.header);
rv = do_read(clients[ci].fd, msg.header.data, expr);
if (rv < 0) {
log_error("connection %d read data error %d, wanted %d",
ci, rv, expr);
goto kill;
}
}
/* For CMD_GRANT and CMD_REVOKE:
* Don't close connection immediately, but send
* result a second later? */
switch (ntohl(msg.header.cmd)) {
case CMD_LIST:
ticket_answer_list(fd, &msg);
goto kill;
case CMD_GRANT:
/* Expect boothc_ticket_site_msg. */
if (len != sizeof(msg))
goto bad_len;
ticket_answer_grant(fd, &msg);
goto kill;
case CMD_REVOKE:
/* Expect boothc_ticket_site_msg. */
if (len != sizeof(msg))
goto bad_len;
ticket_answer_revoke(fd, &msg);
goto kill;
default:
log_error("connection %d cmd %x unknown",
ci, ntohl(msg.header.cmd));
init_header(&msg.header,CMR_GENERAL, 0, RLT_INVALID_ARG, 0, sizeof(msg.header));
send_header_only(fd, &msg.header);
goto kill;
}
assert(0);
return;
kill:
deadfn = clients[ci].deadfn;
if(deadfn) {
deadfn(ci);
}
return;
}
/** Callback function for the listening TCP socket. */
static void process_listener(int ci)
{
int fd, i;
fd = accept(clients[ci].fd, NULL, NULL);
if (fd < 0) {
log_error("process_listener: accept error for fd %d: %s (%d)",
clients[ci].fd, strerror(errno), errno);
if (clients[ci].deadfn)
clients[ci].deadfn(ci);
return;
}
i = client_add(fd, clients[ci].transport, process_connection, NULL);
log_debug("add client connection %d fd %d", i, fd);
}
static int setup_config(int type)
{
int rv;
rv = read_config(cl.configfile);
if (rv < 0)
goto out;
/* Set "local" pointer, ignoring errors. */
if (cl.type == DAEMON && cl.site[0]) {
if (!find_site_by_name(cl.site, &local, 1)) {
log_error("Cannot find \"%s\" in the configuration.",
cl.site);
return -EINVAL;
}
local->local = 1;
} else
find_myself(NULL, type == CLIENT);
rv = check_config(type);
if (rv < 0)
goto out;
/* Per default the PID file name is derived from the
* configuration name. */
if (!cl.lockfile[0]) {
snprintf(cl.lockfile, sizeof(cl.lockfile)-1,
"%s/%s.pid", BOOTH_RUN_DIR, booth_conf->name);
}
out:
return rv;
}
static int setup_transport(void)
{
int rv;
rv = transport()->init(message_recv);
if (rv < 0) {
log_error("failed to init booth_transport %s", transport()->name);
goto out;
}
rv = booth_transport[TCP].init(NULL);
if (rv < 0) {
log_error("failed to init booth_transport[TCP]");
goto out;
}
out:
return rv;
}
static int write_daemon_state(int fd, int state)
{
char buffer[1024];
int rv, size;
size = sizeof(buffer) - 1;
rv = snprintf(buffer, size,
"booth_pid=%d "
"booth_state=%s "
"booth_type=%s "
"booth_cfg_name='%s' "
"booth_addr_string='%s' "
"booth_port=%d\n",
getpid(),
( state == BOOTHD_STARTED ? "started" :
state == BOOTHD_STARTING ? "starting" :
"invalid"),
type_to_string(local->type),
booth_conf->name,
local->addr_string,
booth_conf->port);
if (rv < 0 || rv == size) {
log_error("Buffer filled up in write_daemon_state().");
return -1;
}
size = rv;
rv = ftruncate(fd, 0);
if (rv < 0) {
log_error("lockfile %s truncate error %d: %s",
cl.lockfile, errno, strerror(errno));
return rv;
}
rv = lseek(fd, 0, SEEK_SET);
if (rv < 0) {
log_error("lseek set fd(%d) offset to 0 error, return(%d), message(%s)",
fd, rv, strerror(errno));
rv = -1;
return rv;
}
rv = write(fd, buffer, size);
if (rv != size) {
log_error("write to fd(%d, %d) returned %d, errno %d, message(%s)",
fd, size,
rv, errno, strerror(errno));
return -1;
}
return 0;
}
static int loop(int fd)
{
void (*workfn) (int ci);
void (*deadfn) (int ci);
int rv, i;
rv = setup_transport();
if (rv < 0)
goto fail;
rv = setup_ticket();
if (rv < 0)
goto fail;
client_add(local->tcp_fd, booth_transport + TCP,
process_listener, NULL);
rv = write_daemon_state(fd, BOOTHD_STARTED);
if (rv != 0) {
log_error("write daemon state %d to lockfile error %s: %s",
BOOTHD_STARTED, cl.lockfile, strerror(errno));
goto fail;
}
if (cl.type == ARBITRATOR)
log_info("BOOTH arbitrator daemon started");
else if (cl.type == SITE)
log_info("BOOTH cluster site daemon started");
while (1) {
rv = poll(pollfds, client_maxi + 1, poll_timeout);
if (rv == -1 && errno == EINTR)
continue;
if (rv < 0) {
log_error("poll failed: %s (%d)", strerror(errno), errno);
goto fail;
}
for (i = 0; i <= client_maxi; i++) {
if (clients[i].fd < 0)
continue;
if (pollfds[i].revents & POLLIN) {
workfn = clients[i].workfn;
if (workfn)
workfn(i);
}
if (pollfds[i].revents &
(POLLERR | POLLHUP | POLLNVAL)) {
deadfn = clients[i].deadfn;
if (deadfn)
deadfn(i);
}
}
process_tickets();
}
return 0;
fail:
return -1;
}
static int query_get_string_answer(cmd_request_t cmd)
{
struct booth_site *site;
struct boothc_header reply;
char *data;
int data_len;
int rv;
struct booth_transport const *tpt;
data = NULL;
init_header(&cl.msg.header, cmd, cl.options, 0, 0, sizeof(cl.msg));
if (!*cl.site)
site = local;
else if (!find_site_by_name(cl.site, &site, 1)) {
log_error("cannot find site \"%s\"", cl.site);
rv = ENOENT;
goto out;
}
tpt = booth_transport + TCP;
rv = tpt->open(site);
if (rv < 0)
goto out_free;
rv = tpt->send(site, &cl.msg, sizeof(cl.msg));
if (rv < 0)
goto out_free;
rv = tpt->recv(site, &reply, sizeof(reply));
if (rv < 0)
goto out_free;
data_len = ntohl(reply.length) - sizeof(reply);
data = malloc(data_len);
if (!data) {
rv = -ENOMEM;
goto out_free;
}
rv = tpt->recv(site, data, data_len);
if (rv < 0)
goto out_free;
do_write(STDOUT_FILENO, data, data_len);
rv = 0;
out_free:
free(data);
tpt->close(site);
out:
return rv;
}
static int test_reply(int reply_code, cmd_request_t cmd)
{
int rv = 0;
const char *op_str;
if (cmd == CMD_GRANT)
op_str = "grant";
else if (cmd == CMD_REVOKE)
op_str = "revoke";
else {
log_error("internal error reading reply result!");
return -1;
}
switch (reply_code) {
case RLT_OVERGRANT:
log_info("You're granting a granted ticket. "
"If you wanted to migrate a ticket, "
"use revoke first, then use grant.");
rv = -1;
break;
+ case RLT_TICKET_IDLE:
+ log_info("ticket is not owned");
+ rv = 0;
+ break;
+
case RLT_ASYNC:
log_info("%s command sent, result will be returned "
"asynchronously. Please use \"booth list\" to "
"see the outcome.", op_str);
rv = 0;
break;
case RLT_SYNC_SUCC:
case RLT_SUCCESS:
log_info("%s succeeded!", op_str);
rv = 0;
break;
case RLT_SYNC_FAIL:
log_info("%s failed!", op_str);
rv = -1;
break;
case RLT_INVALID_ARG:
log_error("ticket \"%s\" does not exist",
cl.msg.ticket.id);
break;
case RLT_EXT_FAILED:
log_error("before-acquire-handler for ticket \"%s\" failed, grant denied",
cl.msg.ticket.id);
break;
case RLT_REDIRECT:
/* talk to another site */
rv = 1;
break;
default:
log_error("got an error code: %x", rv);
rv = -1;
}
return rv;
}
static int do_command(cmd_request_t cmd)
{
struct booth_site *site;
struct boothc_ticket_msg reply;
struct booth_transport const *tpt;
uint32_t leader_id;
int rv;
rv = 0;
site = NULL;
if (!*cl.site)
site = local;
else {
if (!find_site_by_name(cl.site, &site, 1)) {
log_error("Site \"%s\" not configured.", cl.site);
goto out_close;
}
}
if (site->type == ARBITRATOR) {
log_error("Site \"%s\" is an arbitrator, cannot grant/revoke ticket there.", cl.site);
goto out_close;
}
assert(site->type == SITE);
/* We don't check for existence of ticket, so that asking can be
* done without local configuration, too.
* Although, that means that the UDP port has to be specified, too. */
if (!cl.msg.ticket.id[0]) {
/* If the loaded configuration has only a single ticket defined, use that. */
if (booth_conf->ticket_count == 1) {
strcpy(cl.msg.ticket.id, booth_conf->ticket[0].name);
} else {
log_error("No ticket given.");
goto out_close;
}
}
redirect:
init_header(&cl.msg.header, cmd, cl.options, 0, 0, sizeof(cl.msg));
/* Always use TCP for client - at least for now. */
tpt = booth_transport + TCP;
rv = tpt->open(site);
if (rv < 0)
goto out_close;
rv = tpt->send(site, &cl.msg, sizeof(cl.msg));
if (rv < 0)
goto out_close;
rv = tpt->recv(site, &reply, sizeof(reply));
if (rv < 0)
goto out_close;
rv = test_reply(ntohl(reply.header.result), cmd);
if (rv == 1) {
local_transport->close(site);
leader_id = ntohl(reply.ticket.leader);
if (!find_site_by_id(leader_id, &site)) {
log_error("Message with unknown redirect site %x received", leader_id);
return rv;
}
goto redirect;
}
out_close:
if (site)
local_transport->close(site);
return rv;
}
static int do_grant(void)
{
return do_command(CMD_GRANT);
}
static int do_revoke(void)
{
return do_command(CMD_REVOKE);
}
static int _lockfile(int mode, int *fdp, pid_t *locked_by)
{
struct flock lock;
int fd, rv;
/* After reboot the directory may not yet exist.
* Try to create it, but ignore errors. */
if (strncmp(cl.lockfile, BOOTH_RUN_DIR,
strlen(BOOTH_RUN_DIR)) == 0)
mkdir(BOOTH_RUN_DIR, 0775);
if (locked_by)
*locked_by = 0;
*fdp = -1;
fd = open(cl.lockfile, mode, 0664);
if (fd < 0)
return errno;
*fdp = fd;
lock.l_type = F_WRLCK;
lock.l_start = 0;
lock.l_whence = SEEK_SET;
lock.l_len = 0;
lock.l_pid = 0;
if (fcntl(fd, F_SETLK, &lock) == 0)
return 0;
rv = errno;
if (locked_by)
if (fcntl(fd, F_GETLK, &lock) == 0)
*locked_by = lock.l_pid;
return rv;
}
static inline int is_root(void)
{
/* TODO: getuid()? Better way to check? */
return geteuid() == 0;
}
static int create_lockfile(void)
{
int rv, fd;
fd = -1;
rv = _lockfile(O_CREAT | O_WRONLY, &fd, NULL);
if (fd == -1) {
log_error("lockfile %s open error %d: %s",
cl.lockfile, rv, strerror(rv));
return -1;
}
if (rv < 0) {
log_error("lockfile %s setlk error %d: %s",
cl.lockfile, rv, strerror(rv));
goto fail;
}
rv = write_daemon_state(fd, BOOTHD_STARTING);
if (rv != 0) {
log_error("write daemon state %d to lockfile error %s: %s",
BOOTHD_STARTING, cl.lockfile, strerror(errno));
goto fail;
}
if (is_root()) {
if (fchown(fd, booth_conf->uid, booth_conf->gid) < 0)
log_error("fchown() on lockfile said %d: %s",
errno, strerror(errno));
}
return fd;
fail:
close(fd);
return -1;
}
static void unlink_lockfile(int fd)
{
unlink(cl.lockfile);
close(fd);
}
static void print_usage(void)
{
printf("Usages:\n");
printf(" booth daemon [-c config] [-D]\n");
printf(" booth [client] {list|grant|revoke} [options]\n");
printf(" booth status [-c config] [-D]\n");
printf("\n");
printf("Client operations:\n");
printf(" list: List all the tickets\n");
printf(" grant: Grant ticket to site\n");
printf(" revoke: Revoke ticket from site\n");
printf("\n");
printf("Options:\n");
printf(" -c FILE Specify config file [default " BOOTH_DEFAULT_CONF "]\n");
printf(" Can be a path or a name without \".conf\" suffix\n");
printf(" -D Enable debugging to stderr and don't fork\n");
printf(" -S Systemd mode (no forking)\n");
printf(" -t ticket name\n");
printf(" -s site name\n");
printf(" -l LOCKFILE Specify lock file path (daemon only)\n");
printf(" -F Try to grant the ticket immediately (client only)\n");
printf(" -h Print this help, then exit\n");
printf("\n");
printf("Please see the man page for details.\n");
}
#define OPTION_STRING "c:Dl:t:s:FhS"
void safe_copy(char *dest, char *value, size_t buflen, const char *description) {
int content_len = buflen - 1;
if (strlen(value) >= content_len) {
fprintf(stderr, "'%s' exceeds maximum %s length of %d\n",
value, description, content_len);
exit(EXIT_FAILURE);
}
strncpy(dest, value, content_len);
dest[content_len] = 0;
}
static int host_convert(char *hostname, char *ip_str, size_t ip_size)
{
struct addrinfo *result = NULL, hints = {0};
int re = -1;
memset(&hints, 0, sizeof(hints));
hints.ai_family = BOOTH_PROTO_FAMILY;
hints.ai_socktype = SOCK_DGRAM;
re = getaddrinfo(hostname, NULL, &hints, &result);
if (re == 0) {
struct in_addr addr = ((struct sockaddr_in *)result->ai_addr)->sin_addr;
const char *re_ntop = inet_ntop(BOOTH_PROTO_FAMILY, &addr, ip_str, ip_size);
if (re_ntop == NULL) {
re = -1;
}
}
freeaddrinfo(result);
return re;
}
static int read_arguments(int argc, char **argv)
{
int optchar;
char *arg1 = argv[1];
char *op = NULL;
char *cp;
char site_arg[INET_ADDRSTRLEN] = {0};
int left;
if (argc < 2 || !strcmp(arg1, "help") || !strcmp(arg1, "--help") ||
!strcmp(arg1, "-h")) {
print_usage();
exit(EXIT_SUCCESS);
}
if (!strcmp(arg1, "version") || !strcmp(arg1, "--version") ||
!strcmp(arg1, "-V")) {
printf("%s %s (built %s %s)\n",
argv[0], RELEASE_VERSION, __DATE__, __TIME__);
exit(EXIT_SUCCESS);
}
if (strcmp(arg1, "arbitrator") == 0 ||
strcmp(arg1, "site") == 0 ||
strcmp(arg1, "start") == 0 ||
strcmp(arg1, "daemon") == 0) {
cl.type = DAEMON;
optind = 2;
} else if (strcmp(arg1, "status") == 0) {
cl.type = STATUS;
optind = 2;
} else if (strcmp(arg1, "client") == 0) {
cl.type = CLIENT;
if (argc < 3) {
print_usage();
exit(EXIT_FAILURE);
}
op = argv[2];
optind = 3;
} else {
cl.type = CLIENT;
op = argv[1];
optind = 2;
}
if (cl.type == CLIENT) {
if (!strcmp(op, "list"))
cl.op = CMD_LIST;
else if (!strcmp(op, "grant"))
cl.op = CMD_GRANT;
else if (!strcmp(op, "revoke"))
cl.op = CMD_REVOKE;
else {
fprintf(stderr, "client operation \"%s\" is unknown\n",
op);
exit(EXIT_FAILURE);
}
}
while (optind < argc) {
optchar = getopt(argc, argv, OPTION_STRING);
switch (optchar) {
case 'c':
if (strchr(optarg, '/')) {
safe_copy(cl.configfile, optarg,
sizeof(cl.configfile), "config file");
} else {
/* If no "/" in there, use with default directory. */
strcpy(cl.configfile, BOOTH_DEFAULT_CONF_DIR);
cp = cl.configfile + strlen(BOOTH_DEFAULT_CONF_DIR);
assert(cp > cl.configfile);
assert(*(cp-1) == '/');
/* Write at the \0, ie. after the "/" */
safe_copy(cp, optarg,
(sizeof(cl.configfile) -
(cp - cl.configfile) -
strlen(BOOTH_DEFAULT_CONF_EXT)),
"config name");
/* If no extension, append ".conf".
* Space is available, see -strlen() above. */
if (!strchr(cp, '.'))
strcat(cp, BOOTH_DEFAULT_CONF_EXT);
}
break;
case 'D':
debug_level++;
/* Fall through */
case 'S':
daemonize = 1;
break;
case 'l':
safe_copy(cl.lockfile, optarg, sizeof(cl.lockfile), "lock file");
break;
case 't':
if (cl.op == CMD_GRANT || cl.op == CMD_REVOKE) {
safe_copy(cl.msg.ticket.id, optarg,
sizeof(cl.msg.ticket.id), "ticket name");
} else {
print_usage();
exit(EXIT_FAILURE);
}
break;
case 's':
/* For testing and debugging: allow "-s site" also for
* daemon start, so that the address that should be used
* can be set manually.
* This makes it easier to start multiple processes
* on one machine. */
if (cl.type == CLIENT ||
(cl.type == DAEMON && debug_level)) {
int re = host_convert(optarg, site_arg, INET_ADDRSTRLEN);
if (re == 0) {
safe_copy(cl.site, site_arg, sizeof(cl.site), "site name");
} else {
safe_copy(cl.site, optarg, sizeof(cl.site), "site name");
}
} else {
log_error("\"-s\" not allowed in daemon mode.");
exit(EXIT_FAILURE);
}
break;
case 'F':
if (cl.type != CLIENT || cl.op != CMD_GRANT) {
log_error("use \"-F\" only for client grant");
exit(EXIT_FAILURE);
}
cl.options |= OPT_IMMEDIATE;
break;
case 'h':
print_usage();
exit(EXIT_SUCCESS);
break;
case ':':
case '?':
fprintf(stderr, "Please use '-h' for usage.\n");
exit(EXIT_FAILURE);
break;
case -1:
/* No more parameters on cmdline, only arguments. */
goto extra_args;
default:
goto unknown;
};
}
return 0;
extra_args:
if (cl.type == CLIENT && !cl.msg.ticket.id[0]) {
/* Use additional argument as ticket name. */
safe_copy(cl.msg.ticket.id,
argv[optind],
sizeof(cl.msg.ticket.id),
"ticket name");
optind++;
}
if (optind == argc)
return 0;
left = argc - optind;
fprintf(stderr, "Superfluous argument%s: %s%s\n",
left == 1 ? "" : "s",
argv[optind],
left == 1 ? "" : "...");
exit(EXIT_FAILURE);
unknown:
fprintf(stderr, "unknown option: %s\n", argv[optind]);
exit(EXIT_FAILURE);
}
static void set_scheduler(void)
{
struct sched_param sched_param;
struct rlimit rlimit;
int rv;
rlimit.rlim_cur = RLIM_INFINITY;
rlimit.rlim_max = RLIM_INFINITY;
setrlimit(RLIMIT_MEMLOCK, &rlimit);
rv = mlockall(MCL_CURRENT | MCL_FUTURE);
if (rv < 0) {
log_error("mlockall failed");
}
rv = sched_get_priority_max(SCHED_RR);
if (rv != -1) {
sched_param.sched_priority = rv;
rv = sched_setscheduler(0, SCHED_RR, &sched_param);
if (rv == -1)
log_error("could not set SCHED_RR priority %d: %s (%d)",
sched_param.sched_priority,
strerror(errno), errno);
} else {
log_error("could not get maximum scheduler priority err %d",
errno);
}
}
static void set_oom_adj(int val)
{
FILE *fp;
fp = fopen("/proc/self/oom_adj", "w");
if (!fp)
return;
fprintf(fp, "%i", val);
fclose(fp);
}
static int do_status(int type)
{
pid_t pid;
int rv, lock_fd, ret;
const char *reason = NULL;
char lockfile_data[1024], *cp;
ret = PCMK_OCF_NOT_RUNNING;
/* TODO: query all, and return quit only if it's _cleanly_ not
* running, ie. _neither_ of port/lockfile/process is available?
*
* Currently a single failure says "not running", even if "only" the
* lockfile has been removed. */
rv = setup_config(type);
if (rv) {
reason = "Error reading configuration.";
ret = PCMK_OCF_UNKNOWN_ERROR;
goto quit;
}
if (!local) {
reason = "No Service IP active here.";
goto quit;
}
rv = _lockfile(O_RDWR, &lock_fd, &pid);
if (rv == 0) {
reason = "PID file not locked.";
goto quit;
}
if (lock_fd == -1) {
reason = "No PID file.";
goto quit;
}
if (pid) {
fprintf(stdout, "booth_lockpid=%d ", pid);
fflush(stdout);
}
rv = read(lock_fd, lockfile_data, sizeof(lockfile_data) - 1);
if (rv < 4) {
reason = "Cannot read lockfile data.";
ret = PCMK_LSB_UNKNOWN_ERROR;
goto quit;
}
lockfile_data[rv] = 0;
if (lock_fd != -1)
close(lock_fd);
/* Make sure it's only a single line */
cp = strchr(lockfile_data, '\r');
if (cp)
*cp = 0;
cp = strchr(lockfile_data, '\n');
if (cp)
*cp = 0;
rv = setup_tcp_listener(1);
if (rv == 0) {
reason = "TCP port not in use.";
goto quit;
}
fprintf(stdout, "booth_lockfile='%s' %s\n",
cl.lockfile, lockfile_data);
if (daemonize)
fprintf(stderr, "Booth at %s port %d seems to be running.\n",
local->addr_string, booth_conf->port);
return 0;
quit:
log_debug("not running: %s", reason);
/* Ie. "DEBUG" */
if (daemonize)
fprintf(stderr, "not running: %s\n", reason);
return ret;
}
static int limit_this_process(void)
{
int rv;
if (!is_root())
return 0;
if (setregid(booth_conf->gid, booth_conf->gid) < 0) {
rv = errno;
log_error("setregid() didn't work: %s", strerror(rv));
return rv;
}
if (setreuid(booth_conf->uid, booth_conf->uid) < 0) {
rv = errno;
log_error("setreuid() didn't work: %s", strerror(rv));
return rv;
}
/* TODO: ulimits? But that would restrict crm_ticket and handler
* scripts, too! */
return 0;
}
static int do_server(int type)
{
int lock_fd = -1;
int rv = -1;
static char log_ent[128] = DAEMON_NAME "-";
rv = setup_config(type);
if (rv < 0)
goto out;
if (!local) {
log_error("Cannot find myself in the configuration.");
exit(EXIT_FAILURE);
}
if (!daemonize) {
if (daemon(0, 0) < 0) {
perror("daemon error");
exit(EXIT_FAILURE);
}
}
/* The lockfile must be written to _after_ the call to daemon(), so
* that the lockfile contains the pid of the daemon, not the parent. */
lock_fd = create_lockfile();
if (lock_fd < 0)
return lock_fd;
strcat(log_ent, type_to_string(local->type));
cl_log_set_entity(log_ent);
cl_log_enable_stderr(debug_level ? TRUE : FALSE);
cl_log_set_facility(HA_LOG_FACILITY);
cl_inherit_logging_environment(0);
log_info("BOOTH %s daemon is starting, node id is 0x%08X (%d).",
type_to_string(local->type),
local->site_id, local->site_id);
signal(SIGUSR1, (__sighandler_t)tickets_log_info);
set_scheduler();
set_oom_adj(-16);
set_proc_title("%s %s for [%s]:%d",
DAEMON_NAME,
type_to_string(local->type),
local->addr_string,
booth_conf->port);
rv = limit_this_process();
if (rv)
return rv;
rv = loop(lock_fd);
out:
if (lock_fd >= 0) {
/* We might not be able to delete it, but at least
* make it empty. */
rv = ftruncate(lock_fd, 0);
(void)rv;
unlink_lockfile(lock_fd);
}
return rv;
}
static int do_client(void)
{
int rv = -1;
rv = setup_config(CLIENT);
if (rv < 0) {
log_error("cannot read config");
goto out;
}
switch (cl.op) {
case CMD_LIST:
rv = query_get_string_answer(CMD_LIST);
break;
case CMD_GRANT:
rv = do_grant();
break;
case CMD_REVOKE:
rv = do_revoke();
break;
}
out:
return rv;
}
int main(int argc, char *argv[], char *envp[])
{
int rv;
init_set_proc_title(argc, argv, envp);
memset(&cl, 0, sizeof(cl));
strncpy(cl.configfile,
BOOTH_DEFAULT_CONF, BOOTH_PATH_LEN - 1);
cl.lockfile[0] = 0;
debug_level = 0;
cl_log_set_entity("booth");
cl_log_enable_stderr(TRUE);
cl_log_set_facility(0);
rv = read_arguments(argc, argv);
if (rv < 0)
goto out;
switch (cl.type) {
case STATUS:
rv = do_status(cl.type);
break;
case ARBITRATOR:
case DAEMON:
case SITE:
rv = do_server(cl.type);
break;
case CLIENT:
rv = do_client();
break;
}
out:
/* Normalize values. 0x100 would be seen as "OK" by waitpid(). */
return (rv >= 0 && rv < 0x70) ? rv : 1;
}
diff --git a/src/raft.c b/src/raft.c
index be59a62..ba4d7ab 100644
--- a/src/raft.c
+++ b/src/raft.c
@@ -1,881 +1,905 @@
/*
* Copyright (C) 2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <inttypes.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include "booth.h"
#include "transport.h"
#include "inline-fn.h"
#include "config.h"
#include "raft.h"
#include "ticket.h"
#include "log.h"
inline static void clear_election(struct ticket_config *tk)
{
int i;
struct booth_site *site;
- log_debug("%s: clear election", tk->name);
+ tk_log_debug("clear election");
tk->votes_received = 0;
foreach_node(i, site)
tk->votes_for[site->index] = NULL;
}
inline static void record_vote(struct ticket_config *tk,
struct booth_site *who,
struct booth_site *vote)
{
- log_debug("%s: site %s votes for %s",
- tk->name, site_string(who),
+ tk_log_debug("site %s votes for %s",
+ site_string(who),
site_string(vote));
if (!tk->votes_for[who->index]) {
tk->votes_for[who->index] = vote;
tk->votes_received |= who->bitmask;
} else {
if (tk->votes_for[who->index] != vote)
- log_warn("voted previously (but in same term!) for %s...",
- site_string(tk->votes_for[who->index]));
+ tk_log_warn("%s voted previously "
+ "for %s and now wants to vote for %s (ignored)",
+ site_string(who),
+ site_string(tk->votes_for[who->index]),
+ site_string(vote));
}
}
static int cmp_msg_ticket(struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg)
{
if (tk->current_term != ntohl(msg->ticket.term)) {
return tk->current_term - ntohl(msg->ticket.term);
}
/* compare commit_index only from the leader */
if (sender == leader) {
return tk->commit_index - ntohl(msg->ticket.leader_commit);
}
return 0;
}
static void update_term_from_msg(struct ticket_config *tk,
struct boothc_ticket_msg *msg)
{
uint32_t i;
i = ntohl(msg->ticket.term);
/* if we failed to start the election, then accept the term
* from the leader
* */
if (tk->state == ST_CANDIDATE) {
tk->current_term = i;
} else {
tk->current_term = max(i, tk->current_term);
}
/* § 5.3 */
i = ntohl(msg->ticket.leader_commit);
tk->commit_index = max(i, tk->commit_index);
}
static void update_ticket_from_msg(struct ticket_config *tk,
struct boothc_ticket_msg *msg)
{
int duration;
duration = tk->term_duration;
if (msg)
duration = min(duration, ntohl(msg->ticket.term_valid_for));
tk->term_expires = time(NULL) + duration;
if (msg) {
update_term_from_msg(tk, msg);
}
}
static void become_follower(struct ticket_config *tk,
struct boothc_ticket_msg *msg)
{
update_ticket_from_msg(tk, msg);
tk->state = ST_FOLLOWER;
tk->delay_grant = 0;
}
struct booth_site *majority_votes(struct ticket_config *tk)
{
int i, n;
struct booth_site *v;
int count[MAX_NODES] = { 0, };
for(i=0; i<booth_conf->site_count; i++) {
v = tk->votes_for[i];
if (!v)
continue;
n = v->index;
count[n]++;
- log_debug("%s: Majority: %d %s wants %d %s => %d",
- tk->name,
+ tk_log_debug("Majority: %d %s wants %d %s => %d",
i, site_string(&booth_conf->site[i]),
n, site_string(v),
count[n]);
if (count[n]*2 <= booth_conf->site_count)
continue;
- log_debug("%s: Majority reached: %d of %d for %s",
- tk->name,
+ tk_log_debug("Majority reached: %d of %d for %s",
count[n], booth_conf->site_count,
site_string(v));
return v;
}
return NULL;
}
static int newer_term(struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg,
int in_election)
{
uint32_t term;
term = ntohl(msg->ticket.term);
/* §5.1 */
if (term > tk->current_term) {
tk->state = ST_FOLLOWER;
if (!in_election) {
tk->leader = leader;
- log_debug("%s: from %s: higher term %d vs. %d, following %s",
- tk->name,
+ tk_log_info("from %s: higher term %d vs. %d, following %s",
site_string(sender),
term, tk->current_term,
ticket_leader_string(tk));
} else {
tk->leader = no_leader;
- log_debug("%s: from %s: higher term %d vs. %d (election)",
- tk->name,
+ tk_log_debug("from %s: higher term %d vs. %d (election)",
site_string(sender),
term, tk->current_term);
}
tk->current_term = term;
return 1;
}
return 0;
}
static int term_too_low(struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg)
{
uint32_t term;
term = ntohl(msg->ticket.term);
/* §5.1 */
if (term < tk->current_term) {
- log_info("sending reject to %s, its term too low "
- "(%d vs. %d)", site_string(leader),
+ tk_log_info("sending reject to %s, its term too low "
+ "(%d vs. %d)", site_string(sender),
term, tk->current_term
);
send_reject(sender, tk, RLT_TERM_OUTDATED);
return 1;
}
return 0;
}
/* For follower. */
static int answer_HEARTBEAT (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t term;
struct boothc_ticket_msg omsg;
term = ntohl(msg->ticket.term);
- log_debug("%s: leader: %s, have %s; term %d vs %d",
- tk->name,
+ tk_log_debug("leader: %s, have %s; term %d vs %d",
site_string(leader), ticket_leader_string(tk),
term, tk->current_term);
+ /* got heartbeat, no rejects expected anymore */
+ tk->expect_more_rejects = 0;
+
/* if we're candidate, it may be that we got a heartbeat from
* a legitimate leader, so don't ignore a lower term
*/
if (tk->state != ST_CANDIDATE && term < tk->current_term) {
- log_info("ignoring lower term %d vs. %d, from %s",
+ tk_log_info("ignoring lower term %d vs. %d, from %s",
term, tk->current_term,
ticket_leader_string(tk));
return 0;
}
/* Needed? */
newer_term(tk, sender, leader, msg, 0);
become_follower(tk, msg);
/* Racy??? */
assert(sender == leader || !leader);
tk->leader = leader;
/* Ack the heartbeat (we comply). */
init_ticket_msg(&omsg, OP_HEARTBEAT, RLT_SUCCESS, 0, tk);
return booth_udp_send(sender, &omsg, sizeof(omsg));
}
static int process_UPDATE (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t term;
term = ntohl(msg->ticket.term);
- log_debug("%s: leader: %s, have %s; term %d vs %d",
- tk->name,
+ tk_log_debug("leader: %s, have %s; term %d vs %d",
site_string(leader), ticket_leader_string(tk),
term, tk->current_term);
/* No reject. (?) */
if (term < tk->current_term) {
- log_info("ignoring lower term %d vs. %d, from %s",
+ tk_log_info("ignoring lower term %d vs. %d, from %s",
term, tk->current_term,
ticket_leader_string(tk));
return 0;
}
update_ticket_from_msg(tk, msg);
ticket_write(tk);
/* run ticket_cron if the ticket expires */
set_ticket_wakeup(tk);
return 0;
}
static int process_REVOKE (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
if (tk->leader != sender) {
- log_error("from %s: non-leader wants to revoke ticket %s (ignoring)",
- site_string(sender), tk->name);
+ tk_log_error("%s wants to revoke ticket, "
+ "but it is not granted there (ignoring)",
+ site_string(sender));
return 1;
} else if (tk->state != ST_FOLLOWER) {
- log_error("from %s: unexpected ticket %s revoke in state %s (ignoring)",
- site_string(sender),
- state_to_string(tk->state),
- tk->name);
+ tk_log_error("unexpected ticket revoke from %s "
+ "(in state %s) (ignoring)",
+ site_string(sender),
+ state_to_string(tk->state));
return 1;
} else {
- log_info("from %s: leader revokes ticket %s",
- site_string(sender), tk->name);
+ tk_log_info("%s revokes ticket",
+ site_string(tk->leader));
reset_ticket(tk);
ticket_write(tk);
}
return 0;
}
/* is it safe to commit the grant?
* if we didn't hear from all sites on the initial grant, we may
* need to delay the commit
*
* TODO: investigate possibility to devise from history whether a
* missing site could be holding a ticket or not
*/
static int ticket_dangerous(struct ticket_config *tk)
{
if (!tk->delay_grant)
return 0;
if (tk->delay_grant < time(NULL) ||
all_sites_replied(tk)) {
tk->delay_grant = 0;
return 0;
}
return 1;
}
/* update the ticket on the leader, write it to the CIB, and
send out the update message to others with the new expiry
time
*/
int leader_update_ticket(struct ticket_config *tk)
{
struct boothc_ticket_msg msg;
int rv = 0;
if( tk->ticket_updated )
return 0;
tk->ticket_updated = 1;
tk->term_expires = time(NULL) + tk->term_duration;
if (!ticket_dangerous(tk)) {
ticket_write(tk);
init_ticket_msg(&msg, OP_UPDATE, RLT_SUCCESS, 0, tk);
rv = transport()->broadcast(&msg, sizeof(msg));
+ } else {
+ tk_log_info("delaying ticket commit to CIB until %s "
+ "(or all sites are reached)",
+ ctime(&tk->delay_grant));
}
set_ticket_wakeup(tk);
return rv;
}
/* For leader. */
static int process_HEARTBEAT(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t term;
term = ntohl(msg->ticket.term);
if (newer_term(tk, sender, leader, msg, 0)) {
/* unexpected higher term */
- log_warn("got higher term from %s (%d vs. %d)",
+ tk_log_warn("got higher term from %s (%d vs. %d)",
site_string(sender),
term, tk->current_term);
return 0;
}
/* Don't send a reject. */
if (term < tk->current_term) {
/* Doesn't know what he's talking about - perhaps
* doesn't receive our packets? */
- log_warn("from %s: unexpected "
- "term %d instead of %d (ignoring)",
+ tk_log_warn("unexpected term "
+ "from %s (%d vs. %d) (ignoring)",
site_string(sender),
term, tk->current_term);
return 0;
}
if (term == tk->current_term &&
leader == tk->leader) {
if (majority_of_bits(tk, tk->acks_received) &&
!ticket_dangerous(tk)) {
/* OK, at least half of the nodes are reachable;
* Update the ticket and send update messages out
*/
return leader_update_ticket(tk);
}
}
return 0;
}
void leader_elected(
struct ticket_config *tk,
struct booth_site *new_leader
)
{
if (new_leader) {
tk->leader = new_leader;
tk->term_expires = time(NULL) + tk->term_duration;
tk->election_end = 0;
tk->voted_for = NULL;
tk->retry_number = 0;
if (new_leader == local) {
+ tk_log_info("the ticket is granted here");
tk->commit_index++;
tk->state = ST_LEADER;
send_heartbeat(tk);
ticket_activate_timeout(tk);
} else {
+ tk_log_info("ticket granted at %s",
+ site_string(new_leader));
become_follower(tk, NULL);
set_ticket_wakeup(tk);
}
}
}
static int process_VOTE_FOR(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
if (term_too_low(tk, sender, leader, msg))
return 0;
if (newer_term(tk, sender, leader, msg, 0)) {
clear_election(tk);
}
/* leader wants to step down? */
if (leader == no_leader && sender == tk->leader &&
(tk->state == ST_FOLLOWER || tk->state == ST_CANDIDATE)) {
- log_info("ticket %s owner %s wants to step down",
- tk->name, site_string(tk->leader));
+ tk_log_info("%s wants to give the ticket away",
+ site_string(tk->leader));
return new_round(tk, OR_STEPDOWN);
}
record_vote(tk, sender, leader);
if (tk->state != ST_CANDIDATE) {
/* lost candidate status, somebody rejected our proposal */
return 0;
}
/* only if all voted can we take the ticket now, otherwise
* wait for timeout in ticket_cron */
if (!tk->acks_expected) {
/* §5.2 */
leader_elected(tk, majority_votes(tk));
}
return 0;
}
static int process_REJECTED(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t rv;
rv = ntohl(msg->header.result);
if (tk->state == ST_CANDIDATE &&
rv == RLT_TERM_OUTDATED) {
- log_warn("from %s: ticket %s outdated (term %d), following %s",
- site_string(sender),
- tk->name, ntohl(msg->ticket.term),
+ tk_log_warn("ticket outdated (term %d), granted at %s",
+ ntohl(msg->ticket.term),
site_string(leader)
);
tk->leader = leader;
+ tk->expect_more_rejects = 1;
become_follower(tk, msg);
return 0;
}
if (tk->state == ST_CANDIDATE &&
rv == RLT_TERM_STILL_VALID) {
- log_warn("from %s: there's a leader I didn't see: %s, following",
- site_string(sender),
+ tk_log_warn("ticket was granted at %s "
+ "(and we didn't know)",
site_string(leader));
tk->leader = leader;
+ tk->expect_more_rejects = 1;
become_follower(tk, msg);
return 0;
}
if (tk->state == ST_CANDIDATE &&
rv == RLT_YOU_OUTDATED) {
- log_warn("from %s: our ticket %s is outdated",
- site_string(sender),
- tk->name);
tk->leader = leader;
+ tk->expect_more_rejects = 1;
if (leader && leader != no_leader) {
+ tk_log_warn("our ticket is outdated, granted at %s",
+ site_string(leader));
become_follower(tk, msg);
} else {
+ tk_log_warn("our ticket is outdated and revoked");
update_ticket_from_msg(tk, msg);
tk->state = ST_INIT;
}
return 0;
}
- log_warn("from %s: in state %s, got %s (unexpected reject)",
- site_string(sender),
- state_to_string(tk->state),
- state_to_string(rv));
+ if (!tk->expect_more_rejects) {
+ tk_log_warn("from %s: in state %s, got %s (unexpected reject)",
+ site_string(sender),
+ state_to_string(tk->state),
+ state_to_string(rv));
+ }
+
return 0;
}
static int send_ticket (
int cmd,
struct ticket_config *tk,
struct booth_site *to_site
)
{
struct boothc_ticket_msg omsg;
+ if (cmd == OP_MY_INDEX) {
+ tk_log_info("sending status to %s",
+ site_string(to_site));
+ }
init_ticket_msg(&omsg, cmd, RLT_SUCCESS, 0, tk);
return booth_udp_send(to_site, &omsg, sizeof(omsg));
}
static int ticket_seems_ok(struct ticket_config *tk)
{
int time_left;
time_left = term_time_left(tk);
if (!time_left)
return 0; /* quite sure */
if (tk->state == ST_CANDIDATE)
return 0; /* in state of flux */
if (tk->state == ST_LEADER)
return 1; /* quite sure */
if (tk->state == ST_FOLLOWER &&
time_left >= tk->term_duration/3)
return 1; /* almost quite sure */
return 0;
}
static int test_reason(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int reason;
reason = ntohl(msg->header.reason);
if (reason == OR_TKT_LOST) {
if (tk->state == ST_INIT) {
- log_warn("%s claims that the ticket %s is lost, but it's in %s state",
- site_string(sender), tk->name,
+ tk_log_warn("%s claims that the ticket is lost, "
+ "but it's in %s state (reject sent)",
+ site_string(sender),
state_to_string(tk->state)
);
return RLT_YOU_OUTDATED;
}
if (ticket_seems_ok(tk)) {
- log_warn("%s claims that the ticket %s is lost, but it seems ok here",
- site_string(sender), tk->name
- );
+ tk_log_warn("%s claims that the ticket is lost, "
+ "but it is ok here (reject sent)",
+ site_string(sender));
return RLT_TERM_STILL_VALID;
}
}
return 0;
}
/* §5.2 */
static int answer_REQ_VOTE(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t term;
int valid;
struct boothc_ticket_msg omsg;
cmd_result_t inappr_reason;
inappr_reason = test_reason(tk, sender, leader, msg);
if (inappr_reason)
return send_reject(sender, tk, inappr_reason);
term = ntohl(msg->ticket.term);
/* Important: Ignore duplicated packets! */
valid = term_time_left(tk);
if (valid &&
term == tk->current_term &&
sender == tk->leader) {
- log_debug("%s: Duplicate OP_VOTE_FOR ignored.",
- tk->name);
+ tk_log_debug("Duplicate OP_VOTE_FOR ignored.");
return 0;
}
if (valid) {
- log_warn("no election allowed for %s, term still valid for %d",
- tk->name, valid);
+ tk_log_warn("election rejected, term still valid for %ds", valid);
return send_reject(sender, tk, RLT_TERM_STILL_VALID);
}
if (term_too_low(tk, sender, leader, msg))
return 0;
/* if it's a newer term or ... */
if (newer_term(tk, sender, leader, msg, 1)) {
clear_election(tk);
goto vote_for_sender;
}
/* ... we didn't vote yet, then vote for the sender */
/* §5.2, §5.4 */
if (!tk->voted_for) {
vote_for_sender:
tk->voted_for = sender;
record_vote(tk, sender, leader);
}
init_ticket_msg(&omsg, OP_VOTE_FOR, RLT_SUCCESS, 0, tk);
omsg.ticket.leader = htonl(get_node_id(tk->voted_for));
return booth_udp_send(sender, &omsg, sizeof(omsg));
}
int new_election(struct ticket_config *tk,
struct booth_site *preference, int update_term, cmd_reason_t reason)
{
struct booth_site *new_leader;
time_t now;
static cmd_reason_t last_reason;
time(&now);
- log_debug("%s: start new election?, now=%" PRIi64 ", end %" PRIi64,
- tk->name,
+ tk_log_debug("start new election?, now=%" PRIi64 ", end %" PRIi64,
(int64_t)now, (int64_t)(tk->election_end));
if (now <= tk->election_end)
return 0;
/* §5.2 */
/* If there was _no_ answer, don't keep incrementing the term number
* indefinitely. If there was no peer, there'll probably be no one
* listening now either. However, we don't know if we were
* invoked due to a timeout (caller does).
*/
if (update_term)
tk->current_term++;
tk->term_expires = 0;
tk->election_end = now + tk->timeout;
- log_info("%s: starting new election, term=%d, until %" PRIi64,
- tk->name, tk->current_term, (int64_t)tk->election_end);
+ tk_log_info("starting new election (term=%d, until %s)",
+ tk->current_term, ctime(&tk->election_end));
clear_election(tk);
if(preference)
new_leader = preference;
else
new_leader = (local->type == SITE) ? local : NULL;
record_vote(tk, local, new_leader);
tk->voted_for = new_leader;
tk->state = ST_CANDIDATE;
/* some callers may want just to repeat on timeout */
if (reason == OR_AGAIN) {
reason = last_reason;
} else {
last_reason = reason;
}
expect_replies(tk, OP_VOTE_FOR);
ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS, reason);
ticket_activate_timeout(tk);
return 0;
}
/* we were a leader and somebody says that they have a more up
* to date ticket
* there was probably connectivity loss
* tricky
*/
static int leader_handle_newer_ticket(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
if (leader == no_leader || !leader || leader == local) {
/* at least nobody else owns the ticket */
/* it is not kosher to update from their copy, but since
* they don't own the ticket, nothing bad can happen
*/
update_term_from_msg(tk, msg);
/* get the ticket again, if we can
*/
+ tk_log_info("trying to reclaim the ticket");
return acquire_ticket(tk, OR_REACQUIRE);
}
/* eek, two leaders, split brain */
/* normally shouldn't happen; run election */
- log_error("from %s: ticket %s at %s! (disowning ticket)",
+ tk_log_error("from %s: ticket granted at %s! (revoking locally)",
site_string(sender),
- tk->name, site_string(leader)
+ site_string(leader)
);
- log_error("Two ticket owners! Possible bug. Please report at https://github.com/ClusterLabs/booth/issues/new.");
return new_round(tk, OR_SPLIT);
}
/* reply to STATUS */
static int process_MY_INDEX (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int i;
int rv;
if (!msg->ticket.term_valid_for) {
/* ticket not valid */
return 0;
}
i = cmp_msg_ticket(tk, sender, leader, msg);
if (i > 0) {
/* let them know about our newer ticket */
send_ticket(OP_MY_INDEX, tk, sender);
- if (tk->state == ST_LEADER)
+ if (tk->state == ST_LEADER) {
+ tk_log_info("sending update to %s",
+ site_string(sender));
return send_ticket(OP_UPDATE, tk, sender);
+ }
}
/* they have a newer ticket, trouble if we're already leader
* for it */
if (i < 0 && tk->state == ST_LEADER) {
- log_warn("from %s: more uptodate ticket %s at %s",
+ tk_log_warn("from %s: more up to date ticket at %s",
site_string(sender),
- tk->name,
site_string(leader)
);
return leader_handle_newer_ticket(tk, sender, leader, msg);
}
update_ticket_from_msg(tk, msg);
tk->leader = leader;
if (leader == local) {
rv = test_external_prog(tk, 1);
if (!rv) {
/* if we were the leader but we rebooted in the
* meantime; try to get the ticket again
*/
tk->state = ST_LEADER;
tk->retry_number = 0;
+ tk_log_info("trying to reclaim the ticket");
rv = send_heartbeat(tk);
ticket_activate_timeout(tk);
}
return rv;
} else {
- tk->state = (!leader || leader == no_leader) ?
- ST_INIT : ST_FOLLOWER;
+ if (!leader || leader == no_leader) {
+ tk_log_info("ticket is not granted");
+ tk->state = ST_INIT;
+ } else {
+ tk_log_info("ticket granted at %s (says %s)",
+ site_string(leader),
+ site_string(sender));
+ tk->state = ST_FOLLOWER;
+ }
set_ticket_wakeup(tk);
}
return 0;
}
int raft_answer(
struct ticket_config *tk,
struct booth_site *from,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int cmd;
int rv;
rv = 0;
cmd = ntohl(msg->header.cmd);
R(tk);
- log_debug("%s: got message %s from %s",
- tk->name,
+ tk_log_debug("got message %s from %s",
state_to_string(cmd),
site_string(from));
switch (cmd) {
case OP_REQ_VOTE:
rv = answer_REQ_VOTE(tk, from, leader, msg);
break;
case OP_VOTE_FOR:
rv = process_VOTE_FOR(tk, from, leader, msg);
break;
case OP_HEARTBEAT:
if (tk->leader == local &&
tk->state == ST_LEADER)
rv = process_HEARTBEAT(tk, from, leader, msg);
else if (tk->leader != local &&
(tk->state == ST_INIT ||tk->state == ST_FOLLOWER ||
tk->state == ST_CANDIDATE))
rv = answer_HEARTBEAT(tk, from, leader, msg);
else {
- log_warn("unexpected message %s, from %s",
+ tk_log_warn("unexpected message %s, from %s",
state_to_string(cmd),
site_string(from));
rv = -EINVAL;
}
break;
case OP_UPDATE:
if (tk->leader != local && tk->state == ST_FOLLOWER) {
rv = process_UPDATE(tk, from, leader, msg);
} else {
- log_warn("unexpected message %s, from %s",
+ tk_log_warn("unexpected message %s, from %s",
state_to_string(cmd),
site_string(from));
rv = -EINVAL;
}
break;
case OP_REJECTED:
rv = process_REJECTED(tk, from, leader, msg);
break;
case OP_REVOKE:
rv = process_REVOKE(tk, from, leader, msg);
break;
case OP_MY_INDEX:
rv = process_MY_INDEX(tk, from, leader, msg);
break;
case OP_STATUS:
rv = send_ticket(OP_MY_INDEX, tk, from);
break;
default:
- log_error("unknown message %s, from %s",
+ tk_log_error("unknown message %s, from %s",
state_to_string(cmd), site_string(from));
rv = -EINVAL;
}
R(tk);
return rv;
}
diff --git a/src/ticket.c b/src/ticket.c
index 0a643e6..5e16d08 100644
--- a/src/ticket.c
+++ b/src/ticket.c
@@ -1,707 +1,728 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include <inttypes.h>
#include <stdio.h>
#include <assert.h>
#include <time.h>
#include <clplumbing/cl_random.h>
#include "ticket.h"
#include "config.h"
#include "pacemaker.h"
#include "inline-fn.h"
#include "log.h"
#include "booth.h"
#include "raft.h"
#include "handler.h"
#define TK_LINE 256
/* Untrusted input, must fit (incl. \0) in a buffer of max chars. */
int check_max_len_valid(const char *s, int max)
{
int i;
for(i=0; i<max; i++)
if (s[i] == 0)
return 1;
return 0;
}
int find_ticket_by_name(const char *ticket, struct ticket_config **found)
{
int i;
if (found)
*found = NULL;
for (i = 0; i < booth_conf->ticket_count; i++) {
if (!strcmp(booth_conf->ticket[i].name, ticket)) {
if (found)
*found = booth_conf->ticket + i;
return 1;
}
}
return 0;
}
int check_ticket(char *ticket, struct ticket_config **found)
{
if (found)
*found = NULL;
if (!booth_conf)
return 0;
if (!check_max_len_valid(ticket, sizeof(booth_conf->ticket[0].name)))
return 0;
return find_ticket_by_name(ticket, found);
}
int check_site(char *site, int *is_local)
{
struct booth_site *node;
if (!check_max_len_valid(site, sizeof(node->addr_string)))
return 0;
if (find_site_by_name(site, &node, 0)) {
*is_local = node->local;
return 1;
}
return 0;
}
int ticket_write(struct ticket_config *tk)
{
if (local->type != SITE)
return -EINVAL;
disown_if_expired(tk);
if (tk->leader == local) {
pcmk_handler.grant_ticket(tk);
} else {
pcmk_handler.revoke_ticket(tk);
}
return 0;
}
/* Ask an external program whether getting the ticket
* makes sense.
* Eg. if the services have a failcount of INFINITY,
* we can't serve here anyway. */
int test_external_prog(struct ticket_config *tk,
int start_election)
{
int rv;
rv = run_handler(tk, tk->ext_verifier, 1);
if (rv) {
- log_warn("we are not allowed to acquire ticket %s",
- tk->name);
+ tk_log_warn("we are not allowed to acquire ticket");
/* Give it to somebody else.
* Just send a VOTE_FOR message, so the
* others can start elections. */
if (leader_and_valid(tk)) {
disown_ticket(tk);
if (start_election) {
ticket_broadcast(tk, OP_VOTE_FOR, RLT_SUCCESS, OR_LOCAL_FAIL);
}
}
}
return rv;
}
/* Try to acquire a ticket
* Could be manual grant or after ticket loss
*/
int acquire_ticket(struct ticket_config *tk, cmd_reason_t reason)
{
if (test_external_prog(tk, 0))
return RLT_EXT_FAILED;
return new_election(tk, local, 1, reason);
}
/** Try to get the ticket for the local site.
* */
int do_grant_ticket(struct ticket_config *tk, int options)
{
int rv;
+ tk_log_info("granting ticket");
+
if (tk->leader == local)
return RLT_SUCCESS;
if (is_owned(tk))
return RLT_OVERGRANT;
tk->delay_grant = time(NULL) +
tk->term_duration + tk->acquire_after;
if (options & OPT_IMMEDIATE) {
- log_warn("Grant ticket %s immediately! If there are "
+ tk_log_warn("granting ticket immediately! If there are "
"unreachable sites, _hope_ you are sure that they don't "
- "have the ticket!",
- tk->name);
+ "have the ticket!");
tk->delay_grant = 0;
}
rv = acquire_ticket(tk, OR_ADMIN);
if (rv)
tk->delay_grant = 0;
return rv;
}
/** Ticket revoke.
* Only to be started from the leader. */
int do_revoke_ticket(struct ticket_config *tk)
{
- log_info("revoking ticket %s", tk->name);
+ tk_log_info("revoking ticket");
reset_ticket(tk);
ticket_write(tk);
return ticket_broadcast(tk, OP_REVOKE, RLT_SUCCESS, OR_ADMIN);
}
int list_ticket(char **pdata, unsigned int *len)
{
struct ticket_config *tk;
char timeout_str[64];
char pending_str[64];
char *data, *cp;
int i, alloc;
*pdata = NULL;
*len = 0;
alloc = 256 +
booth_conf->ticket_count * (BOOTH_NAME_LEN * 2 + 128);
data = malloc(alloc);
if (!data)
return -ENOMEM;
cp = data;
foreach_ticket(i, tk) {
if (tk->term_expires != 0)
strftime(timeout_str, sizeof(timeout_str), "%F %T",
localtime(&tk->term_expires));
else
strcpy(timeout_str, "INF");
if (tk->leader == local && tk->delay_grant > time(NULL)) {
strcpy(pending_str, " (pending until ");
strftime(pending_str + strlen(" (pending until "),
sizeof(pending_str) - strlen(" (pending until ") - 1,
"%F %T", localtime(&tk->delay_grant));
strcat(pending_str, ")");
} else
*pending_str = '\0';
cp += snprintf(cp,
alloc - (cp - data),
"ticket: %s, leader: %s, expires: %s, commit: %d%s\n",
tk->name,
ticket_leader_string(tk),
timeout_str,
tk->commit_index,
pending_str);
if (alloc - (cp - data) <= 0)
return -ENOMEM;
}
*pdata = data;
*len = cp - data;
return 0;
}
void reset_ticket(struct ticket_config *tk)
{
disown_ticket(tk);
tk->state = ST_INIT;
tk->voted_for = NULL;
}
int setup_ticket(void)
{
struct ticket_config *tk;
int i;
foreach_ticket(i, tk) {
reset_ticket(tk);
if (local->type == SITE) {
pcmk_handler.load_ticket(tk);
if (time(NULL) >= tk->term_expires) {
reset_ticket(tk);
ticket_write(tk);
}
}
/* if the ticket is uptodate and belongs to us, try with
* the heartbeat
*/
if (tk->is_granted && tk->leader == local) {
if (!test_external_prog(tk, 1)) {
tk->state = ST_LEADER;
send_heartbeat(tk);
ticket_activate_timeout(tk);
}
} else {
/* otherwise, query status */
+ tk_log_info("broadcasting state query");
ticket_broadcast(tk, OP_STATUS, RLT_SUCCESS, 0);
}
}
return 0;
}
int ticket_answer_list(int fd, struct boothc_ticket_msg *msg)
{
char *data;
int olen, rv;
struct boothc_header hdr;
rv = list_ticket(&data, &olen);
if (rv < 0)
return rv;
init_header(&hdr, CMR_LIST, 0, RLT_SUCCESS, 0, sizeof(hdr) + olen);
return send_header_plus(fd, &hdr, data, olen);
}
int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg)
{
int rv;
struct ticket_config *tk;
if (!check_ticket(msg->ticket.id, &tk)) {
log_warn("client asked to grant unknown ticket %s",
msg->ticket.id);
rv = RLT_INVALID_ARG;
goto reply;
}
if (is_owned(tk)) {
log_warn("client wants to grant an (already granted!) ticket %s",
msg->ticket.id);
rv = RLT_OVERGRANT;
goto reply;
}
rv = do_grant_ticket(tk, ntohl(msg->header.options));
reply:
init_header(&msg->header, CMR_GRANT, 0, rv ?: RLT_ASYNC, 0, sizeof(*msg));
return send_ticket_msg(fd, msg);
}
int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg)
{
int rv;
struct ticket_config *tk;
if (!check_ticket(msg->ticket.id, &tk)) {
log_warn("client wants to revoke an unknown ticket %s",
msg->ticket.id);
rv = RLT_INVALID_ARG;
goto reply;
}
if (!is_owned(tk)) {
log_info("client wants to revoke a free ticket %s",
msg->ticket.id);
- /* Return a different result code? */
- rv = RLT_SUCCESS;
+ rv = RLT_TICKET_IDLE;
goto reply;
}
if (tk->leader != local) {
- log_info("we do not own the ticket %s, "
- "redirect to leader %s",
+ log_info("the ticket %s is not granted here, "
+ "redirect to %s",
msg->ticket.id, ticket_leader_string(tk));
rv = RLT_REDIRECT;
goto reply;
}
rv = do_revoke_ticket(tk);
if (rv == 0)
rv = RLT_ASYNC;
reply:
init_ticket_msg(msg, CMR_REVOKE, rv, 0, tk);
return send_ticket_msg(fd, msg);
}
int ticket_broadcast(struct ticket_config *tk,
cmd_request_t cmd, cmd_result_t res, cmd_reason_t reason)
{
struct boothc_ticket_msg msg;
init_ticket_msg(&msg, cmd, res, reason, tk);
- log_debug("broadcasting '%s' for %s (term=%d, valid=%d)",
- state_to_string(cmd), tk->name,
+ tk_log_debug("broadcasting '%s' (term=%d, valid=%d)",
+ state_to_string(cmd),
ntohl(msg.ticket.term),
ntohl(msg.ticket.term_valid_for));
return transport()->broadcast(&msg, sizeof(msg));
}
int new_round(struct ticket_config *tk, cmd_reason_t reason)
{
int rv = 0;
struct timespec delay;
disown_ticket(tk);
/* New vote round; §5.2 */
if (local->type == SITE) {
/* delay the next election start for up to 200ms */
delay.tv_sec = 0;
delay.tv_nsec = 1000000L * (long)cl_rand_from_interval(0, 200);
nanosleep(&delay, NULL);
rv = new_election(tk, NULL, 1, reason);
ticket_write(tk);
}
return rv;
}
+static void log_lost_servers(struct ticket_config *tk)
+{
+ struct booth_site *n;
+ int i;
+
+ for (i = 0; i < booth_conf->site_count; i++) {
+ n = booth_conf->site + i;
+ if (!(tk->acks_received & n->bitmask)) {
+ tk_log_warn("%s %s didn't acknowledge our request, "
+ "will retry %d times",
+ (n->type == ARBITRATOR ? "arbitrator" : "site"),
+ site_string(n),
+ tk->retries);
+ }
+ }
+}
+
static void ticket_cron(struct ticket_config *tk)
{
time_t now;
int ack_cnt;
struct booth_site *new_leader;
now = time(NULL);
R(tk);
/* Has an owner, has an expiry date, and expiry date in the past?
* Losing the ticket must happen in _every_ state. */
if (tk->term_expires &&
is_owned(tk) &&
now >= tk->term_expires) {
- log_warn("LOST ticket: %s no longer at %s",
- tk->name,
- ticket_leader_string(tk));
+ if (tk->leader != local) {
+ tk_log_warn("lost at %s", site_string(tk->leader));
+ } else {
+ tk_log_warn("lost majority (revoking locally)");
+ }
/* Couldn't renew in time - ticket lost. */
new_round(tk, OR_TKT_LOST);
return;
}
R(tk);
switch(tk->state) {
case ST_INIT:
/* init state, nothing to do */
break;
case ST_FOLLOWER:
/* nothing here either, ticket loss is caught earlier
* */
break;
case ST_CANDIDATE:
/* §5.2 */
/* not everybody answered, but if we have majority... */
new_leader = majority_votes(tk);
if (new_leader) {
leader_elected(tk, new_leader);
} else if (now > tk->election_end) {
/* This is previous election timed out */
new_election(tk, NULL, 0, OR_AGAIN);
}
break;
case ST_LEADER:
/* we get here after we broadcasted a heartbeat;
* by this time all sites should've acked the heartbeat
*/
if (tk->acks_expected) {
tk->retry_number ++;
if (!majority_of_bits(tk, tk->acks_received)) {
ack_cnt = count_bits(tk->acks_received) - 1;
if (!ack_cnt) {
- log_warn("no answers to heartbeat for ticket %s on try #%d, "
+ tk_log_warn("no answers to heartbeat (try #%d), "
"we are alone",
- tk->name,
tk->retry_number);
} else {
- log_warn("not enough answers to heartbeat for ticket %s on try #%d: "
+ tk_log_warn("not enough answers to heartbeat (try #%d): "
"only got %d answers",
- tk->name,
tk->retry_number,
ack_cnt);
}
/* Don't give up, though - there's still some time until leadership is lost. */
} else {
+ if (tk->retry_number <= 1) {
+ /* log those that we couldn't reach, but do
+ * that only on the first retry
+ */
+ log_lost_servers(tk);
+ }
/* we have the majority, update the ticket, at
* least the local copy if we're still not
* allowed to commit
*/
leader_update_ticket(tk);
}
} else {
/* this is ticket renewal, check what the
* external test says */
if (test_external_prog(tk, 1))
return;
}
send_heartbeat(tk);
if (tk->retry_number < tk->retries) {
ticket_activate_timeout(tk);
} else {
+ tk->retry_number = 0;
set_ticket_wakeup(tk);
}
break;
default:
break;
}
R(tk);
}
void process_tickets(void)
{
struct ticket_config *tk;
int i;
struct timeval now;
float sec_until;
gettimeofday(&now, NULL);
foreach_ticket(i, tk) {
sec_until = timeval_to_float(tk->next_cron) - timeval_to_float(now);
if (0)
- log_debug("%s next cron %" PRIx64 ".%03d, "
+ tk_log_debug("next cron %" PRIx64 ".%03d, "
"now %" PRIx64 "%03d, in %f",
- tk->name,
(uint64_t)tk->next_cron.tv_sec, timeval_msec(tk->next_cron),
(uint64_t)now.tv_sec, timeval_msec(now),
sec_until);
if (sec_until > 0.0)
continue;
- log_debug("ticket cron: doing %s", tk->name);
+ tk_log_debug("ticket cron");
/* Set next value, handler may override.
* This should already be handled via the state logic;
* but to be on the safe side the renew repetition is
* duplicated here, too. */
set_ticket_wakeup(tk);
ticket_cron(tk);
}
}
void tickets_log_info(void)
{
struct ticket_config *tk;
int i;
foreach_ticket(i, tk) {
- log_info("Ticket %s: state '%s' "
+ tk_log_info("state '%s' "
"commit index %d "
"leader %s "
"expires %-24.24s",
- tk->name,
state_to_string(tk->state),
tk->commit_index,
ticket_leader_string(tk),
ctime(&tk->term_expires));
}
}
static void update_acks(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t cmd;
cmd = ntohl(msg->header.cmd);
if (tk->acks_expected != cmd)
return;
/* got an ack! */
tk->acks_received |= sender->bitmask;
- log_debug("%s: got ACK from %s, %d/%d agree.",
- tk->name,
+ tk_log_debug("got ACK from %s, %d/%d agree.",
site_string(sender),
count_bits(tk->acks_received),
booth_conf->site_count);
if (all_replied(tk)) {
tk->acks_expected = 0;
}
}
/* UDP message receiver. */
int message_recv(struct boothc_ticket_msg *msg, int msglen)
{
uint32_t from;
struct booth_site *source;
struct ticket_config *tk;
struct booth_site *leader;
uint32_t leader_u;
if (check_boothc_header(&msg->header, sizeof(*msg)) < 0 ||
msglen != sizeof(*msg)) {
log_error("message receive error");
return -1;
}
from = ntohl(msg->header.from);
if (!find_site_by_id(from, &source) || !source) {
log_error("unknown sender: %08x", from);
return -1;
}
if (!check_ticket(msg->ticket.id, &tk)) {
log_warn("got invalid ticket name %s from %s",
msg->ticket.id, site_string(source));
return -EINVAL;
}
leader_u = ntohl(msg->ticket.leader);
if (!find_site_by_id(leader_u, &leader)) {
- log_error("Message with unknown owner %u received", leader_u);
+ tk_log_error("message with unknown leader %u received", leader_u);
return -EINVAL;
}
update_acks(tk, source, leader, msg);
return raft_answer(tk, source, leader, msg);
}
void set_ticket_wakeup(struct ticket_config *tk)
{
struct timeval tv, now;
/* At least every hour, perhaps sooner. */
ticket_next_cron_in(tk, 3600);
switch (tk->state) {
case ST_LEADER:
assert(tk->leader == local);
gettimeofday(&now, NULL);
tv = now;
tv.tv_sec = next_vote_starts_at(tk);
/* If timestamp is in the past, look again in one second. */
if (timeval_compare(tv, now) <= 0)
tv.tv_sec = now.tv_sec + 1;
ticket_next_cron_at(tk, tv);
break;
case ST_CANDIDATE:
assert(tk->election_end);
ticket_next_cron_at_coarse(tk, tk->election_end);
break;
case ST_INIT:
case ST_FOLLOWER:
/* If there is (or should be) some owner, check on it later on.
* If no one is interested - don't care. */
if (is_owned(tk) &&
(local->type == SITE))
ticket_next_cron_at_coarse(tk,
tk->term_expires + tk->acquire_after);
break;
default:
- log_error("unknown ticket state: %d", tk->state);
+ tk_log_error("unknown ticket state: %d", tk->state);
}
}
/* Given a state (in host byte order), return a human-readable (char*).
* An array is used so that multiple states can be printed in a single printf(). */
char *state_to_string(uint32_t state_ho)
{
union mu { cmd_request_t s; char c[5]; };
static union mu cache[6] = { { 0 } }, *cur;
static int current = 0;
current ++;
if (current >= sizeof(cache)/sizeof(cache[0]))
current = 0;
cur = cache + current;
cur->s = htonl(state_ho);
/* Shouldn't be necessary, union array is initialized with zeroes, and
* these bytes never get written. */
cur->c[4] = 0;
return cur->c;
}
int send_reject(struct booth_site *dest, struct ticket_config *tk, cmd_result_t code)
{
struct boothc_ticket_msg msg;
init_ticket_msg(&msg, OP_REJECTED, code, 0, tk);
return booth_udp_send(dest, &msg, sizeof(msg));
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Jan 25, 7:03 AM (1 d, 16 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1321405
Default Alt Text
(86 KB)
Attached To
Mode
rB Booth
Attached
Detach File
Event Timeline
Log In to Comment