Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/configure.ac b/configure.ac
index d7d4c33..add6b92 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1,433 +1,443 @@
# -*- Autoconf -*-
# Process this file with autoconf to produce a configure script.
# bootstrap / init
AC_PREREQ([2.61])
AC_INIT([booth], [0.2.0], [pacemaker@oss.clusterlabs.org])
AM_INIT_AUTOMAKE([-Wno-portability])
AC_CONFIG_SRCDIR([src/main.c])
AC_CONFIG_HEADER([src/b_config.h src/booth_config.h])
AC_CANONICAL_HOST
AC_LANG([C])
AC_SUBST(WITH_LIST, [""])
dnl Without an explicit --prefix, install system-wide under /usr.
dnl Each directory variable is rewritten only while it still holds its
dnl unexpanded Autoconf default, so values given on the command line
dnl are left alone.
case "$prefix" in
NONE)
	prefix="/usr"

	dnl State directory
	if test "x$localstatedir" = "x\${prefix}/var"; then
		localstatedir="/var"
	fi

	dnl Configuration directory
	if test "x$sysconfdir" = "x\${prefix}/etc"; then
		sysconfdir="/etc"
	fi

	dnl Library directory: use /usr/lib64 when it exists on the build host
	if test "x$libdir" = "x\${exec_prefix}/lib"; then
		libdir="/usr/lib"
		if test -e /usr/lib64; then
			libdir="/usr/lib64"
		fi
	fi
	;;
esac
dnl Report whether configure runs inside the source tree and, if so,
dnl record that fact in the config header as BUILDING_IN_PLACE.
AS_IF([test "$srcdir" = "."],
	[AC_MSG_NOTICE([building in place srcdir:$srcdir])
	 AC_DEFINE([BUILDING_IN_PLACE], 1, [building in place])],
	[AC_MSG_NOTICE([building out of tree srcdir:$srcdir])])
# Checks for programs.
# check stolen from gnulib/m4/gnu-make.m4
if ! ${MAKE-make} --version /cannot/make/this >/dev/null 2>&1; then
AC_MSG_ERROR([you don't seem to have GNU make; it is required])
fi
AC_PROG_CC
AC_PROG_INSTALL
AC_PROG_LN_S
AC_PROG_MAKE_SET
AC_PROG_RANLIB
AC_PATH_PROGS(PKGCONFIG, pkg-config)
AC_PATH_PROGS(ASCIIDOC, asciidoc)
AM_CONDITIONAL(BUILD_ASCIIDOC, test x"${ASCIIDOC}" != x"")
# Checks for libraries.
AC_CHECK_LIB([socket], [socket])
AC_CHECK_LIB([nsl], [t_open])
AC_CHECK_LIB([gpl], [cl_log])
PKG_CHECK_MODULES(GLIB, [glib-2.0])
# Checks for header files.
AC_FUNC_ALLOCA
AC_HEADER_DIRENT
AC_HEADER_STDC
AC_HEADER_SYS_WAIT
AC_CHECK_HEADERS([arpa/inet.h fcntl.h limits.h netdb.h netinet/in.h stdint.h \
stdlib.h string.h sys/ioctl.h sys/param.h sys/socket.h \
sys/time.h syslog.h unistd.h sys/types.h getopt.h malloc.h \
sys/sockio.h utmpx.h])
AC_CHECK_HEADERS(heartbeat/glue_config.h)
# Checks for typedefs, structures, and compiler characteristics.
AC_C_CONST
AC_TYPE_UID_T
AC_C_INLINE
AC_TYPE_INT16_T
AC_TYPE_INT32_T
AC_TYPE_INT64_T
AC_TYPE_INT8_T
AC_TYPE_SIZE_T
AC_TYPE_SSIZE_T
AC_HEADER_TIME
AC_TYPE_UINT16_T
AC_TYPE_UINT32_T
AC_TYPE_UINT64_T
AC_TYPE_UINT8_T
AC_C_VOLATILE
# Checks for library functions.
AC_FUNC_CLOSEDIR_VOID
AC_FUNC_ERROR_AT_LINE
AC_REPLACE_FNMATCH
AC_FUNC_FORK
AC_PROG_GCC_TRADITIONAL
AC_FUNC_MALLOC
AC_FUNC_MEMCMP
AC_FUNC_REALLOC
AC_FUNC_SELECT_ARGTYPES
AC_TYPE_SIGNAL
AC_FUNC_VPRINTF
AC_CHECK_FUNCS([alarm alphasort atexit bzero dup2 endgrent endpwent fcntl \
getcwd getpeerucred getpeereid gettimeofday inet_ntoa memmove \
memset mkdir scandir select socket strcasecmp strchr strdup \
strerror strrchr strspn strstr \
sched_get_priority_max sched_setscheduler])
AC_CONFIG_FILES([Makefile
src/Makefile
docs/Makefile])
# ===============================================
# Helpers
# ===============================================
## helper for CC stuff
##
## Usage: cc_supports_flag FLAG...
## Test-compiles a trivial program with CFLAGS replaced by "-Werror FLAG..."
## and returns 0 when the compilation succeeds, 1 otherwise.
## The caller's CFLAGS is saved and restored by hand, because the "local"
## builtin is not available in every shell that may run configure.
## RC stays a global shell variable, as before.
cc_supports_flag() {
	cc_supports_flag_saved_CFLAGS="$CFLAGS"
	CFLAGS="-Werror $*"
	AC_MSG_CHECKING([whether $CC supports "$*"])
	AC_COMPILE_IFELSE([AC_LANG_SOURCE([[int main(){return 0;}]])],
		[RC=0; AC_MSG_RESULT([yes])],[RC=1; AC_MSG_RESULT([no])])
	CFLAGS="$cc_supports_flag_saved_CFLAGS"
	return $RC
}
## cleanup
dnl The bootstrap section at the top of this file already replaces a
dnl prefix of NONE with /usr, so the NONE arm below is only a fallback.
AC_MSG_NOTICE(Sanitizing prefix: ${prefix})
case $prefix in
	NONE) prefix=/usr/local;;
esac
dnl NOTE(review): the second arm below matches only the literal word
dnl prefix, not an unexpanded reference to the prefix variable - confirm
dnl that this is what was intended.
AC_MSG_NOTICE(Sanitizing exec_prefix: ${exec_prefix})
case $exec_prefix in
	dnl For consistency with Corosync, map NONE->$prefix
	NONE) exec_prefix=$prefix;;
	prefix) exec_prefix=$prefix;;
esac
## local defines
PACKAGE_FEATURES=""
LINT_FLAGS="-weak -unrecog +posixlib +ignoresigns -fcnuse \
-badflag -D__gnuc_va_list=va_list -D__attribute\(x\)="
# local options
dnl Each AC_ARG_ENABLE call registers one --enable-* switch and its help
dnl text.  The third argument is the action run when the switch is given;
dnl here it only assigns the shell variable default, which nothing else in
dnl this file reads.  The tests further down look at the enable_* variables.
dnl NOTE(review): enable_ansi is tested later in this file, but no matching
dnl AC_ARG_ENABLE appears in this list - confirm whether that is intended.
AC_ARG_ENABLE([fatal-warnings],
	[ --enable-fatal-warnings : enable fatal warnings. ],
	[ default="no" ])
AC_ARG_ENABLE([debug],
	[ --enable-debug : enable debug build. ],
	[ default="no" ])
AC_ARG_ENABLE([user-flags],
	[ --enable-user-flags : rely on user environment. ],
	[ default="no" ])
AC_ARG_ENABLE([coverage],
	[ --enable-coverage : coverage analysis of the codebase. ],
	[ default="no" ])
AC_ARG_ENABLE([small-memory-footprint],
	[ --enable-small-memory-footprint : Use small message queues and small messages sizes. ],
	[ default="no" ])
dnl Init script directory: taken from --with-initddir when given,
dnl otherwise $sysconfdir/init.d.
AC_ARG_WITH([initddir],
	[ --with-initddir=DIR : path to init script directory. ],
	[ INITDDIR="$withval" ],
	[ INITDDIR="$sysconfdir/init.d" ])
AC_ARG_ENABLE([resource-monitor],
	[ --enable-resource-monitor : Enabling Resource Monitor ],
	[ default="no" ])
# OS detection
# THIS SECTION MUST DIE!
CP=cp
OS_LDL="-ldl"
have_linux="no"
case "$host_os" in
*linux*)
AC_DEFINE_UNQUOTED([BOOTH_LINUX], [1],
[Compiling for Linux platform])
OS_CFLAGS=""
OS_CPPFLAGS="-D_GNU_SOURCE"
OS_LDFLAGS=""
OS_DYFLAGS="-rdynamic"
DARWIN_OPTS=""
have_linux="yes"
;;
darwin*)
AC_DEFINE_UNQUOTED([BOOTH_DARWIN], [1],
[Compiling for Darwin platform])
CP=rsync
OS_CFLAGS=""
OS_CPPFLAGS=""
OS_LDFLAGS=""
OS_DYFLAGS=""
DARWIN_OPTS="-dynamiclib -bind_at_load \
-current_version ${SONAME} \
-compatibility_version ${SONAME} -install_name \$(libdir)/\$(@)"
AC_DEFINE_UNQUOTED([MAP_ANONYMOUS], [MAP_ANON],
[Shared memory define for Darwin platform])
AC_DEFINE_UNQUOTED([PATH_MAX], [4096],
[Number of chars in a path name including nul])
AC_DEFINE_UNQUOTED([NAME_MAX], [255],
[Number of chars in a file name])
;;
*bsd*)
	dnl BSD family: headers and libraries are looked up under /usr/local,
	dnl and no separate dl library is linked (OS_LDL is empty).
	AC_DEFINE_UNQUOTED([BOOTH_BSD], [1],
		[Compiling for BSD platform])
	dnl Map MAP_ANONYMOUS onto MAP_ANON for this platform.
	AC_DEFINE_UNQUOTED([MAP_ANONYMOUS], [MAP_ANON],
		[Shared memory define for BSD platform])
	OS_CFLAGS=""
	OS_CPPFLAGS="-I/usr/local/include"
	OS_LDFLAGS="-L/usr/local/lib"
	OS_DYFLAGS="-export-dynamic"
	DARWIN_OPTS=""
	OS_LDL=""
	dnl FreeBSD 2 through 7 get no extra define; any other FreeBSD
	dnl release is marked as version 8 or newer.
	case "$host_os" in
		*freebsd[[234567]]*)
		;;
		*freebsd*)
			AC_DEFINE_UNQUOTED([BOOTH_FREEBSD_GE_8], [1],
				[Compiling for FreeBSD >= 8 platform])
		;;
	esac
;;
*solaris*)
AC_DEFINE_UNQUOTED([BOOTH_SOLARIS], [1],
[Compiling for Solaris platform])
AC_DEFINE_UNQUOTED([TS_CLASS], [1],
[Prevent being scheduled RR])
AC_DEFINE_UNQUOTED([_SEM_SEMUN_UNDEFINED], [1],
[The semun structure is undefined])
CP=rsync
OS_CFLAGS=""
OS_CPPFLAGS="-D_REENTRANT"
OS_LDFLAGS=""
OS_DYFLAGS="-Wl,-z,lazyload"
DARWIN_OPTS=""
SOLARIS_OPTS=" "
;;
*)
AC_MSG_ERROR([Unsupported OS? hmmmm])
;;
esac
AC_SUBST(CP)
# *FLAGS handling goes here
ENV_CFLAGS="$CFLAGS"
ENV_CPPFLAGS="$CPPFLAGS"
ENV_LDFLAGS="$LDFLAGS"
# debug build stuff
if test "x${enable_debug}" = xyes; then
AC_DEFINE_UNQUOTED([DEBUG], [1], [Compiling Debugging code])
OPT_CFLAGS="-O0"
PACKAGE_FEATURES="$PACKAGE_FEATURES debug"
else
OPT_CFLAGS="-O3"
fi
# gdb flags
if test "x${GCC}" = xyes; then
GDB_FLAGS="-ggdb3"
else
GDB_FLAGS="-g"
fi
+dnl Check for POSIX clock_gettime
+dnl
+dnl The result is cached in ac_cv_HAVE_CLOCK_GETTIME and drives the
+dnl BUILD_TIMER_C conditional, which src/Makefile.am uses to add timer.c.
+dnl NOTE(review): this is a compile-only test, so it does not show whether
+dnl an extra library is needed at link time - confirm.
+dnl NOTE(review): AC_TRY_COMPILE is documented with four arguments; the
+dnl trailing =cross action looks like an AC_TRY_RUN leftover - confirm.
+AC_CACHE_CHECK([have clock_gettime],ac_cv_HAVE_CLOCK_GETTIME,[
+AC_TRY_COMPILE([
+#include <time.h>
+],
+[ struct timespec tv; clock_gettime(CLOCK_REALTIME, &tv); return 0;],
+ac_cv_HAVE_CLOCK_GETTIME=yes,ac_cv_HAVE_CLOCK_GETTIME=no,ac_cv_HAVE_CLOCK_GETTIME=cross)])
+AM_CONDITIONAL(BUILD_TIMER_C, test x"$ac_cv_HAVE_CLOCK_GETTIME" = x"yes")
+
# extra warnings
# Every name in WARNLIST is tried as a -W<name> option; the options that
# cc_supports_flag accepts are collected in EXTRA_WARNINGS.
EXTRA_WARNINGS=""
WARNLIST="
	all
	shadow
	missing-prototypes
	missing-declarations
	strict-prototypes
	declaration-after-statement
	pointer-arith
	write-strings
	bad-function-cast
	missing-format-attribute
	format=2
	format-security
	format-nonliteral
	no-long-long
	unsigned-char
	gnu89-inline
	no-strict-aliasing
	"
for warn_name in $WARNLIST; do
	cc_supports_flag -W$warn_name || continue
	EXTRA_WARNINGS="$EXTRA_WARNINGS -W$warn_name"
done
if test "x${enable_coverage}" = xyes && \
cc_supports_flag -ftest-coverage && \
cc_supports_flag -fprofile-arcs ; then
AC_MSG_NOTICE([Enabling Coverage (enable -O0 by default)])
OPT_CFLAGS="-O0"
COVERAGE_CFLAGS="-ftest-coverage -fprofile-arcs"
COVERAGE_LDFLAGS="-ftest-coverage -fprofile-arcs"
COVERAGE_LCRSO_EXTRA_LDFLAGS="-rdynamic"
PACKAGE_FEATURES="$PACKAGE_FEATURES coverage"
else
COVERAGE_CFLAGS=""
COVERAGE_LDFLAGS=""
COVERAGE_LCRSO_EXTRA_LDFLAGS=""
fi
if test "x${enable_small_memory_footprint}" = xyes ; then
AC_DEFINE_UNQUOTED([HAVE_SMALL_MEMORY_FOOTPRINT], 1, [have small_memory_footprint])
PACKAGE_FEATURES="$PACKAGE_FEATURES small-memory-footprint"
fi
if test "x${enable_ansi}" = xyes && \
cc_supports_flag -std=iso9899:199409 ; then
AC_MSG_NOTICE([Enabling ANSI Compatibility])
ANSI_CPPFLAGS="-ansi -D_GNU_SOURCE -DANSI_ONLY"
PACKAGE_FEATURES="$PACKAGE_FEATURES ansi"
else
ANSI_CPPFLAGS=""
fi
if test "x${enable_fatal_warnings}" = xyes && \
cc_supports_flag -Werror ; then
AC_MSG_NOTICE([Enabling Fatal Warnings (-Werror)])
WERROR_CFLAGS="-Werror"
PACKAGE_FEATURES="$PACKAGE_FEATURES fatal-warnings"
else
WERROR_CFLAGS=""
fi
# don't add additional cflags
if test "x${enable_user_flags}" = xyes; then
OPT_CFLAGS=""
GDB_FLAGS=""
EXTRA_WARNINGS=""
fi
# final build of *FLAGS
CFLAGS="$ENV_CFLAGS $OPT_CFLAGS $GDB_FLAGS $OS_CFLAGS \
$COVERAGE_CFLAGS $EXTRA_WARNINGS $WERROR_CFLAGS $NSS_CFLAGS"
CPPFLAGS="$ENV_CPPFLAGS $ANSI_CPPFLAGS $OS_CPPFLAGS $GLIB_CFLAGS $RESMON_CFLAGS"
LDFLAGS="$ENV_LDFLAGS $COVERAGE_LDFLAGS $OS_LDFLAGS"
# substitute what we need:
AC_SUBST([INITDDIR])
AC_SUBST([COVERAGE_LCRSO_EXTRA_LDFLAGS])
AC_SUBST([OS_DYFLAGS])
AC_SUBST([OS_LDL])
AM_CONDITIONAL(BUILD_DARWIN, test -n "${DARWIN_OPTS}")
AM_CONDITIONAL(BUILD_SOLARIS, test -n "${SOLARIS_OPTS}")
AC_SUBST([DARWIN_OPTS])
AC_SUBST([SOLARIS_OPTS])
AM_CONDITIONAL(BUILD_HTML_DOCS, test -n "${GROFF}")
AC_SUBST([LINT_FLAGS])
AC_DEFINE_UNQUOTED([LCRSODIR], "$(eval echo ${LCRSODIR})", [LCRSO directory])
AC_DEFINE_UNQUOTED([SOCKETDIR], "$(eval echo ${SOCKETDIR})", [Socket directory])
AC_DEFINE_UNQUOTED([LOCALSTATEDIR], "$(eval echo ${localstatedir})", [localstate directory])
BOOTHSYSCONFDIR=${sysconfdir}/booth
AC_SUBST([HAVE_LOG_CIB_DIFF])
AC_SUBST([HAVE_XML_LOG_PATCHSET])
AC_SUBST([BOOTHSYSCONFDIR])
AC_DEFINE_UNQUOTED([BOOTHSYSCONFDIR], "$(eval echo ${BOOTHSYSCONFDIR})", [booth config directory])
AC_DEFINE_UNQUOTED([PACKAGE_FEATURES], "${PACKAGE_FEATURES}", [booth built-in features])
AC_OUTPUT
dnl Human-readable summary printed at the end of the configure run.
dnl Each line shows one shell variable computed earlier in this file.
AC_MSG_RESULT([])
AC_MSG_RESULT([$PACKAGE configuration:])
AC_MSG_RESULT([ Version = ${VERSION}])
AC_MSG_RESULT([ Prefix = ${prefix}])
AC_MSG_RESULT([ Executables = ${sbindir}])
AC_MSG_RESULT([ Man pages = ${mandir}])
AC_MSG_RESULT([ Doc dir = ${docdir}])
AC_MSG_RESULT([ Libraries = ${libdir}])
AC_MSG_RESULT([ Header files = ${includedir}])
AC_MSG_RESULT([ Arch-independent files = ${datadir}])
AC_MSG_RESULT([ State information = ${localstatedir}])
AC_MSG_RESULT([ System configuration = ${sysconfdir}])
AC_MSG_RESULT([ System init.d directory = ${INITDDIR}])
AC_MSG_RESULT([ booth config dir = ${BOOTHSYSCONFDIR}])
AC_MSG_RESULT([ SOCKETDIR = ${SOCKETDIR}])
AC_MSG_RESULT([ Features =${PACKAGE_FEATURES}])
AC_MSG_RESULT([])
AC_MSG_RESULT([$PACKAGE build info:])
AC_MSG_RESULT([ Library SONAME = ${SONAME}])
dnl The former LIB_MSG_RESULT call was dropped here: no macro of that name
dnl is defined in this file, so it reached the generated script as a shell
dnl function header and swallowed the message that follows.
AC_MSG_RESULT([ Default optimization = ${OPT_CFLAGS}])
dnl The debug options live in GDB_FLAGS and the accepted warnings in
dnl EXTRA_WARNINGS; these are the names assigned earlier in this file.
AC_MSG_RESULT([ Default debug options = ${GDB_FLAGS}])
AC_MSG_RESULT([ Extra compiler warnings = ${EXTRA_WARNINGS}])
AC_MSG_RESULT([ Env. defined CFLAG = ${ENV_CFLAGS}])
AC_MSG_RESULT([ Env. defined CPPFLAGS = ${ENV_CPPFLAGS}])
AC_MSG_RESULT([ Env. defined LDFLAGS = ${ENV_LDFLAGS}])
AC_MSG_RESULT([ OS defined CFLAGS = ${OS_CFLAGS}])
AC_MSG_RESULT([ OS defined CPPFLAGS = ${OS_CPPFLAGS}])
AC_MSG_RESULT([ OS defined LDFLAGS = ${OS_LDFLAGS}])
AC_MSG_RESULT([ OS defined LDL = ${OS_LDL}])
AC_MSG_RESULT([ OS defined DYFLAGS = ${OS_DYFLAGS}])
AC_MSG_RESULT([ ANSI defined CPPFLAGS = ${ANSI_CPPFLAGS}])
AC_MSG_RESULT([ Coverage CFLAGS = ${COVERAGE_CFLAGS}])
AC_MSG_RESULT([ Coverage LDFLAGS = ${COVERAGE_LDFLAGS}])
AC_MSG_RESULT([ Fatal War. CFLAGS = ${WERROR_CFLAGS}])
AC_MSG_RESULT([ Final CFLAGS = ${CFLAGS}])
AC_MSG_RESULT([ Final CPPFLAGS = ${CPPFLAGS}])
AC_MSG_RESULT([ Final LDFLAGS = ${LDFLAGS}])
diff --git a/src/Makefile.am b/src/Makefile.am
index b3b60dd..b43e511 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1,21 +1,25 @@
MAINTAINERCLEANFILES = Makefile.in
AM_CFLAGS = -fPIC -Werror -funsigned-char -Wno-pointer-sign
AM_CPPFLAGS = -I$(top_builddir)/include
sbin_PROGRAMS = boothd
boothd_SOURCES = config.c main.c raft.c ticket.c transport.c \
pacemaker.c handler.c
+if BUILD_TIMER_C
+boothd_SOURCES += timer.c
+endif
+
boothd_LDFLAGS = $(OS_DYFLAGS) -L./
boothd_LDADD = -lplumb -lplumbgpl -lz -lm
boothd_CPPFLAGS = $(GLIB_CFLAGS)
noinst_HEADERS = booth.h pacemaker.h \
config.h log.h raft.h ticket.h transport.h handler.h
lint:
-splint $(INCLUDES) $(LINT_FLAGS) $(CFLAGS) *.c
diff --git a/src/config.h b/src/config.h
index 48159a8..e8ef069 100644
--- a/src/config.h
+++ b/src/config.h
@@ -1,242 +1,235 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _CONFIG_H
#define _CONFIG_H
#include <stdint.h>
#include "booth.h"
+#include "timer.h"
#include "raft.h"
#include "transport.h"
/** @{ */
/** Definitions for in-RAM data. */
#define MAX_NODES 16
#define TICKET_ALLOC 16
struct ticket_config {
/** \name Configuration items.
* @{ */
/** Name of ticket. */
boothc_ticket name;
/** How many seconds a term lasts (if not refreshed). */
int term_duration;
/** Network related timeouts. */
int timeout;
/** Retries before giving up. */
int retries;
/** If >0, time to wait for a site to get fenced.
* The ticket may be acquired after that timespan by
* another site. */
int acquire_after; /* TODO: needed? */
/* Program to ask whether it makes sense to
* acquire the ticket */
char *ext_verifier;
/** Node weights. */
int weight[MAX_NODES];
/** @} */
/** \name Runtime values.
* @{ */
/** Current state. */
server_state_e state;
/** Next state. Used at startup. */
server_state_e next_state;
/** When something has to be done */
- struct timeval next_cron;
+ timetype next_cron;
/** Current leader. This is effectively the log[] in Raft. */
struct booth_site *leader;
/** Leader that got lost. */
struct booth_site *lost_leader;
/** Is the ticket granted? */
int is_granted;
/** Timestamp of leadership expiration */
time_t term_expires;
/** End of election period */
time_t election_end;
struct booth_site *voted_for;
/** Who the various sites vote for.
* NO_OWNER = no vote yet. */
struct booth_site *votes_for[MAX_NODES];
/* bitmap */
uint64_t votes_received;
/** Last voting round that was seen. */
uint32_t current_term;
/** Do ticket updates whenever we get enough heartbeats.
* But do that only once.
* This is reset to 0 whenever we broadcast heartbeat and set
* to 1 once enough acks are received.
* Increased to 2 when the ticket is committed to the CIB (see
* delay_commit).
*/
uint32_t ticket_updated;
/** @} */
/** */
uint32_t last_applied;
uint32_t next_index[MAX_NODES];
uint32_t match_index[MAX_NODES];
/* Why did we start the elections?
*/
cmd_reason_t election_reason;
/* if it is potentially dangerous to grant the ticket
* immediately, then this is set to some point in time,
* usually (now + term_duration + acquire_after)
*/
time_t delay_commit;
/* the last request RPC we sent
*/
uint32_t last_request;
/* if we expect some acks, then set this to the id of
* the RPC which others will send us; it is cleared once all
* replies were received
*/
uint32_t acks_expected;
/* bitmask of servers which sent acks
*/
uint64_t acks_received;
/* timestamp of the request, currently unused */
time_t req_sent_at;
/* we need to wait for MY_INDEX from other servers,
* hold the ticket processing for a while until they reply
*/
int start_postpone;
/* Do we need to update the copy in the CIB?
* Normally, the ticket is written only when it changes via
* the UPDATE RPC (for followers) and on expiration update
* (for leaders)
*/
int update_cib;
/* Is this ticket in election?
*/
int in_election;
/* don't log warnings unnecessarily
*/
int expect_more_rejects;
/** \name Needed while proposals are being done.
* @{ */
/* Need to keep the previous valid ticket in case we moved to
* start new elections and another server asks for the ticket
* status. It would be wrong to send our candidate ticket.
*/
struct ticket_config *last_valid_tk;
/** Whom to vote for the next time.
* Needed to push a ticket to someone else. */
#if 0
/** Bitmap of sites that acknowledge that state. */
uint64_t proposal_acknowledges;
/** When an incompletely acknowledged proposal gets done.
* If all peers agree, that happens sooner.
* See switch_state_to(). */
struct timeval proposal_switch;
/** Timestamp of proposal expiration. */
time_t proposal_expires;
#endif
/** Number of send retries left.
* Used on the new owner.
* Starts at 0, counts up. */
int retry_number;
/** @} */
};
/** Configuration of one booth instance, reachable through the global
 * booth_conf pointer: identity, transport choice, the site table and
 * the ticket table. */
struct booth_config {
	/* Configuration name; exported to handlers as BOOTH_CONF_NAME. */
	char name[BOOTH_NAME_LEN];
	/* Selects the entry of booth_transport[] returned by transport(). */
	transport_layer_t proto;
	uint16_t port;
	/** Stores the OR of sites bitmasks. */
	uint64_t sites_bits;
	/** Stores the OR of all members' bitmasks. */
	uint64_t all_bits;
	/* User/group names for the two roles
	 * - presumably site and arbitrator daemons; confirm. */
	char site_user[BOOTH_NAME_LEN];
	char site_group[BOOTH_NAME_LEN];
	char arb_user[BOOTH_NAME_LEN];
	char arb_group[BOOTH_NAME_LEN];
	uid_t uid;
	gid_t gid;
	/* Number of configured members; majority_of_bits() compares
	 * vote counts against this value. */
	int site_count;
	struct booth_site site[MAX_NODES];
	/* Ticket table: presumably ticket_count entries are in use out of
	 * ticket_allocated allocated ones - TODO confirm. */
	int ticket_count;
	int ticket_allocated;
	struct ticket_config *ticket;
};
extern struct booth_config *booth_conf;
int read_config(const char *path, int type);
int check_config(int type);
int find_site_by_name(unsigned char *site, struct booth_site **node, int any_type);
int find_site_by_id(uint32_t site_id, struct booth_site **node);
const char *type_to_string(int type);
-
-#include <stdio.h>
-#define R(tk_) do { if (ANYDEBUG) printf("## %12s:%3d state %s, %d, " \
- "leader %s, exp %s", __FILE__, __LINE__, \
- state_to_string(tk_->state), tk_->current_term, \
- site_string(tk_->leader), ctime(&tk_->term_expires)); } while(0)
-
-
#endif /* _CONFIG_H */
diff --git a/src/handler.c b/src/handler.c
index 885997c..cdfae4d 100644
--- a/src/handler.c
+++ b/src/handler.c
@@ -1,70 +1,70 @@
/*
* Copyright (C) 2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include <inttypes.h>
#include <stdio.h>
#include <assert.h>
#include <time.h>
#include "ticket.h"
#include "config.h"
#include "inline-fn.h"
#include "log.h"
#include "pacemaker.h"
#include "booth.h"
#include "handler.h"
/** Runs an external handler.
 * See eg. 'before-acquire-handler'.
 * TODO: timeout, async operation?.
 *
 * Exports the ticket name, local address, configuration name and path
 * and the term expiry as BOOTH_* environment variables, then runs
 * cmd through system().
 *
 * Returns 0 when cmd is NULL or the handler succeeded, otherwise the
 * non-zero result of the failing setenv() chain or of system(). */
int run_handler(struct ticket_config *tk,
		const char *cmd, int synchronous)
{
	int rv;
	/* NOTE(review): a 64-bit decimal needs up to 21 bytes including
	 * sign and terminator; 16 is enough for present-day timestamps
	 * only - consider a larger buffer and snprintf(). */
	char expires[16];
	if (!cmd)
		return 0;
	/* Only synchronous execution is implemented. */
	assert(synchronous);
-	sprintf(expires, "%" PRId64, (int64_t)(tk->term_expires));
+	sprintf(expires, "%" PRId64, (int64_t)wall_ts(tk->term_expires));
	/* The || chain stops at the first setenv() that fails. */
	rv = setenv("BOOTH_TICKET", tk->name, 1) ||
		setenv("BOOTH_LOCAL", local->addr_string, 1) ||
		setenv("BOOTH_CONF_NAME", booth_conf->name, 1) ||
		setenv("BOOTH_CONF_PATH", cl.configfile, 1) ||
		setenv("BOOTH_TICKET_EXPIRES", expires, 1);
	if (rv) {
		log_error("Cannot set environment: %s", strerror(errno));
	} else {
		rv = system(cmd);
		if (rv)
			tk_log_warn("handler \"%s\" exited with error %s",
					cmd, interpret_rv(rv));
		else
			tk_log_debug("handler \"%s\" exited with success", cmd);
	}
	return rv;
}
diff --git a/src/inline-fn.h b/src/inline-fn.h
index 6d64461..2721782 100644
--- a/src/inline-fn.h
+++ b/src/inline-fn.h
@@ -1,325 +1,296 @@
/*
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _INLINE_FN_H
#define _INLINE_FN_H
#include <time.h>
#include <sys/time.h>
#include <assert.h>
#include <string.h>
+#include "timer.h"
#include "config.h"
#include "transport.h"
/** Site id of the local node, or -1 converted to uint32_t while
 * the local site is not set. */
inline static uint32_t get_local_id(void)
{
	if (!local)
		return -1;
	return local->site_id;
}
/** Site id of the given node; NO_ONE when node is NULL. */
inline static uint32_t get_node_id(struct booth_site *node)
{
	if (!node)
		return NO_ONE;
	return node->site_id;
}
/** Seconds left until the ticket's term expires; 0 once the expiry
 * time has been reached or passed. */
inline static int term_time_left(const struct ticket_config *tk)
{
	int left;
-	left = tk->term_expires - time(NULL);
+	left = tk->term_expires - get_secs(NULL);
	/* Never report a negative remainder. */
	return (left < 0) ? 0 : left;
}
/** Remaining term time in seconds when the local site is the
 * leader of this ticket; 0 when it is not, or when the term is over. */
inline static int leader_and_valid(const struct ticket_config *tk)
{
	return (tk->leader == local) ? term_time_left(tk) : 0;
}
/** True when the ticket has a leader other than the no_leader
 * placeholder. */
inline static int is_owned(const struct ticket_config *tk)
{
	if (!tk->leader)
		return 0;
	return tk->leader != no_leader;
}
/** Sets the fields common to every outgoing header: magic, protocol
 * version and sender id, with the iv/auth fields zeroed.  All values
 * are stored in network byte order. */
static inline void init_header_bare(struct boothc_header *h)
{
	/* Identification. */
	h->magic   = htonl(BOOTHC_MAGIC);
	h->version = htonl(BOOTHC_VERSION);
	h->from    = htonl(local->site_id);

	/* These fields are always sent as zero. */
	h->iv    = htonl(0);
	h->auth1 = htonl(0);
	h->auth2 = htonl(0);
}
/** Fills in a complete header: the common fields through
 * init_header_bare(), then the per-message fields, all converted to
 * network byte order. */
static inline void init_header(struct boothc_header *h,
		int cmd, int request, int options,
		int result, int reason, int data_len)
{
	init_header_bare(h);

	h->cmd     = htonl(cmd);
	h->request = htonl(request);
	h->options = htonl(options);
	h->result  = htonl(result);
	h->reason  = htonl(reason);
	h->length  = htonl(data_len);
}
/** Prepares the header of a ticket message for command cmd; request,
 * options, result and reason are all zero and the length covers the
 * whole message. */
static inline void init_ticket_site_header(struct boothc_ticket_msg *msg, int cmd)
{
	const int whole_msg = sizeof(*msg);

	init_header(&msg->header, cmd, 0, 0, 0, 0, whole_msg);
}
/* Term to report for a ticket: while we are a candidate and the saved
 * last_valid_tk carries a non-zero term, use that saved term; otherwise
 * the ticket's own current_term.
 * NOTE(review): last_valid_tk is dereferenced without a NULL check
 * whenever state is ST_CANDIDATE - confirm it is always set by then.
 * The argument is evaluated more than once. */
#define my_last_term(tk) \
	(((tk)->state == ST_CANDIDATE && (tk)->last_valid_tk->current_term) ? \
	(tk)->last_valid_tk->current_term : (tk)->current_term)
/** Builds a ticket message: header from cmd/request/rv/reason, body
 * from tk.  With tk == NULL the ticket part is zeroed.  Otherwise the
 * message carries the ticket name, the current term, the remaining
 * term time and the id of the leader - or, when there is no real
 * leader, of the site we voted for. */
static inline void init_ticket_msg(struct boothc_ticket_msg *msg,
		int cmd, int request, int rv, int reason,
		struct ticket_config *tk)
{
	struct booth_site *holder;

	assert(sizeof(msg->ticket.id) == sizeof(tk->name));
	init_header(&msg->header, cmd, request, 0, rv, reason, sizeof(*msg));

	if (!tk) {
		memset(&msg->ticket, 0, sizeof(msg->ticket));
		return;
	}

	memcpy(msg->ticket.id, tk->name, sizeof(msg->ticket.id));

	if (tk->leader && tk->leader != no_leader)
		holder = tk->leader;
	else
		holder = tk->voted_for;
	msg->ticket.leader = htonl(get_node_id(holder));

	msg->ticket.term = htonl(tk->current_term);
	msg->ticket.term_valid_for = htonl(term_time_left(tk));
}
/** Transport descriptor selected by the configured protocol. */
static inline struct booth_transport const *transport(void)
{
	return &booth_transport[booth_conf->proto];
}
/** Address string of a site, or the literal "NONE" for a NULL site. */
static inline const char *site_string(struct booth_site *site)
{
	if (!site)
		return "NONE";
	return site->addr_string;
}
/** Printable name of the ticket's current leader, as produced by
 * site_string(). */
static inline const char *ticket_leader_string(struct ticket_config *tk)
{
	struct booth_site *leader = tk->leader;

	return site_string(leader);
}
/** Gives up the ticket locally: clears the leader and the granted
 * flag and sets the term expiry to the current time. */
static inline void disown_ticket(struct ticket_config *tk)
{
	tk->leader = NULL;
	tk->is_granted = 0;
-	time(&tk->term_expires);
+	get_secs(&tk->term_expires);
}
/** Disowns the ticket when its term has expired or it has no leader.
 * Returns 1 when the ticket was disowned, 0 otherwise. */
static inline int disown_if_expired(struct ticket_config *tk)
{
-	if (time(NULL) >= tk->term_expires ||
+	if (get_secs(NULL) >= tk->term_expires ||
			!tk->leader) {
		disown_ticket(tk);
		return 1;
	}
	return 0;
}
/* We allow half of the uint32_t to be used;
* half of that below, half of that above the current known "good" value.
* 0 UINT32_MAX
* |--------------------------+----------------+------------|
* | | |
* |--------+-------| allowed range
* |
* current commit index
*
* So, on overflow it looks like that:
* UINT32_MAX 0
* |--------------------------+-----------||---+------------|
* | | |
* |--------+-------| allowed range
* |
* current commit index
*
* This should be possible by using the same datatype and relying
* on the under/overflow semantics.
*
*
* Having 30 bits available, and assuming an expire time of
* one minute and a (high) commit index step of 64 == 2^6 (because
* of weights), we get 2^24 minutes of range - which is ~750
* years. "Should be enough for everybody."
*/
/** Wrap-around aware comparison of two commit indexes.
 *
 * Returns 1 when c_high lies less than a quarter of the uint32_t range
 * ahead of c_low, 0 when the two are equal or c_high lies less than a
 * quarter of the range behind c_low.  Any other distance is outside
 * the allowed window and trips the assertion; when assertions are
 * compiled out, such a pair is reported as "not higher" (0) instead
 * of running off the end of the function. */
static inline int index_is_higher_than(uint32_t c_high, uint32_t c_low)
{
	uint32_t diff;

	if (c_high == c_low)
		return 0;

	/* Unsigned subtraction wraps, so this is the forward distance. */
	diff = c_high - c_low;
	if (diff < UINT32_MAX/4)
		return 1;

	/* Same for the backward distance. */
	diff = c_low - c_high;
	if (diff < UINT32_MAX/4)
		return 0;

	assert(!"commit index out of range - invalid");
	return 0;
}
/** The later of two commit indexes under the wrap-around ordering of
 * index_is_higher_than(); b when neither is higher. */
static inline uint32_t index_max2(uint32_t a, uint32_t b)
{
	if (index_is_higher_than(a, b))
		return a;
	return b;
}
/** The latest of three commit indexes, built from two index_max2()
 * comparisons. */
static inline uint32_t index_max3(uint32_t a, uint32_t b, uint32_t c)
{
	uint32_t best_of_ab = index_max2(a, b);

	return index_max2(best_of_ab, c);
}
-static inline double timeval_to_float(struct timeval tv)
-{
- return tv.tv_sec + tv.tv_usec*(double)1.0e-6;
-}
-
-static inline int timeval_msec(struct timeval tv)
-{
- int m;
-
- m = tv.tv_usec / 1000;
- if (m >= 1000)
- m = 999;
- return m;
-}
-
-
-static inline int timeval_compare(struct timeval tv1, struct timeval tv2)
-{
- if (tv1.tv_sec < tv2.tv_sec)
- return -1;
- if (tv1.tv_sec > tv2.tv_sec)
- return +1;
- if (tv1.tv_usec < tv2.tv_usec)
- return -1;
- if (tv1.tv_usec > tv2.tv_usec)
- return +1;
- return 0;
-}
-
-
/** Point in time at which the local leader should start renewing the
 * ticket; 0 when the local site is not the leader. */
static inline time_t next_vote_starts_at(struct ticket_config *tk)
{
	time_t at_half_term, before_retries_run_out;

	/* Only the owner renews. */
	if (tk->leader != local)
		return 0;

	/* First candidate: half a term before expiry. */
	at_half_term = tk->term_expires - tk->term_duration/2;

	/* Second candidate: early enough that a few message
	 * retransmissions still fit into the allotted expiry time. */
	before_retries_run_out = tk->term_expires - tk->timeout * tk->retries/2;

	/* Whichever comes first wins. */
	return min(at_half_term, before_retries_run_out);
}
/** Non-zero once the renewal time computed by next_vote_starts_at()
 * has been reached; always 0 when that function returns 0. */
static inline int should_start_renewal(struct ticket_config *tk)
{
	time_t now, when;
	when = next_vote_starts_at(tk);
	if (!when)
		return 0;
-	time(&now);
+	get_secs(&now);
	return when <= now;
}
/** Starts waiting for replies of type reply_type: resets the retry
 * counter and the ticket_updated marker, records the request time and
 * counts the local site's own bit as already acknowledged. */
static inline void expect_replies(struct ticket_config *tk,
		int reply_type)
{
	tk->retry_number = 0;
	tk->acks_expected = reply_type;
	/* Our own acknowledgement is implicit. */
	tk->acks_received = local->bitmask;
-	tk->req_sent_at = time(NULL);
+	tk->req_sent_at = get_secs(NULL);
	tk->ticket_updated = 0;
}
/** Stops waiting for replies: no acknowledgement type is expected any
 * more and the retry counter starts over. */
static inline void no_resends(struct ticket_config *tk)
{
	tk->acks_expected = 0;
	tk->retry_number = 0;
}
/** The site the local node has voted for on this ticket, read from
 * the local node's own slot of votes_for[]. */
static inline struct booth_site *my_vote(struct ticket_config *tk)
{
	const int own_slot = local->index;

	return tk->votes_for[own_slot];
}
/** Number of set bits in a 64-bit bitmap.
 * Uses the long long builtin so that all 64 bits are counted; the
 * plain __builtin_popcount() takes an unsigned int and would drop the
 * upper half of val. */
static inline int count_bits(uint64_t val) {
	return __builtin_popcountll(val);
}
/** True when the bits set in val form a strict majority of the
 * configured sites.  The comparison is strict, so exactly half of an
 * even number of participants is not a majority.  tk is not used. */
static inline int majority_of_bits(struct ticket_config *tk, uint64_t val)
{
	int votes = count_bits(val);

	return 2 * votes > booth_conf->site_count;
}
/** True when the set of received acks is exactly the set of all
 * members. */
static inline int all_replied(struct ticket_config *tk)
{
	return tk->acks_received == booth_conf->all_bits;
}
/** True when every bit in sites_bits has acknowledged; acks from
 * members outside sites_bits are ignored. */
static inline int all_sites_replied(struct ticket_config *tk)
{
	const uint64_t sites = booth_conf->sites_bits;

	return (tk->acks_received & sites) == sites;
}
#endif
diff --git a/src/main.c b/src/main.c
index 376cf6a..cb60927 100644
--- a/src/main.c
+++ b/src/main.c
@@ -1,1367 +1,1367 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sched.h>
#include <errno.h>
#include <limits.h>
#include <sys/file.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/poll.h>
#include <pacemaker/crm/services.h>
#include <clplumbing/setproctitle.h>
#include <sys/prctl.h>
#include <clplumbing/coredumps.h>
#include <fcntl.h>
#include <string.h>
#include <assert.h>
#include <error.h>
#include <sys/ioctl.h>
#include <termios.h>
#include <signal.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include "log.h"
#include "booth.h"
#include "config.h"
#include "transport.h"
#include "inline-fn.h"
#include "pacemaker.h"
#include "ticket.h"
#define RELEASE_VERSION "0.2.0"
#define CLIENT_NALLOC 32
int daemonize = 0;
int enable_stderr = 0;
time_t start_time;
/** Structure for "clients".
* Filehandles with incoming data get registered here (and in pollfds),
* along with their callbacks.
* Because these can be reallocated with every new fd, addressing
* happens _only_ by their numeric index. */
struct client *clients = NULL;
struct pollfd *pollfds = NULL;
static int client_maxi;
static int client_size = 0;
static const struct booth_site _no_leader = {
.addr_string = "none",
.site_id = NO_ONE,
};
struct booth_site *no_leader = (struct booth_site*)& _no_leader;
typedef enum
{
BOOTHD_STARTED=0,
BOOTHD_STARTING
} BOOTH_DAEMON_STATE;
int poll_timeout = POLL_TIMEOUT;
struct booth_config *booth_conf;
struct command_line cl;
/** Reads exactly count bytes from fd into buf, restarting reads that
 * were interrupted by a signal.
 * Returns 0 once all bytes have arrived, -1 on end-of-file or on any
 * other read error. */
int do_read(int fd, void *buf, size_t count)
{
	char *dst = buf;
	int done = 0;

	while (done < count) {
		int got = read(fd, dst + done, count - done);

		if (got > 0) {
			done += got;
			continue;
		}
		/* Interrupted before any data arrived: just try again. */
		if (got == -1 && errno == EINTR)
			continue;
		/* EOF (0) or a real error (-1). */
		return -1;
	}
	return 0;
}
/** Writes exactly count bytes from buf to fd, restarting writes that
 * were interrupted by a signal and continuing after partial writes.
 *
 * Returns 0 once everything has been written (including the trivial
 * count == 0 case) and -1 when write() fails or makes no progress.
 * A write() result of 0 is reported as a failure too: passing it
 * through would look like success to callers although data is still
 * pending. */
int do_write(int fd, void *buf, size_t count)
{
	const char *src = buf;
	size_t off = 0;
	ssize_t rv;

	while (off < count) {
		rv = write(fd, src + off, count - off);
		if (rv == -1 && errno == EINTR)
			continue;
		/* If we cannot write _any_ data, we'd be in an (potential) loop. */
		if (rv <= 0) {
			log_error("write failed: %s (%d)", strerror(errno), errno);
			return -1;
		}
		off += rv;
	}
	return 0;
}
/** Grows the clients[] and pollfds[] arrays by CLIENT_NALLOC entries
 * and marks the new entries as unused.  Exits the process when memory
 * cannot be obtained. */
static void client_alloc(void)
{
	const int grown = client_size + CLIENT_NALLOC;
	int slot;

	/* realloc() on a NULL pointer behaves like malloc(), which covers
	 * the very first call, when client_size is still 0. */
	clients = realloc(clients, grown * sizeof(struct client));
	pollfds = realloc(pollfds, grown * sizeof(struct pollfd));

	if (!clients || !pollfds) {
		log_error("can't alloc for client array");
		exit(1);
	}

	/* Initialize only the freshly added tail. */
	for (slot = client_size; slot < grown; slot++) {
		clients[slot].workfn = NULL;
		clients[slot].deadfn = NULL;
		clients[slot].fd = -1;
		pollfds[slot].fd = -1;
		pollfds[slot].revents = 0;
	}
	client_size = grown;
}
static void client_dead(int ci)
{
if (clients[ci].fd != -1)
close(clients[ci].fd);
clients[ci].fd = -1;
clients[ci].workfn = NULL;
pollfds[ci].fd = -1;
}
/** Registers fd in the first free client slot and arms it for POLLIN.
 *
 * @param fd      descriptor to watch
 * @param tpt     transport the descriptor belongs to
 * @param workfn  callback stored as the slot's work function
 * @param deadfn  callback stored as the slot's dead function;
 *                client_dead() is used when this is NULL
 * @return index of the slot that was used; -1 only when no free slot
 *         exists and assertions are compiled out
 *
 * The arrays are grown only when the highest slot index ever used has
 * reached the end of the allocation.  The previous test compared the
 * two counters the wrong way round and was always true, so every call
 * enlarged both arrays by CLIENT_NALLOC entries. */
int client_add(int fd, const struct booth_transport *tpt,
		void (*workfn)(int ci),
		void (*deadfn)(int ci))
{
	int i;
	struct client *c;

	if (client_maxi + 1 >= client_size)
		client_alloc();

	for (i = 0; i < client_size; i++) {
		c = clients + i;
		if (c->fd != -1)
			continue;

		c->workfn = workfn;
		c->deadfn = deadfn ? deadfn : client_dead;
		c->transport = tpt;
		c->fd = fd;
		pollfds[i].fd = fd;
		pollfds[i].events = POLLIN;

		/* Remember the highest slot index handed out so far. */
		if (i > client_maxi)
			client_maxi = i;
		return i;
	}

	assert(!"no client");
	return -1;
}
/* Only used for client requests, TCP ???*/
/**
 * Work callback for an accepted client connection in slot `ci`.
 *
 * Reads one request (header, then the remainder of a boothc_ticket_msg
 * if the header announces a length), dispatches on the command and then
 * invokes the slot's dead callback, i.e. one request per connection.
 */
void process_connection(int ci)
{
struct boothc_ticket_msg msg;
int rv, len, expr, fd;
void (*deadfn) (int ci);
fd = clients[ci].fd;
rv = do_read(fd, &msg.header, sizeof(msg.header));
if (rv < 0) {
/* do_read() also returns -1 on EOF, in which case errno is stale. */
if (errno == ECONNRESET)
log_debug("client %d connection reset for fd %d",
ci, clients[ci].fd);
goto kill;
}
if (check_boothc_header(&msg.header, -1) < 0)
goto kill;
/* Basic sanity checks already done. */
len = ntohl(msg.header.length);
if (len) {
if (len != sizeof(msg)) {
/* Also jumped to from the CMD_GRANT/CMD_REVOKE cases below.
 * NOTE(review): this returns without calling deadfn, so the slot
 * stays registered -- confirm that this is intended. */
bad_len:
log_error("got wrong length %u", len);
return;
}
/* Read the part of the message that follows the header. */
expr = len - sizeof(msg.header);
rv = do_read(clients[ci].fd, msg.header.data, expr);
if (rv < 0) {
log_error("connection %d read data error %d, wanted %d",
ci, rv, expr);
goto kill;
}
}
/* For CMD_GRANT and CMD_REVOKE:
* Don't close connection immediately, but send
* result a second later? */
switch (ntohl(msg.header.cmd)) {
case CMD_LIST:
ticket_answer_list(fd, &msg);
goto kill;
case CMD_GRANT:
/* Expect boothc_ticket_site_msg. */
if (len != sizeof(msg))
goto bad_len;
ticket_answer_grant(fd, &msg);
goto kill;
case CMD_REVOKE:
/* Expect boothc_ticket_site_msg. */
if (len != sizeof(msg))
goto bad_len;
ticket_answer_revoke(fd, &msg);
goto kill;
default:
/* Unknown command: answer with a header-only RLT_INVALID_ARG reply. */
log_error("connection %d cmd %x unknown",
ci, ntohl(msg.header.cmd));
init_header(&msg.header,CMR_GENERAL, 0, 0, RLT_INVALID_ARG, 0, sizeof(msg.header));
send_header_only(fd, &msg.header);
goto kill;
}
/* Every switch case jumps away, so this point is never reached. */
assert(0);
return;
kill:
/* Tear the connection down through the slot's dead callback. */
deadfn = clients[ci].deadfn;
if(deadfn) {
deadfn(ci);
}
return;
}
/**
 * Work callback of the listening TCP socket in slot `ci`.
 *
 * Accepts one pending connection and registers it with
 * process_connection() as its work callback.  If accept() fails the
 * error is logged and the listener's own dead callback is invoked.
 */
static void process_listener(int ci)
{
	int conn_fd, slot;
	void (*on_dead)(int ci);

	conn_fd = accept(clients[ci].fd, NULL, NULL);
	if (conn_fd >= 0) {
		slot = client_add(conn_fd, clients[ci].transport,
				process_connection, NULL);
		log_debug("add client connection %d fd %d", slot, conn_fd);
		return;
	}

	log_error("process_listener: accept error for fd %d: %s (%d)",
			clients[ci].fd, strerror(errno), errno);
	on_dead = clients[ci].deadfn;
	if (on_dead)
		on_dead(ci);
}
/**
 * Load and validate the configuration for run mode `type`.
 *
 * Reads cl.configfile, determines the "local" site (either the one named
 * with "-s" for a daemon, or via find_myself()), runs check_config() and
 * derives the default lock file name from the configuration name when
 * none was given.
 *
 * Returns a negative value on failure, otherwise the result of
 * check_config().
 */
static int setup_config(int type)
{
	int rc;

	rc = read_config(cl.configfile, type);
	if (rc < 0)
		return rc;

	/* Set "local" pointer, ignoring errors. */
	if (cl.type != DAEMON || !cl.site[0]) {
		find_myself(NULL, type == CLIENT);
	} else if (find_site_by_name(cl.site, &local, 1)) {
		local->local = 1;
	} else {
		log_error("Cannot find \"%s\" in the configuration.",
				cl.site);
		return -EINVAL;
	}

	rc = check_config(type);
	if (rc < 0)
		return rc;

	/* Per default the PID file name is derived from the
	 * configuration name. */
	if (cl.lockfile[0] == '\0') {
		snprintf(cl.lockfile, sizeof(cl.lockfile)-1,
				"%s/%s.pid", BOOTH_RUN_DIR, booth_conf->name);
	}

	return rc;
}
/**
 * Initialise the configured transport (with message_recv as receive
 * callback) and then the TCP transport.
 *
 * Returns the negative result of the first init that fails, otherwise
 * the result of the TCP init.
 */
static int setup_transport(void)
{
	int rc;

	rc = transport()->init(message_recv);
	if (rc < 0) {
		log_error("failed to init booth_transport %s", transport()->name);
		return rc;
	}

	rc = booth_transport[TCP].init(NULL);
	if (rc < 0)
		log_error("failed to init booth_transport[TCP]");

	return rc;
}
/**
 * Replace the contents of the lock file `fd` with a single status line
 * (pid, state, type, configuration name, site id, address and port).
 *
 * @param fd     open, writable lock file descriptor
 * @param state  BOOTHD_STARTING or BOOTHD_STARTED
 *
 * Returns 0 on success and a negative value on failure.
 */
static int write_daemon_state(int fd, int state)
{
	char buffer[1024];
	int rv, size;

	size = sizeof(buffer) - 1;
	rv = snprintf(buffer, size,
			"booth_pid=%d "
			"booth_state=%s "
			"booth_type=%s "
			"booth_cfg_name='%s' "
			"booth_id=%d "
			"booth_addr_string='%s' "
			"booth_port=%d\n",
			getpid(),
			( state == BOOTHD_STARTED  ? "started"  :
			  state == BOOTHD_STARTING ? "starting" :
			  "invalid"),
			type_to_string(local->type),
			booth_conf->name,
			local->site_id,
			local->addr_string,
			booth_conf->port);

	/* snprintf() returns the length the complete text would have had,
	 * so every value >= size means the line was truncated.  Checking
	 * only for equality would let a larger value through, and the
	 * write() below would then read past the end of buffer. */
	if (rv < 0 || rv >= size) {
		log_error("Buffer filled up in write_daemon_state().");
		return -1;
	}
	size = rv;

	rv = ftruncate(fd, 0);
	if (rv < 0) {
		log_error("lockfile %s truncate error %d: %s",
				cl.lockfile, errno, strerror(errno));
		return rv;
	}

	rv = lseek(fd, 0, SEEK_SET);
	if (rv < 0) {
		log_error("lseek set fd(%d) offset to 0 error, return(%d), message(%s)",
				fd, rv, strerror(errno));
		rv = -1;
		return rv;
	}

	rv = write(fd, buffer, size);
	if (rv != size) {
		log_error("write to fd(%d, %d) returned %d, errno %d, message(%s)",
				fd, size,
				rv, errno, strerror(errno));
		return -1;
	}

	return 0;
}
/**
 * Daemon main loop.
 *
 * Sets up the transports and tickets, registers the TCP listener,
 * records BOOTHD_STARTED in the lock file `fd` and then polls all
 * registered descriptors, dispatching to their work/dead callbacks and
 * calling process_tickets() after every poll round.
 *
 * Only returns (with -1) when setup or poll() fails.
 */
static int loop(int fd)
{
void (*workfn) (int ci);
void (*deadfn) (int ci);
int rv, i;
rv = setup_transport();
if (rv < 0)
goto fail;
rv = setup_ticket();
if (rv < 0)
goto fail;
/* The listening socket gets a slot of its own; new connections are
 * accepted by process_listener(). */
client_add(local->tcp_fd, booth_transport + TCP,
process_listener, NULL);
rv = write_daemon_state(fd, BOOTHD_STARTED);
if (rv != 0) {
log_error("write daemon state %d to lockfile error %s: %s",
BOOTHD_STARTED, cl.lockfile, strerror(errno));
goto fail;
}
if (cl.type == ARBITRATOR)
log_info("BOOTH arbitrator daemon started");
else if (cl.type == SITE)
log_info("BOOTH cluster site daemon started");
while (1) {
rv = poll(pollfds, client_maxi + 1, poll_timeout);
/* A signal interrupted poll(); just poll again. */
if (rv == -1 && errno == EINTR)
continue;
if (rv < 0) {
log_error("poll failed: %s (%d)", strerror(errno), errno);
goto fail;
}
for (i = 0; i <= client_maxi; i++) {
/* Skip unused slots. */
if (clients[i].fd < 0)
continue;
if (pollfds[i].revents & POLLIN) {
workfn = clients[i].workfn;
if (workfn)
workfn(i);
}
/* Checked even if the work callback already ran for this slot. */
if (pollfds[i].revents &
(POLLERR | POLLHUP | POLLNVAL)) {
deadfn = clients[i].deadfn;
if (deadfn)
deadfn(i);
}
}
process_tickets();
}
return 0;
fail:
return -1;
}
/**
 * Send request `cmd` to a site over TCP and copy the textual answer to
 * standard output.
 *
 * The site is the one named in cl.site, or the local site when none was
 * given.  The reply consists of a header announcing the total length,
 * followed by the payload.
 *
 * Returns 0 on success, ENOENT if the site is unknown, or a negative
 * value on transport, protocol or memory errors.
 */
static int query_get_string_answer(cmd_request_t cmd)
{
	struct booth_site *site;
	struct boothc_header reply;
	char *data;
	size_t reply_len;
	size_t data_len;
	int rv;
	struct booth_transport const *tpt;

	data = NULL;
	init_header(&cl.msg.header, cmd, 0, cl.options, 0, 0, sizeof(cl.msg));

	if (!*cl.site)
		site = local;
	else if (!find_site_by_name(cl.site, &site, 1)) {
		log_error("cannot find site \"%s\"", cl.site);
		rv = ENOENT;
		goto out;
	}

	tpt = booth_transport + TCP;
	rv = tpt->open(site);
	if (rv < 0)
		goto out_free;

	rv = tpt->send(site, &cl.msg, sizeof(cl.msg));
	if (rv < 0)
		goto out_free;

	rv = tpt->recv(site, &reply, sizeof(reply));
	if (rv < 0)
		goto out_free;

	/* The length comes from the peer; never trust it to be at least as
	 * large as the header, or the subtraction below would wrap. */
	reply_len = ntohl(reply.length);
	if (reply_len < sizeof(reply)) {
		log_error("reply length %lu is shorter than the reply header",
				(unsigned long)reply_len);
		rv = -EINVAL;
		goto out_free;
	}

	data_len = reply_len - sizeof(reply);
	/* An empty payload is a valid (empty) answer; malloc(0) may return
	 * NULL and must not be mistaken for an out-of-memory condition. */
	if (data_len > 0) {
		data = malloc(data_len);
		if (!data) {
			rv = -ENOMEM;
			goto out_free;
		}

		rv = tpt->recv(site, data, data_len);
		if (rv < 0)
			goto out_free;

		do_write(STDOUT_FILENO, data, data_len);
	}
	rv = 0;

out_free:
	free(data);
	tpt->close(site);
out:
	return rv;
}
/**
 * Translate the result code of a grant/revoke reply into a log message
 * and a return value.
 *
 * @param reply_code  result field of the reply (host byte order)
 * @param cmd         CMD_GRANT or CMD_REVOKE
 *
 * Returns 0 on success, 1 if the request has to be repeated at another
 * site (RLT_REDIRECT) and -1 on any failure.
 */
static int test_reply(int reply_code, cmd_request_t cmd)
{
	int rv = 0;
	const char *op_str;

	if (cmd == CMD_GRANT)
		op_str = "grant";
	else if (cmd == CMD_REVOKE)
		op_str = "revoke";
	else {
		log_error("internal error reading reply result!");
		return -1;
	}

	switch (reply_code) {
	case RLT_OVERGRANT:
		log_info("You're granting a granted ticket. "
			 "If you wanted to migrate a ticket, "
			 "use revoke first, then use grant.");
		rv = -1;
		break;

	case RLT_TICKET_IDLE:
		log_info("ticket is not owned");
		rv = 0;
		break;

	case RLT_ASYNC:
		log_info("%s command sent, result will be returned "
			 "asynchronously. Please use \"booth list\" to "
			 "see the outcome.", op_str);
		rv = 0;
		break;

	case RLT_SYNC_SUCC:
	case RLT_SUCCESS:
		log_info("%s succeeded!", op_str);
		rv = 0;
		break;

	case RLT_SYNC_FAIL:
		log_info("%s failed!", op_str);
		rv = -1;
		break;

	case RLT_INVALID_ARG:
		log_error("ticket \"%s\" does not exist",
				cl.msg.ticket.id);
		/* The request was rejected, so it must not look like success. */
		rv = -1;
		break;

	case RLT_EXT_FAILED:
		log_error("before-acquire-handler for ticket \"%s\" failed, grant denied",
				cl.msg.ticket.id);
		rv = -1;
		break;

	case RLT_REDIRECT:
		/* talk to another site */
		rv = 1;
		break;

	default:
		/* Report the code that was received, not the (still zero)
		 * return value. */
		log_error("got an error code: %x", reply_code);
		rv = -1;
	}
	return rv;
}
/**
 * Client side of "grant" and "revoke": send `cmd` for the ticket in
 * cl.msg.ticket.id to a site and evaluate the reply.
 *
 * The target is the site named in cl.site, or the local site when none
 * was given.  If no ticket was named and the configuration defines
 * exactly one, that ticket is used.  An RLT_REDIRECT reply makes the
 * request being repeated at the site announced as leader.
 *
 * Returns 0 on success, a negative value on failure, and 1 when a
 * redirect names a site that is not configured.
 */
static int do_command(cmd_request_t cmd)
{
	struct booth_site *site;
	struct boothc_ticket_msg reply;
	struct booth_transport const *tpt;
	uint32_t leader_id;
	int rv;

	rv = 0;
	site = NULL;

	if (!*cl.site)
		site = local;
	else {
		if (!find_site_by_name(cl.site, &site, 1)) {
			log_error("Site \"%s\" not configured.", cl.site);
			rv = -1;
			goto out_close;
		}
	}

	/* Without "-s" we depend on the local site having been detected. */
	if (!site) {
		log_error("Cannot find myself in the configuration.");
		rv = -1;
		goto out_close;
	}

	if (site->type == ARBITRATOR) {
		log_error("Site \"%s\" is an arbitrator, cannot grant/revoke ticket there.", cl.site);
		rv = -1;
		goto out_close;
	}

	assert(site->type == SITE);

	/* We don't check for existence of ticket, so that asking can be
	 * done without local configuration, too.
	 * Although, that means that the UDP port has to be specified, too. */
	if (!cl.msg.ticket.id[0]) {
		/* If the loaded configuration has only a single ticket defined, use that. */
		if (booth_conf->ticket_count == 1) {
			strcpy(cl.msg.ticket.id, booth_conf->ticket[0].name);
		} else {
			log_error("No ticket given.");
			rv = -1;
			goto out_close;
		}
	}

redirect:
	init_header(&cl.msg.header, cmd, 0, cl.options, 0, 0, sizeof(cl.msg));

	/* Always use TCP for client - at least for now. */
	tpt = booth_transport + TCP;
	rv = tpt->open(site);
	if (rv < 0)
		goto out_close;

	rv = tpt->send(site, &cl.msg, sizeof(cl.msg));
	if (rv < 0)
		goto out_close;

	rv = tpt->recv(site, &reply, sizeof(reply));
	if (rv < 0)
		goto out_close;

	rv = test_reply(ntohl(reply.header.result), cmd);
	if (rv == 1) {
		/* Redirected: drop this connection and ask the leader. */
		local_transport->close(site);
		leader_id = ntohl(reply.ticket.leader);
		if (!find_site_by_id(leader_id, &site)) {
			log_error("Message with unknown redirect site %x received", leader_id);
			return rv;
		}
		goto redirect;
	}

out_close:
	if (site)
		local_transport->close(site);
	return rv;
}
/** Client operation "grant": runs do_command() with CMD_GRANT. */
static int do_grant(void)
{
return do_command(CMD_GRANT);
}
/** Client operation "revoke": runs do_command() with CMD_REVOKE. */
static int do_revoke(void)
{
return do_command(CMD_REVOKE);
}
static int _lockfile(int mode, int *fdp, pid_t *locked_by)
{
struct flock lock;
int fd, rv;
/* After reboot the directory may not yet exist.
* Try to create it, but ignore errors. */
if (strncmp(cl.lockfile, BOOTH_RUN_DIR,
strlen(BOOTH_RUN_DIR)) == 0)
mkdir(BOOTH_RUN_DIR, 0775);
if (locked_by)
*locked_by = 0;
*fdp = -1;
fd = open(cl.lockfile, mode, 0664);
if (fd < 0)
return errno;
*fdp = fd;
lock.l_type = F_WRLCK;
lock.l_start = 0;
lock.l_whence = SEEK_SET;
lock.l_len = 0;
lock.l_pid = 0;
if (fcntl(fd, F_SETLK, &lock) == 0)
return 0;
rv = errno;
if (locked_by)
if (fcntl(fd, F_GETLK, &lock) == 0)
*locked_by = lock.l_pid;
return rv;
}
/** Returns non-zero when the effective user id of the process is 0. */
static inline int is_root(void)
{
/* TODO: getuid()? Better way to check? */
return geteuid() == 0;
}
/**
 * Create and lock the daemon's lock file and record the "starting"
 * state in it.  When running as root the file is handed over to the
 * configured uid/gid (a failure to do so is only logged).
 *
 * Returns the open, locked descriptor, or -1 if the file could not be
 * opened, is locked by someone else, or could not be written.
 */
static int create_lockfile(void)
{
	int rv, fd;

	fd = -1;
	rv = _lockfile(O_CREAT | O_WRONLY, &fd, NULL);

	if (fd == -1) {
		log_error("lockfile %s open error %d: %s",
				cl.lockfile, rv, strerror(rv));
		return -1;
	}

	/* _lockfile() reports failures as a positive errno value, so any
	 * non-zero result means that the lock is not ours.  Testing for
	 * "< 0" would never trigger and a second daemon would happily
	 * overwrite the lock file of a running one. */
	if (rv != 0) {
		log_error("lockfile %s setlk error %d: %s",
				cl.lockfile, rv, strerror(rv));
		goto fail;
	}

	rv = write_daemon_state(fd, BOOTHD_STARTING);
	if (rv != 0) {
		log_error("write daemon state %d to lockfile error %s: %s",
				BOOTHD_STARTING, cl.lockfile, strerror(errno));
		goto fail;
	}

	if (is_root()) {
		if (fchown(fd, booth_conf->uid, booth_conf->gid) < 0)
			log_error("fchown() on lockfile said %d: %s",
					errno, strerror(errno));
	}

	return fd;

fail:
	close(fd);
	return -1;
}
/** Remove the lock file named in cl.lockfile and close its descriptor `fd`. */
static void unlink_lockfile(int fd)
{
unlink(cl.lockfile);
close(fd);
}
/** Print the command line synopsis and the list of options to stdout. */
static void print_usage(void)
{
printf("Usages:\n");
printf(" booth daemon [-c config] [-D]\n");
printf(" booth [client] {list|grant|revoke} [options]\n");
printf(" booth status [-c config] [-D]\n");
printf("\n");
printf("Client operations:\n");
printf(" list: List all the tickets\n");
printf(" grant: Grant ticket to site\n");
printf(" revoke: Revoke ticket from site\n");
printf("\n");
printf("Options:\n");
/* BOOTH_DEFAULT_CONF is pasted into the literal at compile time. */
printf(" -c FILE Specify config file [default " BOOTH_DEFAULT_CONF "]\n");
printf(" Can be a path or a name without \".conf\" suffix\n");
printf(" -D Enable debugging to stderr and don't fork\n");
printf(" -S Systemd mode (no forking)\n");
printf(" -t ticket name\n");
printf(" -s site name\n");
printf(" -l LOCKFILE Specify lock file path (daemon only)\n");
printf(" -F Try to grant the ticket immediately (client only)\n");
printf(" -h Print this help, then exit\n");
printf("\n");
printf("Please see the man page for details.\n");
}
#define OPTION_STRING "c:Dl:t:s:FhS"
/**
 * Copy the string `value` into the buffer `dest` of `buflen` bytes.
 *
 * The result is always NUL-terminated and the unused remainder of the
 * buffer (up to buflen - 1) is zero-filled.  If `value` does not fit,
 * including its terminator, a message naming `description` is printed
 * to stderr and the process exits with EXIT_FAILURE.
 */
void safe_copy(char *dest, char *value, size_t buflen, const char *description) {
	size_t content_len;

	/* Without this check "buflen - 1" would wrap around and the copy
	 * below would run far past the destination. */
	if (buflen == 0) {
		fprintf(stderr, "'%s' exceeds maximum %s length of %d\n",
			value, description, 0);
		exit(EXIT_FAILURE);
	}

	content_len = buflen - 1;
	/* A value of exactly content_len characters still fits together
	 * with its terminator; only longer ones exceed the maximum. */
	if (strlen(value) > content_len) {
		fprintf(stderr, "'%s' exceeds maximum %s length of %lu\n",
			value, description, (unsigned long)content_len);
		exit(EXIT_FAILURE);
	}
	strncpy(dest, value, content_len);
	dest[content_len] = 0;
}
/**
 * Resolve `hostname` and store the textual form of the first address
 * found in `ip_str` (a buffer of `ip_size` bytes).
 *
 * Returns 0 on success, the getaddrinfo() error code if the lookup
 * fails, or -1 if the address cannot be converted to text.
 */
static int host_convert(char *hostname, char *ip_str, size_t ip_size)
{
	struct addrinfo *result = NULL, hints = {0};
	int re = -1;

	hints.ai_family = BOOTH_PROTO_FAMILY;
	hints.ai_socktype = SOCK_DGRAM;

	re = getaddrinfo(hostname, NULL, &hints, &result);
	/* Nothing was allocated on failure; handing a NULL list to
	 * freeaddrinfo() is not guaranteed to be harmless. */
	if (re != 0)
		return re;

	if (result && result->ai_addr) {
		/* NOTE(review): the cast assumes that BOOTH_PROTO_FAMILY yields
		 * struct sockaddr_in (IPv4) addresses -- confirm. */
		struct in_addr addr = ((struct sockaddr_in *)result->ai_addr)->sin_addr;
		const char *re_ntop = inet_ntop(BOOTH_PROTO_FAMILY, &addr, ip_str, ip_size);
		if (re_ntop == NULL) {
			re = -1;
		}
	} else {
		re = -1;
	}

	if (result)
		freeaddrinfo(result);

	return re;
}
/**
 * Parse the command line into the global `cl` structure.
 *
 * The first word selects the run mode (daemon, status or client); for a
 * client the operation (list, grant, revoke) follows.  The remaining
 * arguments are handled with getopt() using OPTION_STRING.  A trailing
 * non-option argument is taken as the ticket name for client calls.
 *
 * Returns 0 on success; usage errors terminate the process.
 */
static int read_arguments(int argc, char **argv)
{
int optchar;
char *arg1 = argv[1];
char *op = NULL;
char *cp;
char site_arg[INET_ADDRSTRLEN] = {0};
int left;
/* The argc test comes first, so arg1 is not dereferenced when missing. */
if (argc < 2 || !strcmp(arg1, "help") || !strcmp(arg1, "--help") ||
!strcmp(arg1, "-h")) {
print_usage();
exit(EXIT_SUCCESS);
}
if (!strcmp(arg1, "version") || !strcmp(arg1, "--version") ||
!strcmp(arg1, "-V")) {
printf("%s %s (built %s %s)\n",
argv[0], RELEASE_VERSION, __DATE__, __TIME__);
exit(EXIT_SUCCESS);
}
/* Select the run mode and point optind at the first option. */
if (strcmp(arg1, "arbitrator") == 0 ||
strcmp(arg1, "site") == 0 ||
strcmp(arg1, "start") == 0 ||
strcmp(arg1, "daemon") == 0) {
cl.type = DAEMON;
optind = 2;
} else if (strcmp(arg1, "status") == 0) {
cl.type = STATUS;
optind = 2;
} else if (strcmp(arg1, "client") == 0) {
cl.type = CLIENT;
if (argc < 3) {
print_usage();
exit(EXIT_FAILURE);
}
op = argv[2];
optind = 3;
} else {
/* No mode keyword: treat the first word as a client operation. */
cl.type = CLIENT;
op = argv[1];
optind = 2;
}
if (cl.type == CLIENT) {
if (!strcmp(op, "list"))
cl.op = CMD_LIST;
else if (!strcmp(op, "grant"))
cl.op = CMD_GRANT;
else if (!strcmp(op, "revoke"))
cl.op = CMD_REVOKE;
else {
fprintf(stderr, "client operation \"%s\" is unknown\n",
op);
exit(EXIT_FAILURE);
}
}
while (optind < argc) {
optchar = getopt(argc, argv, OPTION_STRING);
switch (optchar) {
case 'c':
if (strchr(optarg, '/')) {
safe_copy(cl.configfile, optarg,
sizeof(cl.configfile), "config file");
} else {
/* If no "/" in there, use with default directory. */
strcpy(cl.configfile, BOOTH_DEFAULT_CONF_DIR);
cp = cl.configfile + strlen(BOOTH_DEFAULT_CONF_DIR);
assert(cp > cl.configfile);
assert(*(cp-1) == '/');
/* Write at the \0, ie. after the "/" */
safe_copy(cp, optarg,
(sizeof(cl.configfile) -
(cp - cl.configfile) -
strlen(BOOTH_DEFAULT_CONF_EXT)),
"config name");
/* If no extension, append ".conf".
* Space is available, see -strlen() above. */
if (!strchr(cp, '.'))
strcat(cp, BOOTH_DEFAULT_CONF_EXT);
}
break;
case 'D':
debug_level++;
enable_stderr = 1;
/* Fall through */
case 'S':
/* A non-zero value makes do_server() skip the daemon() call. */
daemonize = 1;
break;
case 'l':
safe_copy(cl.lockfile, optarg, sizeof(cl.lockfile), "lock file");
break;
case 't':
if (cl.op == CMD_GRANT || cl.op == CMD_REVOKE) {
safe_copy(cl.msg.ticket.id, optarg,
sizeof(cl.msg.ticket.id), "ticket name");
} else {
print_usage();
exit(EXIT_FAILURE);
}
break;
case 's':
/* For testing and debugging: allow "-s site" also for
* daemon start, so that the address that should be used
* can be set manually.
* This makes it easier to start multiple processes
* on one machine. */
if (cl.type == CLIENT ||
(cl.type == DAEMON && debug_level)) {
/* Store the resolved address if the name resolves,
 * otherwise the argument as given. */
int re = host_convert(optarg, site_arg, INET_ADDRSTRLEN);
if (re == 0) {
safe_copy(cl.site, site_arg, sizeof(cl.site), "site name");
} else {
safe_copy(cl.site, optarg, sizeof(cl.site), "site name");
}
} else {
log_error("\"-s\" not allowed in daemon mode.");
exit(EXIT_FAILURE);
}
break;
case 'F':
if (cl.type != CLIENT || cl.op != CMD_GRANT) {
log_error("use \"-F\" only for client grant");
exit(EXIT_FAILURE);
}
cl.options |= OPT_IMMEDIATE;
break;
case 'h':
print_usage();
exit(EXIT_SUCCESS);
break;
case ':':
case '?':
fprintf(stderr, "Please use '-h' for usage.\n");
exit(EXIT_FAILURE);
break;
case -1:
/* No more parameters on cmdline, only arguments. */
goto extra_args;
default:
goto unknown;
};
}
return 0;
extra_args:
/* NOTE(review): if getopt() stopped at a "--" that was the last word,
 * optind may equal argc here and argv[optind] would be NULL -- confirm. */
if (cl.type == CLIENT && !cl.msg.ticket.id[0]) {
/* Use additional argument as ticket name. */
safe_copy(cl.msg.ticket.id,
argv[optind],
sizeof(cl.msg.ticket.id),
"ticket name");
optind++;
}
if (optind == argc)
return 0;
/* Anything still left over is an error. */
left = argc - optind;
fprintf(stderr, "Superfluous argument%s: %s%s\n",
left == 1 ? "" : "s",
argv[optind],
left == 1 ? "" : "...");
exit(EXIT_FAILURE);
unknown:
fprintf(stderr, "unknown option: %s\n", argv[optind]);
exit(EXIT_FAILURE);
}
/**
 * Lift the locked-memory limit, lock all current and future pages into
 * memory, and switch the process to SCHED_RR at the highest priority.
 *
 * Failures are logged; none of them is fatal.
 */
static void set_scheduler(void)
{
	struct rlimit unlimited = {
		.rlim_cur = RLIM_INFINITY,
		.rlim_max = RLIM_INFINITY,
	};
	struct sched_param rr;
	int top;

	setrlimit(RLIMIT_MEMLOCK, &unlimited);
	if (mlockall(MCL_CURRENT | MCL_FUTURE) < 0) {
		log_error("mlockall failed");
	}

	top = sched_get_priority_max(SCHED_RR);
	if (top == -1) {
		log_error("could not get maximum scheduler priority err %d",
				errno);
		return;
	}

	rr.sched_priority = top;
	if (sched_setscheduler(0, SCHED_RR, &rr) == -1)
		log_error("could not set SCHED_RR priority %d: %s (%d)",
				rr.sched_priority,
				strerror(errno), errno);
}
/**
 * Write `val` to /proc/self/oom_adj.  Does nothing if the file cannot
 * be opened for writing.
 */
static void set_oom_adj(int val)
{
	FILE *oom = fopen("/proc/self/oom_adj", "w");

	if (oom != NULL) {
		fprintf(oom, "%i", val);
		fclose(oom);
	}
}
/**
 * Implements "booth status": decide whether a daemon for this
 * configuration is running.
 *
 * The daemon counts as running when the lock file exists, is locked by
 * another process, contains a status line, and the TCP port is in use.
 * In that case the lock file name and the recorded status line are
 * printed to stdout and 0 is returned; otherwise the reason is logged
 * and one of the PCMK_* status codes is returned.
 */
static int do_status(int type)
{
pid_t pid;
int rv, lock_fd, ret;
const char *reason = NULL;
char lockfile_data[1024], *cp;
ret = PCMK_OCF_NOT_RUNNING;
/* TODO: query all, and return quit only if it's _cleanly_ not
* running, ie. _neither_ of port/lockfile/process is available?
*
* Currently a single failure says "not running", even if "only" the
* lockfile has been removed. */
rv = setup_config(type);
if (rv) {
reason = "Error reading configuration.";
ret = PCMK_OCF_UNKNOWN_ERROR;
goto quit;
}
if (!local) {
reason = "No Service IP active here.";
goto quit;
}
rv = _lockfile(O_RDWR, &lock_fd, &pid);
/* 0 means that we obtained the lock ourselves, so nobody else holds it. */
if (rv == 0) {
reason = "PID file not locked.";
goto quit;
}
if (lock_fd == -1) {
reason = "No PID file.";
goto quit;
}
if (pid) {
fprintf(stdout, "booth_lockpid=%d ", pid);
fflush(stdout);
}
/* One byte is kept free for the terminator added below. */
rv = read(lock_fd, lockfile_data, sizeof(lockfile_data) - 1);
if (rv < 4) {
reason = "Cannot read lockfile data.";
ret = PCMK_LSB_UNKNOWN_ERROR;
goto quit;
}
lockfile_data[rv] = 0;
if (lock_fd != -1)
close(lock_fd);
/* Make sure it's only a single line */
cp = strchr(lockfile_data, '\r');
if (cp)
*cp = 0;
cp = strchr(lockfile_data, '\n');
if (cp)
*cp = 0;
/* A result of 0 is taken to mean that nobody is listening on the port. */
rv = setup_tcp_listener(1);
if (rv == 0) {
reason = "TCP port not in use.";
goto quit;
}
fprintf(stdout, "booth_lockfile='%s' %s\n",
cl.lockfile, lockfile_data);
if (daemonize)
fprintf(stderr, "Booth at %s port %d seems to be running.\n",
local->addr_string, booth_conf->port);
return 0;
quit:
/* NOTE(review): lock_fd is not closed on the paths that jump here after
 * the file was opened. */
log_debug("not running: %s", reason);
/* Ie. "DEBUG" */
if (daemonize)
fprintf(stderr, "not running: %s\n", reason);
return ret;
}
/**
 * Drop privileges: when running as root, switch the real and effective
 * group and user ids to the ones from the configuration.
 *
 * Returns 0 on success (or when not running as root), otherwise the
 * errno of the call that failed.
 */
static int limit_this_process(void)
{
	int failure = 0;

	if (!is_root())
		return 0;

	/* The group has to be changed first, while we are still root. */
	if (setregid(booth_conf->gid, booth_conf->gid) < 0) {
		failure = errno;
		log_error("setregid() didn't work: %s", strerror(failure));
	} else if (setreuid(booth_conf->uid, booth_conf->uid) < 0) {
		failure = errno;
		log_error("setreuid() didn't work: %s", strerror(failure));
	}

	/* TODO: ulimits? But that would restrict crm_ticket and handler
	 * scripts, too! */
	return failure;
}
static int lock_fd = -1;
/**
 * atexit() handler of the daemon (registered in do_server()): empties
 * and removes the lock file if one is held, then logs the exit.
 */
static void server_exit(void)
{
int rv;
if (lock_fd >= 0) {
/* We might not be able to delete it, but at least
* make it empty. */
rv = ftruncate(lock_fd, 0);
/* The result is deliberately ignored. */
(void)rv;
unlink_lockfile(lock_fd);
}
log_info("exiting");
}
/**
 * Signal handler installed for SIGTERM and SIGINT by do_server(): logs
 * the signal number and terminates through exit(), which in turn runs
 * the atexit() handlers.
 *
 * NOTE(review): logging and exit() are performed directly in signal
 * context here -- confirm that this is safe for the logging backend.
 */
static void sig_exit_handler(int sig)
{
log_info("caught signal %d", sig);
exit(0);
}
/**
 * Start the daemon: load the configuration, detach from the terminal
 * unless told otherwise, take the lock file, configure logging, signal
 * handlers, scheduling and privileges, and enter the main loop.
 *
 * Returns a non-zero value if startup fails; otherwise the result of
 * loop().
 */
static int do_server(int type)
{
int rv = -1;
/* Logging entity name; the daemon type is appended below. */
static char log_ent[128] = DAEMON_NAME "-";
rv = setup_config(type);
if (rv < 0)
return rv;
if (!local) {
log_error("Cannot find myself in the configuration.");
exit(EXIT_FAILURE);
}
/* "daemonize" is set by the -D and -S options; a non-zero value means
 * that the process stays in the foreground. */
if (!daemonize) {
if (daemon(0, 0) < 0) {
perror("daemon error");
exit(EXIT_FAILURE);
}
}
/* The lockfile must be written to _after_ the call to daemon(), so
* that the lockfile contains the pid of the daemon, not the parent. */
lock_fd = create_lockfile();
if (lock_fd < 0)
return lock_fd;
/* From here on the lock file is cleaned up on exit. */
atexit(server_exit);
strcat(log_ent, type_to_string(local->type));
cl_log_set_entity(log_ent);
cl_log_enable_stderr(enable_stderr ? TRUE : FALSE);
cl_log_set_facility(HA_LOG_FACILITY);
cl_inherit_logging_environment(0);
log_info("BOOTH %s daemon is starting, node id is 0x%08X (%d).",
type_to_string(local->type),
local->site_id, local->site_id);
signal(SIGUSR1, (__sighandler_t)tickets_log_info);
signal(SIGTERM, (__sighandler_t)sig_exit_handler);
signal(SIGINT, (__sighandler_t)sig_exit_handler);
set_scheduler();
set_oom_adj(-16);
set_proc_title("%s %s for [%s]:%d",
DAEMON_NAME,
type_to_string(local->type),
local->addr_string,
booth_conf->port);
/* Drop root privileges only after the calls above. */
rv = limit_this_process();
if (rv)
return rv;
if (cl_enable_coredumps(TRUE) < 0){
cl_log(LOG_ERR, "enabling core dump failed");
}
cl_cdtocoredir();
prctl(PR_SET_DUMPABLE, (unsigned long)TRUE, 0UL, 0UL, 0UL);
rv = loop(lock_fd);
return rv;
}
/**
 * Run the client operation selected on the command line (cl.op).
 *
 * Returns a negative value if the configuration cannot be read,
 * otherwise the result of the operation.
 */
static int do_client(void)
{
	int rv;

	rv = setup_config(CLIENT);
	if (rv < 0) {
		log_error("cannot read config");
		return rv;
	}

	if (cl.op == CMD_LIST)
		return query_get_string_answer(CMD_LIST);
	if (cl.op == CMD_GRANT)
		return do_grant();
	if (cl.op == CMD_REVOKE)
		return do_revoke();

	/* No known operation: hand back the setup result. */
	return rv;
}
/**
 * Program entry point: set defaults, parse the command line and run the
 * selected mode (status, daemon or client).
 *
 * The exit status is the mode's result if it lies in [0, 0x70), and 1
 * otherwise.
 */
int main(int argc, char *argv[], char *envp[])
{
int rv;
init_set_proc_title(argc, argv, envp);
- time(&start_time);
+ get_secs(&start_time);
memset(&cl, 0, sizeof(cl));
/* cl was zeroed above, so the copy stays terminated. */
strncpy(cl.configfile,
BOOTH_DEFAULT_CONF, BOOTH_PATH_LEN - 1);
cl.lockfile[0] = 0;
debug_level = 0;
/* Until a daemon reconfigures logging, log to stderr as "booth". */
cl_log_set_entity("booth");
cl_log_enable_stderr(TRUE);
cl_log_set_facility(0);
rv = read_arguments(argc, argv);
if (rv < 0)
goto out;
switch (cl.type) {
case STATUS:
rv = do_status(cl.type);
break;
case ARBITRATOR:
case DAEMON:
case SITE:
rv = do_server(cl.type);
break;
case CLIENT:
rv = do_client();
break;
}
out:
/* Normalize values. 0x100 would be seen as "OK" by waitpid(). */
return (rv >= 0 && rv < 0x70) ? rv : 1;
}
diff --git a/src/pacemaker.c b/src/pacemaker.c
index 24a3700..25a7e56 100644
--- a/src/pacemaker.c
+++ b/src/pacemaker.c
@@ -1,340 +1,340 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <inttypes.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "log.h"
#include "pacemaker.h"
#include "inline-fn.h"
/* Whether the installed crm_ticket supports atomic ticket updates;
 * determined once by test_atomicity(). */
enum atomic_ticket_supported {
YES=0,
NO,
FILENOTFOUND, /* Ie. UNKNOWN */
UNKNOWN = FILENOTFOUND,
};
/* http://thedailywtf.com/Articles/What_Is_Truth_0x3f_.aspx */
/* Cached probe result; UNKNOWN until test_atomicity() has run. */
enum atomic_ticket_supported atomicity = UNKNOWN;
/* Size of the buffers used to build crm_ticket command lines. */
#define COMMAND_MAX 1024
/** Determines whether the installed crm_ticket can do atomic ticket grants,
 * _including_ multiple attribute changes.
 *
 * See
 * https://bugzilla.novell.com/show_bug.cgi?id=855099
 *
 * Run "crm_ticket" without "--force";
 * - the old version asks for "Y/N" via STDIN, and returns 0
 * when reading "no";
 * - the new version just reports an error without asking.
 *
 * The result is cached in `atomicity`, so the probe runs only once.
 * The process exits if crm_ticket cannot be run at all or is killed by
 * a signal.
 */
static void test_atomicity(void)
{
	int rv;

	if (atomicity != UNKNOWN)
		return;

	rv = system("echo n | crm_ticket -g -t any-ticket-name > /dev/null 2> /dev/null");
	if (rv == -1) {
		log_error("Cannot run \"crm_ticket\"!");
		/* BIG problem. Abort. */
		exit(1);
	}

	if (WIFSIGNALED(rv)) {
		log_error("\"crm_ticket\" terminated by a signal!");
		/* Problem. Abort. */
		exit(1);
	}

	switch (WEXITSTATUS(rv)) {
	case 0:
		atomicity = NO;
		log_info("Old \"crm_ticket\" found, using non-atomic ticket updates.");
		break;

	case 1:
		atomicity = YES;
		log_info("New \"crm_ticket\" found, using atomic ticket updates.");
		break;

	default:
		/* Report the decoded exit code, i.e. the value the switch
		 * looked at, not the raw wait status. */
		log_error("Unexpected return value from \"crm_ticket\" (%d), "
				"falling back to non-atomic ticket updates.",
				WEXITSTATUS(rv));
		atomicity = NO;
	}

	assert(atomicity == YES || atomicity == NO);
}
/**
 * Describe a wait status, as returned by system() or pclose(), as text.
 *
 * Returns "0" for a status of zero, otherwise "rv <exit code>" with
 * " signal <number>" appended if the child was terminated by a signal.
 * The text lives in a static buffer that the next call overwrites.
 */
const char * interpret_rv(int rv)
{
	static char text[64];
	int used;

	if (!rv)
		return "0";

	used = snprintf(text, sizeof(text), "rv %d", WEXITSTATUS(rv));
	if (WIFSIGNALED(rv))
		snprintf(text + used, sizeof(text) - used,
				" signal %d", WTERMSIG(rv));

	return text;
}
/**
 * Update a ticket with a single crm_ticket call that sets the "owner",
 * "expires" and "term" attributes and optionally grants or revokes it.
 *
 * @param tk     ticket to write
 * @param grant  > 0 adds "-g" (grant), < 0 adds "-r" (revoke), 0 only
 *               sets the attributes
 *
 * Returns the status reported by system(); 0 means success.
 */
static int pcmk_write_ticket_atomic(struct ticket_config *tk, int grant)
{
char cmd[COMMAND_MAX];
int rv;
/* The values are appended to "-v", so that NO_ONE
* (which is -1) isn't seen as another option. */
/* NOTE(review): tk->name is placed inside single quotes in a shell
 * command; a name containing a single quote would break the quoting. */
snprintf(cmd, COMMAND_MAX,
"crm_ticket -t '%s' "
"%s --force "
"-S owner -v%" PRIi32 " "
"-S expires -v%" PRIi64 " "
"-S term -v%" PRIi64,
tk->name,
(grant > 0 ? "-g" :
grant < 0 ? "-r" :
""),
(int32_t)get_node_id(tk->leader),
- (int64_t)tk->term_expires,
+ (int64_t)wall_ts(tk->term_expires),
(int64_t)tk->current_term);
rv = system(cmd);
log_debug("command: '%s' was executed", cmd);
if (rv != 0)
log_error("error: \"%s\" failed, %s", cmd, interpret_rv(rv));
return rv;
}
static int pcmk_store_ticket_nonatomic(struct ticket_config *tk);
/**
 * Grant ticket `tk` in pacemaker.
 *
 * Uses one atomic crm_ticket call when the installed version supports
 * it.  Otherwise the ticket attributes are stored first and the ticket
 * is granted with a separate call.
 *
 * Returns 0 on success, otherwise the non-zero status of the failing
 * step.
 */
static int pcmk_grant_ticket(struct ticket_config *tk)
{
	char cmd[COMMAND_MAX];
	int rv;

	test_atomicity();
	if (atomicity == YES)
		return pcmk_write_ticket_atomic(tk, +1);

	rv = pcmk_store_ticket_nonatomic(tk);
	if (rv)
		return rv;

	/* Quote the ticket name like every other crm_ticket call in this
	 * file does, so that the shell passes it on as a single word. */
	snprintf(cmd, COMMAND_MAX, "crm_ticket -t '%s' -g --force",
			tk->name);
	rv = system(cmd);
	/* Only claim that the command "was executed" after running it. */
	log_debug("command: '%s' was executed", cmd);
	if (rv != 0)
		log_error("error: \"%s\" failed, %s", cmd, interpret_rv(rv));

	return rv;
}
/**
 * Revoke ticket `tk` in pacemaker.
 *
 * Uses one atomic crm_ticket call when the installed version supports
 * it.  Otherwise the ticket attributes are stored first and the ticket
 * is revoked with a separate call.
 *
 * Returns 0 on success, otherwise the non-zero status of the failing
 * step.
 */
static int pcmk_revoke_ticket(struct ticket_config *tk)
{
	char cmd[COMMAND_MAX];
	int rv;

	test_atomicity();
	if (atomicity == YES)
		return pcmk_write_ticket_atomic(tk, -1);

	rv = pcmk_store_ticket_nonatomic(tk);
	if (rv)
		return rv;

	/* Quote the ticket name like every other crm_ticket call in this
	 * file does, so that the shell passes it on as a single word. */
	snprintf(cmd, COMMAND_MAX, "crm_ticket -t '%s' -r --force",
			tk->name);
	rv = system(cmd);
	/* Only claim that the command "was executed" after running it. */
	log_debug("command: '%s' was executed", cmd);
	if (rv != 0)
		log_error("error: \"%s\" failed, %s", cmd, interpret_rv(rv));

	return rv;
}
/**
 * Set attribute `attr` of ticket `tk` to `val` by running crm_ticket.
 *
 * A failing command is repeated; at most three attempts are made.
 * Returns the status of the last attempt (0 on success).
 */
static int crm_ticket_set(const struct ticket_config *tk, const char *attr, int64_t val)
{
	char cmd[COMMAND_MAX];
	int attempts_left = 3;
	int rv;

	snprintf(cmd, COMMAND_MAX,
			"crm_ticket -t '%s' -S '%s' -v %" PRIi64,
			tk->name, attr, val);

	/* If there are errors, there's not much we can do but retry ... */
	do {
		rv = system(cmd);
	} while (rv != 0 && --attempts_left > 0);

	log_debug("'%s' gave result %s", cmd, interpret_rv(rv));

	return rv;
}
/**
 * Store the "owner", "expires" and "term" attributes of `tk` with one
 * crm_ticket call per attribute.
 *
 * Returns 0 if all three calls succeeded and a non-zero value otherwise.
 */
static int pcmk_store_ticket_nonatomic(struct ticket_config *tk)
{
int rv;
/* Always try to store *each* attribute, even if there's an error
* for one of them. */
rv = crm_ticket_set(tk, "owner", (int32_t)get_node_id(tk->leader));
/* The call is the left operand of "||", so it always runs; the
 * combined result is reduced to 0 or 1. */
- rv = crm_ticket_set(tk, "expires", tk->term_expires) || rv;
+ rv = crm_ticket_set(tk, "expires", wall_ts(tk->term_expires)) || rv;
rv = crm_ticket_set(tk, "term", tk->current_term) || rv;
if (rv)
/* NOTE(review): rv is only 0 or 1 here, not a wait status, so the
 * text from interpret_rv() is of limited use. */
log_error("setting crm_ticket attributes failed; %s",
interpret_rv(rv));
else
log_info("setting crm_ticket attributes successful");
return rv;
}
/**
 * Read attribute `attr` of ticket `tk` by running crm_ticket and parsing
 * the first line of its output.
 *
 * "false" and "true" are converted to 0 and 1; anything else has to be
 * an integer.  `*data` is preset to -1 and receives the parsed value
 * once a line could be read.
 *
 * Returns 0 on success.  Otherwise a non-zero value: the status of the
 * command if it failed, or ENODATA / EINVAL if it produced no or
 * unusable output.
 */
static int crm_ticket_get(struct ticket_config *tk,
		const char *attr, int64_t *data)
{
	char cmd[COMMAND_MAX];
	char line[256];
	int rv, rc;
	int64_t v;
	FILE *p;

	*data = -1;
	v = 0;
	snprintf(cmd, COMMAND_MAX,
			"crm_ticket -t '%s' -G '%s' --quiet",
			tk->name, attr);

	p = popen(cmd, "r");
	if (p == NULL) {
		rv = errno;
		log_error("popen error %d (%s) for \"%s\"",
				rv, strerror(rv), cmd);
		/* Make sure a failure is reported even if errno was not set.
		 * (A logical "||" here would always evaluate to 1.) */
		return rv ? rv : EINVAL;
	}

	if (fgets(line, sizeof(line) - 1, p) == NULL) {
		rv = ENODATA;
		goto out;
	}

	rv = EINVAL;
	if (!strncmp(line, "false", 5)) {
		v = 0;
		rv = 0;
	} else if (!strncmp(line, "true", 4)) {
		v = 1;
		rv = 0;
	} else if (sscanf(line, "%" PRIi64, &v) == 1) {
		rv = 0;
	}

	*data = v;

out:
	/* Keep the parse result in rv: a command that exits cleanly but
	 * printed nothing usable must not be reported as success. */
	rc = pclose(p);
	log_debug("command \"%s\" returned %s, value %" PRIi64, cmd, interpret_rv(rc), v);
	return rc ? rc : rv;
}
/**
 * Load the stored state of ticket `tk` ("expires", "term", "granted"
 * and "owner") through crm_ticket.
 *
 * Each attribute is only taken over if its query succeeded.  The return
 * value is the result of the last query ("owner") only.
 */
static int pcmk_load_ticket(struct ticket_config *tk)
{
int rv;
int64_t v;
/* This here gets run during startup; testing that here means that
* normal operation won't be interrupted with that test. */
test_atomicity();
rv = crm_ticket_get(tk, "expires", &v);
if (!rv) {
- tk->term_expires = v;
+ tk->term_expires = unwall_ts(v);
}
rv = crm_ticket_get(tk, "term", &v);
if (!rv) {
tk->current_term = v;
}
rv = crm_ticket_get(tk, "granted", &v);
if (!rv) {
tk->is_granted = v;
}
rv = crm_ticket_get(tk, "owner", &v);
if (!rv) {
/* No check, node could have been deconfigured. */
if (!find_site_by_id(v, &tk->leader)) {
/* Hmm, no site found for the ticket we have in the
* CIB!?
* Assume that the ticket belonged to us if it was
* granted here!
*/
tk_log_warn("no site matches; site got reconfigured?");
if (tk->is_granted) {
tk_log_warn("granted here, assume it belonged to us");
tk->leader = local;
}
}
}
return rv;
}
/* Ticket handler that routes grant, revoke and load through the
 * crm_ticket based functions in this file. */
struct ticket_handler pcmk_handler = {
.grant_ticket = pcmk_grant_ticket,
.revoke_ticket = pcmk_revoke_ticket,
.load_ticket = pcmk_load_ticket,
};
diff --git a/src/raft.c b/src/raft.c
index edbae04..b31be63 100644
--- a/src/raft.c
+++ b/src/raft.c
@@ -1,930 +1,931 @@
/*
* Copyright (C) 2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <inttypes.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include "booth.h"
+#include "timer.h"
#include "transport.h"
#include "inline-fn.h"
#include "config.h"
#include "raft.h"
#include "ticket.h"
#include "log.h"
/** Forget all votes recorded for ticket `tk`: clears the votes_received
 * bitmask and the per-site votes_for[] entries. */
inline static void clear_election(struct ticket_config *tk)
{
int i;
struct booth_site *site;
tk_log_debug("clear election");
tk->votes_received = 0;
foreach_node(i, site)
tk->votes_for[site->index] = NULL;
}
/**
 * Record that site `who` votes for site `vote` in the election for
 * ticket `tk`.
 *
 * Only the first vote of a site counts.  A later vote for a different
 * candidate is logged as a warning and ignored.
 */
inline static void record_vote(struct ticket_config *tk,
		struct booth_site *who,
		struct booth_site *vote)
{
	struct booth_site *earlier;

	tk_log_debug("site %s votes for %s",
			site_string(who),
			site_string(vote));

	earlier = tk->votes_for[who->index];
	if (earlier == NULL) {
		tk->votes_for[who->index] = vote;
		tk->votes_received |= who->bitmask;
		return;
	}

	if (earlier != vote)
		tk_log_warn("%s voted previously "
				"for %s and now wants to vote for %s (ignored)",
				site_string(who),
				site_string(tk->votes_for[who->index]),
				site_string(vote));
}
/**
 * Compare our last term for ticket `tk` with the term carried in `msg`.
 *
 * Returns 0 if both are equal, otherwise our term minus the term of the
 * message.  `sender` and `leader` are not used.
 */
static int cmp_msg_ticket(struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg)
{
	/* The difference is zero exactly when the two terms are equal, so
	 * no separate equality test is needed. */
	return my_last_term(tk) - ntohl(msg->ticket.term);
}
/**
 * Take over the term number carried in `msg` for ticket `tk`.
 *
 * A candidate accepts the term of the message unconditionally (it
 * failed to start the election, so the leader's term wins).  In every
 * other state the term is never lowered.
 */
static void update_term_from_msg(struct ticket_config *tk,
		struct boothc_ticket_msg *msg)
{
	const uint32_t msg_term = ntohl(msg->ticket.term);

	if (tk->state != ST_CANDIDATE) {
		tk->current_term = max(msg_term, tk->current_term);
		return;
	}

	tk->current_term = msg_term;
}
/**
 * Refresh expiry time and term of ticket `tk` from a message sent by
 * `sender`.
 *
 * The validity announced in the message is capped at our own
 * term_duration before the new expiry time is computed.
 */
static void update_ticket_from_msg(struct ticket_config *tk,
struct booth_site *sender,
struct boothc_ticket_msg *msg)
{
int duration;
tk_log_debug("updating from %s (%d/%d)",
site_string(sender),
ntohl(msg->ticket.term), ntohl(msg->ticket.term_valid_for));
duration = min(tk->term_duration, ntohl(msg->ticket.term_valid_for));
- tk->term_expires = time(NULL) + duration;
+ tk->term_expires = get_secs(NULL) + duration;
update_term_from_msg(tk, msg);
}
/**
 * Overwrite expiry time and term of ticket `tk` with the values from
 * `msg`.  Unlike update_ticket_from_msg(), the validity is not capped
 * and the term is taken as it is.
 */
static void copy_ticket_from_msg(struct ticket_config *tk,
struct boothc_ticket_msg *msg)
{
- tk->term_expires = time(NULL) + ntohl(msg->ticket.term_valid_for);
+ tk->term_expires = get_secs(NULL) + ntohl(msg->ticket.term_valid_for);
tk->current_term = ntohl(msg->ticket.term);
}
/*
 * Switch this ticket to follower state using the term and validity
 * from msg, and drop any pending commit delay and election flag.
 */
static void become_follower(struct ticket_config *tk,
		struct boothc_ticket_msg *msg)
{
	copy_ticket_from_msg(tk, msg);

	tk->state = ST_FOLLOWER;
	tk->delay_commit = 0;
	tk->in_election = 0;

	if (!tk->is_granted)
		return;

	/* We are following although the ticket was granted here
	 * (we're probably restarting): commit to the CIB right away.
	 */
	disown_ticket(tk);
	ticket_write(tk);
}
/*
 * We won the election: record the local site as leader, start a fresh
 * term of tk->term_duration seconds, clear the election bookkeeping,
 * then broadcast a heartbeat (expecting acks) and arm the ticket
 * timeout.
 */
static void won_elections(struct ticket_config *tk)
{
tk->leader = local;
tk->state = ST_LEADER;
- tk->term_expires = time(NULL) + tk->term_duration;
+ tk->term_expires = get_secs(NULL) + tk->term_duration;
tk->election_end = 0;
tk->voted_for = NULL;
ticket_broadcast(tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0);
ticket_activate_timeout(tk);
}
/*
 * Tell whether the recorded votes are tied.
 *
 * Tallies votes_for[] per voted-for site and returns non-zero when
 * more than one of the first site_count tally slots holds the highest
 * count (which includes the case that no votes were recorded and
 * there is more than one site).
 */
static int is_tie(struct ticket_config *tk)
{
	int tally[MAX_NODES] = { 0, };
	int best = 0, holders = 0;
	struct booth_site *choice;
	int idx;

	for (idx = 0; idx < booth_conf->site_count; idx++) {
		choice = tk->votes_for[idx];
		if (choice) {
			tally[choice->index]++;
			if (tally[choice->index] > best)
				best = tally[choice->index];
		}
	}

	for (idx = 0; idx < booth_conf->site_count; idx++) {
		if (tally[idx] == best)
			holders++;
	}

	return holders > 1;
}
/*
 * Find the site that holds a strict majority of the recorded votes.
 *
 * Walks votes_for[] in site order and returns the first site whose
 * running tally exceeds half of booth_conf->site_count, or NULL if no
 * site reaches that.
 */
static struct booth_site *majority_votes(struct ticket_config *tk)
{
	int tally[MAX_NODES] = { 0, };
	struct booth_site *choice;
	int voter, target;

	for (voter = 0; voter < booth_conf->site_count; voter++) {
		choice = tk->votes_for[voter];
		if (choice) {
			target = choice->index;
			tally[target]++;
			tk_log_debug("Majority: %d %s wants %d %s => %d",
					voter, site_string(&booth_conf->site[voter]),
					target, site_string(choice),
					tally[target]);

			if (tally[target]*2 > booth_conf->site_count) {
				tk_log_debug("Majority reached: %d of %d for %s",
						tally[target], booth_conf->site_count,
						site_string(choice));
				return choice;
			}
		}
	}

	return NULL;
}
/*
 * Close the current election round for tk and act on the outcome.
 *
 * - the local site has the majority: take the ticket (won_elections());
 * - another site has the majority: only log it;
 * - nobody has the majority: try to start a new election, bumping the
 *   term only if the vote was tied; if new_election() returns 0 the
 *   ticket timeout is (re)armed here as well.
 */
void elections_end(struct ticket_config *tk)
{
time_t now;
struct booth_site *new_leader;
- now = time(NULL);
+ now = get_secs(NULL);
if (now > tk->election_end) {
/* This is previous election timed out */
tk_log_info("elections finished");
}
tk->in_election = 0;
new_leader = majority_votes(tk);
if (new_leader == local) {
tk_log_info("granted successfully here");
won_elections(tk);
} else if (new_leader) {
tk_log_info("ticket granted at %s",
site_string(new_leader));
} else {
tk_log_info("nobody won elections, new elections");
/* is_tie() result is passed as update_term; OR_AGAIN reuses
 * the previously stored election reason */
if (!new_election(tk, NULL, is_tie(tk), OR_AGAIN)) {
ticket_activate_timeout(tk);
}
}
}
/*
 * Handle a message that may carry a term newer than ours (Raft §5.1).
 *
 * Returns 1 if the message's term is higher than tk->current_term; in
 * that case we step down to follower and adopt the term, and -- unless
 * in_election is set -- also adopt `leader` as the ticket's leader.
 * Returns 0 otherwise, and always when `leader` is the local site
 * (it may happen that we hear about our own newer term).
 */
static int newer_term(struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg,
		int in_election)
{
	uint32_t msg_term;

	if (leader == local)
		return 0;

	msg_term = ntohl(msg->ticket.term);
	if (msg_term <= tk->current_term)
		return 0;

	tk->state = ST_FOLLOWER;
	if (in_election) {
		tk_log_debug("from %s: higher term %d vs. %d (election)",
				site_string(sender),
				msg_term, tk->current_term);
	} else {
		tk->leader = leader;
		tk_log_info("from %s: higher term %d vs. %d, following %s",
				site_string(sender),
				msg_term, tk->current_term,
				ticket_leader_string(tk));
	}

	tk->current_term = msg_term;
	return 1;
}
/*
 * Reject a message whose term is older than ours (Raft §5.1).
 *
 * Returns 1 after sending an RLT_TERM_OUTDATED reject to the sender
 * when the message's term is below tk->current_term, 0 otherwise.
 */
static int term_too_low(struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg)
{
	uint32_t msg_term = ntohl(msg->ticket.term);

	if (msg_term >= tk->current_term)
		return 0;

	tk_log_info("sending reject to %s, its term too low "
			"(%d vs. %d)", site_string(sender),
			msg_term, tk->current_term
			);
	send_reject(sender, tk, RLT_TERM_OUTDATED, msg);
	return 1;
}
/*
 * Follower side: handle an OP_HEARTBEAT from a leader.
 *
 * A heartbeat with a term lower than ours is still accepted when it
 * comes from the leader we already follow; from a different sender it
 * is rejected with RLT_TERM_OUTDATED if the ticket is currently owned.
 * Otherwise we become follower with the term/validity from the
 * message, record `leader`, and acknowledge with OP_ACK.
 *
 * Returns the result of send_reject() or send_msg().
 */
static int answer_HEARTBEAT (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
uint32_t term;
term = ntohl(msg->ticket.term);
tk_log_debug("heartbeat from leader: %s, have %s; term %d vs %d",
site_string(leader), ticket_leader_string(tk),
term, tk->current_term);
if (term < tk->current_term) {
if (sender == tk->leader) {
tk_log_info("trusting leader %s with a lower term (%d vs %d)",
site_string(leader), term, tk->current_term);
} else if (is_owned(tk)) {
tk_log_warn("different leader %s with a lower term "
"(%d vs %d), sending reject",
site_string(leader), term, tk->current_term);
return send_reject(sender, tk, RLT_TERM_OUTDATED, msg);
}
}
/* got heartbeat, no rejects expected anymore */
tk->expect_more_rejects = 0;
/* Needed? */
newer_term(tk, sender, leader, msg, 0);
become_follower(tk, msg);
/* Racy??? */
/* NOTE(review): sender/leader appear to be derived from a received
 * message; if so, this assert lets a peer abort the daemon -- confirm
 * and consider rejecting the message instead. */
assert(sender == leader || !leader);
tk->leader = leader;
/* Ack the heartbeat (we comply). */
return send_msg(OP_ACK, tk, sender, msg);
}
/*
 * Handle an OP_UPDATE (ticket update pushed by a leader).
 *
 * If the ticket is owned and the sender is not its leader, the update
 * is refused with RLT_TERM_OUTDATED. Otherwise the leader, term and
 * expiry from the message are adopted, the ticket is written out, the
 * wakeup is rescheduled and the update is acknowledged.
 *
 * Returns the result of send_reject() or send_msg().
 */
static int process_UPDATE (
		struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg
		)
{
	if (is_owned(tk) && sender != tk->leader) {
		tk_log_warn("different leader %s wants to update "
				"our ticket, sending reject",
				site_string(leader));
		return send_reject(sender, tk, RLT_TERM_OUTDATED, msg);
	}

	tk_log_debug("leader %s wants to update our ticket",
			site_string(leader));

	tk->leader = leader;
	copy_ticket_from_msg(tk, msg);
	ticket_write(tk);

	/* make sure ticket_cron runs if the ticket expires */
	set_ticket_wakeup(tk);

	return send_msg(OP_ACK, tk, sender, msg);
}
/*
 * Handle an OP_REVOKE from `sender`.
 *
 * - Ticket already in ST_INIT without a leader: assume our earlier ack
 *   got lost and just ack again.
 * - Sender is not the ticket's leader, or we are not a follower: log
 *   an error and return 1 (revoke ignored).
 * - Otherwise reset the ticket, clear its leader, write it out and ack.
 *
 * Returns 1 for an ignored revoke, else the result of send_msg().
 */
static int process_REVOKE (
		struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg
		)
{
	if (tk->state == ST_INIT && tk->leader == no_leader) {
		/* assume that our ack got lost */
		return send_msg(OP_ACK, tk, sender, msg);
	}

	if (tk->leader != sender) {
		tk_log_error("%s wants to revoke ticket, "
				"but it is not granted there (ignoring)",
				site_string(sender));
		return 1;
	}

	if (tk->state != ST_FOLLOWER) {
		tk_log_error("unexpected ticket revoke from %s "
				"(in state %s) (ignoring)",
				site_string(sender),
				state_to_string(tk->state));
		return 1;
	}

	tk_log_info("%s revokes ticket",
			site_string(tk->leader));
	reset_ticket(tk);
	tk->leader = no_leader;
	ticket_write(tk);

	return send_msg(OP_ACK, tk, sender, msg);
}
/*
 * Leader side: handle an OP_ACK.
 *
 * Acks carrying a higher or a lower term than ours are only logged
 * (no reject is sent). Nothing more is done while a revoke is pending
 * or in progress (next_state or state is ST_INIT). For acks to a
 * heartbeat of the current term and leader, the ticket is updated as
 * soon as a majority of sites has acknowledged.
 *
 * Returns the result of leader_update_ticket() in that last case,
 * 0 otherwise.
 */
static int process_ACK(
		struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg
		)
{
	uint32_t ack_term = ntohl(msg->ticket.term);

	if (newer_term(tk, sender, leader, msg, 0)) {
		/* unexpected higher term */
		tk_log_warn("got higher term from %s (%d vs. %d)",
				site_string(sender),
				ack_term, tk->current_term);
		return 0;
	}

	/* Don't send a reject. */
	if (ack_term < tk->current_term) {
		/* The sender doesn't know what it's talking about --
		 * perhaps it doesn't receive our packets? */
		tk_log_warn("unexpected term "
				"from %s (%d vs. %d) (ignoring)",
				site_string(sender),
				ack_term, tk->current_term);
		return 0;
	}

	/* if the ticket is to be revoked, further processing is not
	 * interesting (and dangerous) */
	if (tk->next_state == ST_INIT || tk->state == ST_INIT)
		return 0;

	/* For heartbeats we make do with the majority: at least half of
	 * the nodes are reachable, so update the ticket and send the
	 * update messages out.
	 */
	if (tk->last_request == OP_HEARTBEAT &&
			ack_term == tk->current_term &&
			leader == tk->leader &&
			majority_of_bits(tk, tk->acks_received)) {
		return leader_update_ticket(tk);
	}

	return 0;
}
/*
 * Handle an OP_VOTE_FOR message.
 *
 * A vote naming no leader, sent by our current leader while we are
 * follower or candidate, means the leader is giving the ticket away:
 * the ticket is reset and, on a SITE, written out and a new election
 * is scheduled. Otherwise the vote is only considered while we are a
 * candidate and its term is not too low; it is recorded, and the
 * election is closed once no more acks are expected.
 *
 * Always returns 0.
 */
static int process_VOTE_FOR(
		struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg
		)
{
	int leader_steps_down;

	/* leader wants to step down? */
	leader_steps_down = (leader == no_leader) &&
		(sender == tk->leader) &&
		(tk->state == ST_FOLLOWER || tk->state == ST_CANDIDATE);

	if (leader_steps_down) {
		tk_log_info("%s wants to give the ticket away",
				site_string(tk->leader));
		reset_ticket(tk);
		tk->state = ST_FOLLOWER;
		if (local->type == SITE) {
			ticket_write(tk);
			schedule_election(tk, OR_STEPDOWN);
		}
		return 0;
	}

	if (tk->state != ST_CANDIDATE) {
		/* lost candidate status, somebody rejected our proposal */
		tk_log_debug("candidate status lost, ignoring vote_for from %s",
				site_string(sender));
		return 0;
	}

	if (term_too_low(tk, sender, leader, msg))
		return 0;

	if (newer_term(tk, sender, leader, msg, 0))
		clear_election(tk);

	record_vote(tk, sender, leader);

	/* Only if all voted can we take the ticket now (§5.2);
	 * otherwise wait for the timeout in ticket_cron. */
	if (!tk->acks_expected)
		elections_end(tk);

	return 0;
}
/*
 * Handle an OP_REJECTED reply.
 *
 * While we are a candidate the reject is interpreted according to the
 * result code in the header (and whether the rejecting site names us
 * as leader); each recognised case sets expect_more_rejects so that
 * further rejects are not logged as unexpected. Any other reject is
 * logged as unexpected unless expect_more_rejects is already set.
 *
 * Always returns 0.
 */
static int process_REJECTED(
		struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg
		)
{
	uint32_t rv = ntohl(msg->header.result);

	if (tk->state == ST_CANDIDATE) {
		if (leader == local) {
			/* the sender has us as the leader (!)
			 * the elections will time out, then we can try again
			 */
			tk_log_warn("ticket was granted to us "
					"(and we didn't know)");
			tk->expect_more_rejects = 1;
			return 0;
		}

		if (rv == RLT_TERM_OUTDATED) {
			tk_log_warn("ticket outdated (term %d), granted to %s",
					ntohl(msg->ticket.term),
					site_string(leader)
					);
			tk->leader = leader;
			tk->expect_more_rejects = 1;
			become_follower(tk, msg);
			return 0;
		}

		if (rv == RLT_TERM_STILL_VALID) {
			if (tk->lost_leader != leader) {
				tk_log_warn("ticket was granted to %s "
						"(and we didn't know)",
						site_string(leader));
			} else if (tk->election_reason == OR_TKT_LOST) {
				tk_log_warn("%s still has the ticket valid, "
						"we'll backup a bit",
						site_string(sender));
			} else {
				tk_log_warn("%s unexpectedly rejects elections",
						site_string(sender));
			}
			tk->leader = leader;
			become_follower(tk, msg);
			tk->expect_more_rejects = 1;
			return 0;
		}

		if (rv == RLT_YOU_OUTDATED) {
			tk->leader = leader;
			tk->expect_more_rejects = 1;
			if (leader && leader != no_leader) {
				tk_log_warn("our ticket is outdated, granted to %s",
						site_string(leader));
				become_follower(tk, msg);
			} else {
				tk_log_warn("our ticket is outdated and revoked");
				update_ticket_from_msg(tk, sender, msg);
				tk->state = ST_INIT;
			}
			return 0;
		}
	}

	if (!tk->expect_more_rejects) {
		tk_log_warn("from %s: in state %s, got %s (unexpected reject)",
				site_string(sender),
				state_to_string(tk->state),
				state_to_string(rv));
	}

	return 0;
}
/*
 * Heuristic: does the ticket look healthy from the local point of view?
 *
 * Returns 1 when the term has time left and we are either the leader,
 * or a follower with at least a third of the term duration remaining;
 * returns 0 in every other case (including while we are a candidate).
 */
static int ticket_seems_ok(struct ticket_config *tk)
{
	int remaining = term_time_left(tk);

	if (!remaining)
		return 0; /* quite sure */

	switch (tk->state) {
	case ST_CANDIDATE:
		return 0; /* in state of flux */
	case ST_LEADER:
		return 1; /* quite sure */
	case ST_FOLLOWER:
		/* almost quite sure, if enough of the term is left */
		return remaining >= tk->term_duration/3;
	default:
		return 0;
	}
}
/*
 * Check whether the reason given for an election request is plausible.
 *
 * Only OR_TKT_LOST is examined: if the ticket is in ST_INIT with no
 * leader here, RLT_YOU_OUTDATED is returned; if the ticket still looks
 * fine locally, RLT_TERM_STILL_VALID is returned. Returns 0 when there
 * is no objection.
 */
static int test_reason(
		struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg
		)
{
	int claimed = ntohl(msg->header.reason);

	if (claimed != OR_TKT_LOST)
		return 0;

	if (tk->state == ST_INIT &&
			tk->leader == no_leader) {
		tk_log_warn("%s claims that the ticket is lost, "
				"but it's in %s state (reject sent)",
				site_string(sender),
				state_to_string(tk->state)
				);
		return RLT_YOU_OUTDATED;
	}

	if (ticket_seems_ok(tk)) {
		tk_log_warn("%s claims that the ticket is lost, "
				"but it is ok here (reject sent)",
				site_string(sender));
		return RLT_TERM_STILL_VALID;
	}

	return 0;
}
/* §5.2 */
static int answer_REQ_VOTE(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int valid;
struct boothc_ticket_msg omsg;
cmd_result_t inappr_reason;
inappr_reason = test_reason(tk, sender, leader, msg);
if (inappr_reason)
return send_reject(sender, tk, inappr_reason, msg);
valid = term_time_left(tk);
/* allow the leader to start new elections on valid tickets */
if (sender != tk->leader && valid) {
tk_log_warn("election from %s rejected "
"(we have %s as ticket owner), ticket still valid for %ds",
site_string(sender), site_string(tk->leader), valid);
return send_reject(sender, tk, RLT_TERM_STILL_VALID, msg);
}
if (term_too_low(tk, sender, leader, msg))
return 0;
/* set this, so that we know not to send status for the
* ticket */
tk->in_election = 1;
/* if it's a newer term or ... */
if (newer_term(tk, sender, leader, msg, 1)) {
clear_election(tk);
goto vote_for_sender;
}
/* ... we didn't vote yet, then vote for the sender */
/* §5.2, §5.4 */
if (!tk->voted_for) {
vote_for_sender:
tk->voted_for = sender;
record_vote(tk, sender, leader);
}
init_ticket_msg(&omsg, OP_VOTE_FOR, OP_REQ_VOTE, RLT_SUCCESS, 0, tk);
omsg.ticket.leader = htonl(get_node_id(tk->voted_for));
return booth_udp_send(sender, &omsg, sizeof(omsg));
}
/*
 * Start a new election for tk (Raft §5.2).
 *
 * tk          - the ticket to run the election for
 * preference  - site to vote for; if NULL the local site is used
 * update_term - if non-zero, increment the term (after saving the
 *               current ticket into tk->last_valid_tk unless we are
 *               already a candidate)
 * reason      - why the election is started; OR_AGAIN reuses the
 *               reason stored by the previous call
 *
 * Returns 0 if we are not a SITE (nothing done) or after the election
 * request was broadcast, and 1 if the previous election has not yet
 * reached its end time.
 */
int new_election(struct ticket_config *tk,
struct booth_site *preference, int update_term, cmd_reason_t reason)
{
struct booth_site *new_leader;
time_t now;
if (local->type != SITE)
return 0;
- time(&now);
+ get_secs(&now);
tk_log_debug("start new election?, now=%" PRIi64 ", end %" PRIi64,
- (int64_t)now, (int64_t)(tk->election_end));
+ (int64_t)wall_ts(now), (int64_t)(wall_ts(tk->election_end)));
/* an election is still running, don't start another one */
if (now < tk->election_end)
return 1;
/* §5.2 */
/* If there was _no_ answer, don't keep incrementing the term number
 * indefinitely. If there was no peer, there'll probably be no one
 * listening now either. However, we don't know if we were
 * invoked due to a timeout (caller does).
 */
if (update_term) {
/* save the previous term, we may need to send out the
 * MY_INDEX message */
if (tk->state != ST_CANDIDATE) {
memcpy(tk->last_valid_tk, tk, sizeof(struct ticket_config));
}
tk->current_term++;
}
tk->term_expires = 0;
tk->election_end = now + tk->timeout;
tk->in_election = 1;
tk_log_info("starting new election (term=%d)",
tk->current_term);
clear_election(tk);
if(preference)
new_leader = preference;
else
new_leader = (local->type == SITE) ? local : NULL;
/* our own vote counts too */
record_vote(tk, local, new_leader);
tk->voted_for = new_leader;
tk->leader = no_leader;
tk->state = ST_CANDIDATE;
/* some callers may want just to repeat on timeout */
if (reason == OR_AGAIN) {
reason = tk->election_reason;
} else {
tk->election_reason = reason;
}
ticket_broadcast(tk, OP_REQ_VOTE, OP_VOTE_FOR, RLT_SUCCESS, reason);
ticket_activate_timeout(tk);
add_random_delay(tk);
return 0;
}
/*
 * We are the leader and somebody reports a more up to date ticket;
 * there was probably a connectivity loss. Tricky.
 *
 * The term from the message is taken over. If the message names
 * another site as leader, or our own term still has time left, this
 * is a split brain (two leaders), which normally shouldn't happen and
 * is logged as an error. In all cases next_state is set to ST_LEADER.
 *
 * Always returns 0.
 */
static int leader_handle_newer_ticket(
		struct ticket_config *tk,
		struct booth_site *sender,
		struct booth_site *leader,
		struct boothc_ticket_msg *msg
		)
{
	update_term_from_msg(tk, msg);

	/* term_time_left() is consulted only when the message does not
	 * name a foreign leader */
	if ((leader != no_leader && leader && leader != local) ||
			term_time_left(tk)) {
		/* eek, two leaders, split brain */
		/* normally shouldn't happen; run election */
		tk_log_error("from %s: ticket granted to %s! (revoking locally)",
				site_string(sender),
				site_string(leader)
				);
	}

	tk->next_state = ST_LEADER;
	return 0;
}
/*
 * Handle an OP_MY_INDEX message (a site's reply to OP_STATUS).
 *
 * The message's ticket is compared with ours by term (cmp_msg_ticket):
 * i > 0 means ours is newer, i < 0 means theirs is newer. A ticket
 * whose term_valid_for field is zero counts as expired.
 *
 * Returns the result of send_msg() when we, as leader, push an update
 * to a site with an older ticket; the result of
 * leader_handle_newer_ticket() when we are leader and the sender has a
 * newer, unexpired ticket; 0 otherwise.
 */
static int process_MY_INDEX (
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int i;
int expired;
/* a zero test needs no byte-order conversion */
expired = !msg->ticket.term_valid_for;
i = cmp_msg_ticket(tk, sender, leader, msg);
if (i > 0) {
/* let them know about our newer ticket */
/* but if we're voting in elections, our ticket is not
 * valid yet, don't send it */
if (!tk->in_election)
send_msg(OP_MY_INDEX, tk, sender, msg);
if (tk->state == ST_LEADER) {
tk_log_info("sending ticket update to %s",
site_string(sender));
return send_msg(OP_UPDATE, tk, sender, msg);
}
}
/* we have a newer or equal ticket and theirs is expired,
 * nothing more to do here */
if (i >= 0 && expired) {
return 0;
}
if (tk->state == ST_LEADER) {
/* we're the leader, tread carefully */
if (expired) {
/* if their ticket is expired,
 * nothing more to do */
return 0;
}
if (i < 0) {
/* they have a newer ticket, trouble if we're already leader
 * for it */
tk_log_warn("from %s: more up to date ticket at %s",
site_string(sender),
site_string(leader)
);
return leader_handle_newer_ticket(tk, sender, leader, msg);
} else {
/* we have the ticket and we don't care */
return 0;
}
}
/* their ticket is either newer or not expired, don't
 * ignore it */
update_ticket_from_msg(tk, sender, msg);
tk->leader = leader;
update_ticket_state(tk, sender);
set_ticket_wakeup(tk);
return 0;
}
/*
 * Dispatch a received ticket message to the handler for its command.
 *
 * tk     - the ticket the message refers to
 * sender - the site the message came from
 * leader - the leader site named in the message
 * msg    - the message itself (header fields in network byte order)
 *
 * Returns the handler's result; -EINVAL for an unknown command or for
 * an OP_HEARTBEAT/OP_UPDATE that is unexpected in the current state;
 * 0 when a command is silently skipped (OP_ACK while we are not the
 * leader, OP_STATUS during an election).
 */
int raft_answer(
struct ticket_config *tk,
struct booth_site *sender,
struct booth_site *leader,
struct boothc_ticket_msg *msg
)
{
int cmd, req;
int rv;
rv = 0;
cmd = ntohl(msg->header.cmd);
req = ntohl(msg->header.request);
if (req)
tk_log_debug("got %s (req %s) from %s",
state_to_string(cmd),
state_to_string(req),
site_string(sender));
else
tk_log_debug("got %s from %s",
state_to_string(cmd),
site_string(sender));
switch (cmd) {
case OP_REQ_VOTE:
rv = answer_REQ_VOTE(tk, sender, leader, msg);
break;
case OP_VOTE_FOR:
rv = process_VOTE_FOR(tk, sender, leader, msg);
break;
case OP_ACK:
/* acks only matter to the leader */
if (tk->leader == local &&
tk->state == ST_LEADER)
rv = process_ACK(tk, sender, leader, msg);
break;
case OP_HEARTBEAT:
/* accept unless we hold a still-valid ticket ourselves, and
 * only in a non-leader state */
if ((tk->leader != local || !term_time_left(tk)) &&
(tk->state == ST_INIT || tk->state == ST_FOLLOWER ||
tk->state == ST_CANDIDATE))
rv = answer_HEARTBEAT(tk, sender, leader, msg);
else {
tk_log_warn("unexpected message %s, from %s",
state_to_string(cmd),
site_string(sender));
if (ticket_seems_ok(tk))
send_reject(sender, tk, RLT_TERM_STILL_VALID, msg);
rv = -EINVAL;
}
break;
case OP_UPDATE:
/* accept from the leader we know (if that is not us) or when
 * the ticket is not owned, and only in a non-leader state */
if (((tk->leader != local && tk->leader == leader) || !is_owned(tk)) &&
(tk->state == ST_INIT || tk->state == ST_FOLLOWER ||
tk->state == ST_CANDIDATE)) {
rv = process_UPDATE(tk, sender, leader, msg);
} else {
tk_log_warn("unexpected message %s, from %s",
state_to_string(cmd),
site_string(sender));
if (ticket_seems_ok(tk))
send_reject(sender, tk, RLT_TERM_STILL_VALID, msg);
rv = -EINVAL;
}
break;
case OP_REJECTED:
rv = process_REJECTED(tk, sender, leader, msg);
break;
case OP_REVOKE:
rv = process_REVOKE(tk, sender, leader, msg);
break;
case OP_MY_INDEX:
rv = process_MY_INDEX(tk, sender, leader, msg);
break;
case OP_STATUS:
/* our ticket is not settled during an election, keep quiet */
if (!tk->in_election)
rv = send_msg(OP_MY_INDEX, tk, sender, msg);
break;
default:
tk_log_error("unknown message %s, from %s",
state_to_string(cmd), site_string(sender));
rv = -EINVAL;
}
return rv;
}
diff --git a/src/ticket.c b/src/ticket.c
index 7702f46..72b5e09 100644
--- a/src/ticket.c
+++ b/src/ticket.c
@@ -1,1079 +1,1076 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include <inttypes.h>
#include <stdio.h>
#include <assert.h>
#include <time.h>
#include <clplumbing/cl_random.h>
#include "ticket.h"
#include "config.h"
#include "pacemaker.h"
#include "inline-fn.h"
#include "log.h"
#include "booth.h"
#include "raft.h"
#include "handler.h"
#define TK_LINE 256
/*
 * Validate untrusted input: the string (including its terminating
 * '\0') must fit in a buffer of `max` chars.
 *
 * Returns 1 if a '\0' is found within the first `max` bytes of s,
 * 0 otherwise (also when max is zero or negative). At most `max`
 * bytes are read.
 */
int check_max_len_valid(const char *s, int max)
{
	int remaining = max;

	while (remaining-- > 0) {
		if (*s++ == '\0')
			return 1;
	}

	return 0;
}
/*
 * Look up a configured ticket by its exact name.
 *
 * ticket - NUL-terminated name to search for
 * found  - optional out parameter; set to NULL first, and to the
 *          matching entry of booth_conf->ticket[] on success
 *
 * Returns 1 if a ticket with that name exists, 0 otherwise.
 */
int find_ticket_by_name(const char *ticket, struct ticket_config **found)
{
	int idx;

	if (found)
		*found = NULL;

	for (idx = 0; idx < booth_conf->ticket_count; idx++) {
		if (strcmp(booth_conf->ticket[idx].name, ticket) != 0)
			continue;

		if (found)
			*found = &booth_conf->ticket[idx];
		return 1;
	}

	return 0;
}
/*
 * Validate a ticket name and look it up.
 *
 * The name must be terminated within the size of a ticket's name
 * buffer. *found (if given) is set to NULL first and to the ticket on
 * success.
 *
 * Returns 1 if the ticket exists; 0 if there is no configuration, the
 * name is too long, or no such ticket is configured.
 */
int check_ticket(char *ticket, struct ticket_config **found)
{
	if (found)
		*found = NULL;

	if (booth_conf &&
			check_max_len_valid(ticket, sizeof(booth_conf->ticket[0].name)))
		return find_ticket_by_name(ticket, found);

	return 0;
}
/*
 * Validate a site name and look it up.
 *
 * The name must be terminated within the size of a site's addr_string
 * buffer. On success *is_local receives the `local` field of the site
 * that was found.
 *
 * Returns 1 if the site is known, 0 otherwise (*is_local is then left
 * untouched).
 */
int check_site(char *site, int *is_local)
{
	struct booth_site *node;

	if (check_max_len_valid(site, sizeof(node->addr_string)) &&
			find_site_by_name(site, &node, 0)) {
		*is_local = node->local;
		return 1;
	}

	return 0;
}
/*
 * Push the ticket's grant state to the pacemaker handler: grant it if
 * the local site is the leader, revoke it otherwise, then clear the
 * update_cib flag.
 *
 * Returns -EINVAL if the local node is not a SITE (nothing is done),
 * 0 otherwise.
 */
int ticket_write(struct ticket_config *tk)
{
	if (local->type != SITE)
		return -EINVAL;

	if (tk->leader != local)
		pcmk_handler.revoke_ticket(tk);
	else
		pcmk_handler.grant_ticket(tk);

	tk->update_cib = 0;
	return 0;
}
/*
 * Ask an external program (tk->ext_verifier) whether getting the
 * ticket makes sense. E.g. if the services have a failcount of
 * INFINITY, we can't serve here anyway.
 *
 * If the program objects and we are the leader of a valid ticket, the
 * ticket is given up: it is reset and written out, and -- when
 * start_election is set -- a VOTE_FOR is broadcast so that the others
 * can start elections.
 *
 * Returns the result of run_handler(): 0 if we may hold the ticket,
 * non-zero otherwise.
 */
int test_external_prog(struct ticket_config *tk,
		int start_election)
{
	int rv = run_handler(tk, tk->ext_verifier, 1);

	if (!rv)
		return rv;

	tk_log_warn("we are not allowed to acquire ticket");

	if (!leader_and_valid(tk))
		return rv;

	/* Give it to somebody else. */
	reset_ticket(tk);
	ticket_write(tk);
	if (start_election) {
		ticket_broadcast(tk, OP_VOTE_FOR, OP_REQ_VOTE, RLT_SUCCESS, OR_LOCAL_FAIL);
	}

	return rv;
}
/*
 * Try to acquire a ticket for the local site; could be a manual grant
 * or a reaction to ticket loss.
 *
 * Returns RLT_EXT_FAILED if the external check refuses, otherwise the
 * result of new_election() with the local site as preference and a
 * term update.
 */
int acquire_ticket(struct ticket_config *tk, cmd_reason_t reason)
{
	int refused = test_external_prog(tk, 0);

	if (refused)
		return RLT_EXT_FAILED;

	return new_election(tk, local, 1, reason);
}
/** Try to get the ticket for the local site.
 *
 * Returns RLT_SUCCESS if the ticket is already held here,
 * RLT_OVERGRANT if it is owned by somebody else, otherwise the result
 * of acquire_ticket().
 *
 * Unless OPT_IMMEDIATE is given in options, committing the grant is
 * delayed until term_duration + acquire_after seconds from now (see
 * tk->delay_commit); the delay is dropped again if acquiring fails.
 * */
int do_grant_ticket(struct ticket_config *tk, int options)
{
int rv;
tk_log_info("granting ticket");
if (tk->leader == local)
return RLT_SUCCESS;
if (is_owned(tk))
return RLT_OVERGRANT;
- tk->delay_commit = time(NULL) +
+ tk->delay_commit = get_secs(NULL) +
tk->term_duration + tk->acquire_after;
if (options & OPT_IMMEDIATE) {
tk_log_warn("granting ticket immediately! If there are "
"unreachable sites, _hope_ you are sure that they don't "
"have the ticket!");
tk->delay_commit = 0;
}
rv = acquire_ticket(tk, OR_ADMIN);
/* no election started, so there is nothing to delay */
if (rv)
tk->delay_commit = 0;
return rv;
}
/*
 * Revoke the ticket locally and tell everybody: reset it, clear the
 * leader, write it out, arm the timeout and broadcast OP_REVOKE
 * (expecting acks).
 *
 * Returns the result of ticket_broadcast().
 */
static int start_revoke_ticket(struct ticket_config *tk)
{
	tk_log_info("revoking ticket");

	reset_ticket(tk);
	tk->leader = no_leader;
	ticket_write(tk);

	ticket_activate_timeout(tk);

	return ticket_broadcast(tk, OP_REVOKE, OP_ACK, RLT_SUCCESS, OR_ADMIN);
}
/** Ticket revoke.
 * Only to be started from the leader.
 *
 * If replies to an earlier request are still expected, the revoke is
 * deferred by setting next_state to ST_INIT and 0 is returned;
 * otherwise the revoke starts right away and the result of
 * start_revoke_ticket() is returned. */
int do_revoke_ticket(struct ticket_config *tk)
{
	if (!tk->acks_expected)
		return start_revoke_ticket(tk);

	tk_log_info("delay ticket revoke until the current operation finishes");
	tk->next_state = ST_INIT;
	return 0;
}
/*
 * Build a human-readable listing of all tickets.
 *
 * pdata - receives a malloc()ed buffer with one line per ticket; the
 *         caller owns it. Set to NULL on failure.
 * len   - receives the number of bytes written to the buffer.
 *
 * Returns 0 on success, -ENOMEM if the buffer cannot be allocated or
 * turns out to be too small.
 *
 * NOTE(review): the early "return -ENOMEM" inside the loop leaves
 * `data` allocated (leak). Also, snprintf() returns the length it
 * would have written, so after a truncation `cp` may point past the
 * buffer before the size check at the end of the iteration runs --
 * worth tightening.
 */
int list_ticket(char **pdata, unsigned int *len)
{
struct ticket_config *tk;
char timeout_str[64];
char pending_str[64];
char *data, *cp;
int i, alloc;
+ time_t ts;
*pdata = NULL;
*len = 0;
/* fixed slack plus a per-ticket allowance */
alloc = 256 +
booth_conf->ticket_count * (BOOTH_NAME_LEN * 2 + 128);
data = malloc(alloc);
if (!data)
return -ENOMEM;
cp = data;
foreach_ticket(i, tk) {
- if (tk->term_expires != 0)
+ if (tk->term_expires != 0) {
+ ts = wall_ts(tk->term_expires);
strftime(timeout_str, sizeof(timeout_str), "%F %T",
- localtime(&tk->term_expires));
- else
+ localtime(&ts));
+ } else
strcpy(timeout_str, "N/A");
- if (tk->leader == local && tk->delay_commit > time(NULL)) {
+ if (tk->leader == local && tk->delay_commit > get_secs(NULL)) {
+ ts = wall_ts(tk->delay_commit);
strcpy(pending_str, " (commit pending until ");
/* leave room for the closing parenthesis appended below */
strftime(pending_str + strlen(" (commit pending until "),
sizeof(pending_str) - strlen(" (commit pending until ") - 1,
- "%F %T", localtime(&tk->delay_commit));
+ "%F %T", localtime(&ts));
strcat(pending_str, ")");
} else
*pending_str = '\0';
cp += snprintf(cp,
alloc - (cp - data),
"ticket: %s, leader: %s",
tk->name,
ticket_leader_string(tk));
/* expiry information is only shown for owned tickets */
if (is_owned(tk)) {
cp += snprintf(cp,
alloc - (cp - data),
", expires: %s%s\n",
timeout_str,
pending_str);
} else {
cp += snprintf(cp, alloc - (cp - data), "\n");
}
if (alloc - (cp - data) <= 0)
return -ENOMEM;
}
*pdata = data;
*len = cp - data;
return 0;
}
/*
 * Bring the ticket back to its initial state: disown it, set the
 * state to ST_INIT and forget whom we voted for.
 */
void reset_ticket(struct ticket_config *tk)
{
	disown_ticket(tk);

	tk->state = ST_INIT;
	tk->voted_for = NULL;
}
/*
 * Try to get the ticket back through new elections, after logging why
 * the current state is suspicious (ticket expired, or granted here
 * according to is_granted while another site or nobody is recorded as
 * leader).
 */
static void reacquire_ticket(struct ticket_config *tk)
{
int valid;
const char *where_granted = "\0";
char buff[64];
- valid = (tk->term_expires >= time(NULL));
+ valid = (tk->term_expires >= get_secs(NULL));
/* describe the recorded owner for the log messages below */
if (tk->leader == local) {
where_granted = "granted here";
} else {
snprintf(buff, sizeof(buff), "granted to %s",
site_string(tk->leader));
where_granted = buff;
}
if (!valid) {
tk_log_warn("%s, but not valid "
"anymore (will try to reacquire)", where_granted);
}
if (tk->is_granted && tk->leader != local) {
if (tk->leader && tk->leader != no_leader) {
tk_log_error("granted here, but also %s, "
"that's really too bad (will try to reacquire)",
where_granted);
} else {
tk_log_warn("granted here, but we're "
"not recorded as the grantee (will try to reacquire)");
}
}
/* try to acquire the
 * ticket through new elections
 */
acquire_ticket(tk, OR_REACQUIRE);
}
/*
 * Derive the ticket's state and next_state from what we currently know
 * about its leader.
 *
 * sender - the site whose message triggered the update, or NULL when
 *          the information was loaded locally (logged as "from CIB")
 *
 * - Ticket held or granted here, and the sender is its leader with a
 *   term that still has time left: the ticket lives elsewhere, so we
 *   disown it, write it out and become follower.
 * - Ticket held or granted here otherwise: next_state becomes
 *   ST_LEADER; a candidate falls back to follower first.
 * - Ticket not held here: ST_INIT if there is no leader, else
 *   ST_FOLLOWER with next_state ST_FOLLOWER.
 */
void update_ticket_state(struct ticket_config *tk, struct booth_site *sender)
{
if (tk->state == ST_CANDIDATE) {
tk_log_info("learned from %s about "
"newer ticket, stopping elections",
site_string(sender));
/* there could be rejects coming from others; don't log
 * warnings unnecessarily */
tk->expect_more_rejects = 1;
}
if (tk->leader == local || tk->is_granted) {
/* message from a live leader with valid ticket? */
if (sender == tk->leader && term_time_left(tk)) {
if (tk->is_granted) {
tk_log_warn("ticket was granted here, "
"but it's live at %s (revoking here)",
site_string(sender));
} else {
tk_log_info("ticket live at %s",
site_string(sender));
}
disown_ticket(tk);
ticket_write(tk);
tk->state = ST_FOLLOWER;
tk->next_state = ST_FOLLOWER;
} else {
if (tk->state == ST_CANDIDATE) {
tk->state = ST_FOLLOWER;
}
tk->next_state = ST_LEADER;
}
} else {
if (!tk->leader || tk->leader == no_leader) {
if (sender)
tk_log_info("ticket is not granted");
else
tk_log_info("ticket is not granted (from CIB)");
tk->state = ST_INIT;
} else {
if (sender)
tk_log_info("ticket granted to %s (says %s)",
site_string(tk->leader),
site_string(sender));
else
tk_log_info("ticket granted to %s (from CIB)",
site_string(tk->leader));
tk->state = ST_FOLLOWER;
/* just make sure that we check the ticket soon */
tk->next_state = ST_FOLLOWER;
}
}
}
/*
 * Initialise every configured ticket at startup.
 *
 * Each ticket is reset; on a SITE its state is loaded through the
 * pacemaker handler (and applied if loading returned 0) and update_cib
 * is set. A status query is then broadcast, and start_postpone is set
 * so that processing waits until the sites have answered (or the first
 * timeout).
 *
 * Always returns 0.
 */
int setup_ticket(void)
{
	struct ticket_config *tk;
	int n;

	foreach_ticket(n, tk) {
		reset_ticket(tk);

		if (local->type == SITE) {
			if (!pcmk_handler.load_ticket(tk))
				update_ticket_state(tk, NULL);
			tk->update_cib = 1;
		}

		tk_log_info("broadcasting state query");

		/* wait until all send their status (or the first
		 * timeout) */
		tk->start_postpone = 1;
		ticket_broadcast(tk, OP_STATUS, OP_MY_INDEX, RLT_SUCCESS, 0);
	}

	return 0;
}
/*
 * Answer a client's "list" request on fd with the ticket listing.
 *
 * fd  - client connection to reply on
 * msg - the request (unused here)
 *
 * Returns a negative value if the listing could not be built,
 * otherwise the result of send_header_plus().
 *
 * The listing buffer is allocated by list_ticket() and owned by us, so
 * it is released once it has been handed to send_header_plus().
 * NOTE(review): this assumes send_header_plus() does not keep the
 * pointer after it returns -- confirm.
 */
int ticket_answer_list(int fd, struct boothc_ticket_msg *msg)
{
	char *data = NULL;
	/* list_ticket() takes an unsigned int * for the length */
	unsigned int olen = 0;
	int rv;
	struct boothc_header hdr;

	rv = list_ticket(&data, &olen);
	if (rv < 0)
		return rv;

	init_header(&hdr, CMR_LIST, 0, 0, RLT_SUCCESS, 0, sizeof(hdr) + olen);
	rv = send_header_plus(fd, &hdr, data, olen);

	free(data);
	return rv;
}
/*
 * Answer a client's "grant" request for the ticket named in msg.
 *
 * The reply carries RLT_INVALID_ARG for an unknown ticket,
 * RLT_OVERGRANT for a ticket that is already owned, and otherwise the
 * result of do_grant_ticket() -- with a zero result reported as
 * RLT_ASYNC.
 *
 * Returns the result of send_ticket_msg().
 */
int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg)
{
	struct ticket_config *tk;
	int rv;

	if (!check_ticket(msg->ticket.id, &tk)) {
		log_warn("client asked to grant unknown ticket %s",
				msg->ticket.id);
		rv = RLT_INVALID_ARG;
	} else if (is_owned(tk)) {
		log_warn("client wants to grant an (already granted!) ticket %s",
				msg->ticket.id);
		rv = RLT_OVERGRANT;
	} else {
		rv = do_grant_ticket(tk, ntohl(msg->header.options));
	}

	init_header(&msg->header, CMR_GRANT, 0, 0, rv ? rv : RLT_ASYNC, 0, sizeof(*msg));
	return send_ticket_msg(fd, msg);
}
/*
 * Answer a client's "revoke" request for the ticket named in msg.
 *
 * The reply carries RLT_INVALID_ARG for an unknown ticket,
 * RLT_TICKET_IDLE for a ticket nobody owns, RLT_REDIRECT if the ticket
 * is held by another site, and otherwise the result of
 * do_revoke_ticket() -- with a zero result reported as RLT_ASYNC.
 *
 * Returns the result of send_ticket_msg().
 */
int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg)
{
	struct ticket_config *tk;
	int rv;

	if (!check_ticket(msg->ticket.id, &tk)) {
		log_warn("client wants to revoke an unknown ticket %s",
				msg->ticket.id);
		rv = RLT_INVALID_ARG;
	} else if (!is_owned(tk)) {
		log_info("client wants to revoke a free ticket %s",
				msg->ticket.id);
		rv = RLT_TICKET_IDLE;
	} else if (tk->leader != local) {
		log_info("the ticket %s is not granted here, "
				"redirect to %s",
				msg->ticket.id, ticket_leader_string(tk));
		rv = RLT_REDIRECT;
	} else {
		rv = do_revoke_ticket(tk);
		if (rv == 0)
			rv = RLT_ASYNC;
	}

	/* tk is NULL here if the ticket was not found */
	init_ticket_msg(msg, CMR_REVOKE, 0, rv, 0, tk);
	return send_ticket_msg(fd, msg);
}
/*
 * Broadcast a ticket message to all sites.
 *
 * cmd            - the command to send; also remembered in
 *                  tk->last_request
 * expected_reply - if non-zero, the reply type we start expecting
 *                  from the other sites
 * res, reason    - result and reason codes placed in the message
 *
 * Returns the result of the transport's broadcast function.
 */
int ticket_broadcast(struct ticket_config *tk,
		cmd_request_t cmd, cmd_request_t expected_reply,
		cmd_result_t res, cmd_reason_t reason)
{
	struct boothc_ticket_msg out;

	init_ticket_msg(&out, cmd, 0, res, reason, tk);
	tk_log_debug("broadcasting '%s' (term=%d, valid=%d)",
			state_to_string(cmd),
			ntohl(out.ticket.term),
			ntohl(out.ticket.term_valid_for));

	tk->last_request = cmd;
	if (expected_reply)
		expect_replies(tk, expected_reply);

	return transport()->broadcast(&out, sizeof(out));
}
/* is it safe to commit the grant?
 * if we didn't hear from all sites on the initial grant, we may
 * need to delay the commit
 *
 * Returns 1 while the commit must still be delayed, 0 once it is safe
 * (no delay set, the delay has passed, or all sites replied -- in the
 * latter two cases tk->delay_commit is cleared).
 *
 * TODO: investigate possibility to devise from history whether a
 * missing site could be holding a ticket or not
 */
static int ticket_dangerous(struct ticket_config *tk)
{
if (!tk->delay_commit)
return 0;
- if (tk->delay_commit <= time(NULL) ||
+ if (tk->delay_commit <= get_secs(NULL) ||
all_sites_replied(tk)) {
tk_log_debug("ticket delay commit expired");
tk->delay_commit = 0;
return 0;
} else {
tk_log_debug("delay ticket commit for %ds",
- (int)(tk->delay_commit - time(NULL)));
+ (int)(tk->delay_commit - get_secs(NULL)));
}
return 1;
}
/* update the ticket on the leader, write it to the CIB, and
 * send out the update message to others with the new expiry
 * time
 *
 * tk->ticket_updated tracks progress: 0 = nothing done yet,
 * 1 = expiry renewed and OP_UPDATE broadcast, 2 = also written out
 * via ticket_write(). The write is held back while
 * ticket_dangerous() reports that the commit must be delayed.
 *
 * Returns the result of ticket_broadcast() if the update was sent in
 * this call, 0 otherwise.
 */
int leader_update_ticket(struct ticket_config *tk)
{
int rv = 0;
if (tk->ticket_updated >= 2)
return 0;
if (tk->ticket_updated < 1) {
tk->ticket_updated = 1;
- tk->term_expires = time(NULL) + tk->term_duration;
+ tk->term_expires = get_secs(NULL) + tk->term_duration;
rv = ticket_broadcast(tk, OP_UPDATE, OP_ACK, RLT_SUCCESS, 0);
}
if (tk->ticket_updated < 2) {
if (!ticket_dangerous(tk)) {
tk->ticket_updated = 2;
ticket_write(tk);
} else {
/* log just once, on the first retry */
if (tk->retry_number == 1)
tk_log_info("delaying ticket commit to CIB for %ds "
"(or all sites are reached)",
- (int)(tk->delay_commit - time(NULL)));
+ (int)(tk->delay_commit - get_secs(NULL)));
}
}
return rv;
}
/*
 * Log a warning for every site or arbitrator whose bit is missing
 * from tk->acks_received. To keep the log quiet this is skipped once
 * tk->retry_number is above 1.
 */
static void log_lost_servers(struct ticket_config *tk)
{
	struct booth_site *peer;
	int idx;

	if (tk->retry_number > 1)
		return;

	for (idx = 0; idx < booth_conf->site_count; idx++) {
		peer = &booth_conf->site[idx];
		if (tk->acks_received & peer->bitmask)
			continue;

		tk_log_warn("%s %s didn't acknowledge our request, "
				"will retry %d times",
				(peer->type == ARBITRATOR ? "arbitrator" : "site"),
				site_string(peer),
				tk->retries);
	}
}
/*
 * Send the last request again.
 *
 * If the acknowledged set consists of exactly the local site's bit,
 * the request is broadcast again; otherwise it is re-sent
 * individually to every site whose ack is still missing.
 */
static void resend_msg(struct ticket_config *tk)
{
	struct booth_site *peer;
	int idx;

	if (tk->acks_received == local->bitmask) {
		ticket_broadcast(tk, tk->last_request, 0, RLT_SUCCESS, 0);
		return;
	}

	for (idx = 0; idx < booth_conf->site_count; idx++) {
		peer = &booth_conf->site[idx];
		if (tk->acks_received & peer->bitmask)
			continue;

		tk_log_debug("resending %s to %s",
				state_to_string(tk->last_request),
				site_string(peer)
				);
		send_msg(tk->last_request, tk, peer, NULL);
	}
}
/*
 * A request has not been fully acknowledged yet: count the retry,
 * report how many answers are missing and send the request again.
 *
 * Once the retry counter exceeds tk->retries, resending stops and the
 * ticket wakeup is rescheduled instead. When a majority has answered
 * a heartbeat on an owned ticket, the ticket is updated before the
 * request is resent.
 */
static void handle_resends(struct ticket_config *tk)
{
	int answers;

	if (++tk->retry_number > tk->retries) {
		tk_log_debug("giving up on sending retries");
		no_resends(tk);
		set_ticket_wakeup(tk);
		return;
	}

	if (tk->last_request == OP_VOTE_FOR) {
		/* try to reach some sites again if we just stepped down */
		tk_log_warn("no answers to our request (try #%d), "
				"we are alone",
				tk->retry_number);
	} else if (!majority_of_bits(tk, tk->acks_received)) {
		/* our own bit does not count as an answer */
		answers = count_bits(tk->acks_received) - 1;
		if (answers) {
			tk_log_warn("not enough answers to our request (try #%d): "
					"only got %d answers",
					tk->retry_number,
					answers);
		} else {
			tk_log_warn("no answers to our request (try #%d), "
					"we are alone",
					tk->retry_number);
		}
	} else {
		log_lost_servers(tk);
		if (tk->last_request == OP_HEARTBEAT &&
				is_owned(tk)) {
			/* we have the majority, update the ticket, at
			 * least the local copy if we're still not
			 * allowed to commit
			 */
			leader_update_ticket(tk);
		}
	}

	resend_msg(tk);
	ticket_activate_timeout(tk);
}
/* Return non-zero while processing of this ticket should still be
 * postponed: tk->start_postpone is set and fewer than tk->timeout
 * seconds have passed since start_time. */
int postpone_ticket_processing(struct ticket_config *tk)
{
/* defined in another file; NOTE(review): it has to come from the same
 * clock as get_secs() for the subtraction to be meaningful -- confirm */
extern time_t start_time;
return tk->start_postpone &&
- ((time(NULL) - start_time) < tk->timeout);
+ ((get_secs(NULL) - start_time) < tk->timeout);
}
/* Act on the hint stored in tk->next_state.
 *
 * ST_LEADER reacquires the ticket, ST_INIT stops resends and starts a
 * revoke; any other value except ST_FOLLOWER does nothing. In all those
 * cases tk->start_postpone is cleared afterwards. ST_FOLLOWER returns
 * immediately and leaves start_postpone untouched. */
static void process_next_state(struct ticket_config *tk)
{
	/* wanting to be follower is not much of an ambition; no
	 * processing, just return; don't reset start_postpone until
	 * we got some replies to status */
	if (tk->next_state == ST_FOLLOWER)
		return;

	if (tk->next_state == ST_LEADER) {
		reacquire_ticket(tk);
	} else if (tk->next_state == ST_INIT) {
		no_resends(tk);
		start_revoke_ticket(tk);
	}

	tk->start_postpone = 0;
}
/* The ticket has been lost: log where, remember the previous leader in
 * tk->lost_leader, reset the ticket and fall back to ST_FOLLOWER. When
 * the local node is a SITE the ticket is written out and a new election
 * is scheduled with reason OR_TKT_LOST. */
static void ticket_lost(struct ticket_config *tk)
{
	if (tk->leader == local) {
		tk_log_warn("lost majority (revoking locally)");
	} else {
		tk_log_warn("lost at %s", site_string(tk->leader));
	}

	/* must be saved before reset_ticket() gets to run */
	tk->lost_leader = tk->leader;
	reset_ticket(tk);
	tk->state = ST_FOLLOWER;

	if (local->type != SITE)
		return;

	ticket_write(tk);
	schedule_election(tk, OR_TKT_LOST);
}
/* Decide what to do for a ticket whose timer fired, depending on
 * tk->state. States not listed below are ignored. */
static void next_action(struct ticket_config *tk)
{
	switch (tk->state) {
	case ST_INIT:
		/* init state, handle resends for ticket revoke */
		/* and rebroadcast if stepping down */
		if (tk->acks_expected)
			handle_resends(tk);
		break;

	case ST_FOLLOWER:
		/* leader/ticket lost? and we didn't vote yet */
		tk_log_debug("leader: %s, voted_for: %s",
			site_string(tk->leader),
			site_string(tk->voted_for));
		if (tk->leader || tk->voted_for)
			break;
		disown_ticket(tk);
		if (!new_election(tk, NULL, 1, OR_AGAIN))
			ticket_activate_timeout(tk);
		break;

	case ST_CANDIDATE:
		/* elections timed out? */
		elections_end(tk);
		break;

	case ST_LEADER:
		/* timeout or ticket renewal? */
		if (tk->acks_expected) {
			handle_resends(tk);
			break;
		}
		/* this is ticket renewal, run local test */
		if (!test_external_prog(tk, 1)) {
			ticket_broadcast(tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0);
			ticket_activate_timeout(tk);
		}
		break;

	default:
		break;
	}
}
/* Periodic processing of one ticket.
 *
 * In order: keep postponing while postpone_ticket_processing() says so;
 * stop status resends; act on a pending tk->next_state; declare an
 * owned ticket lost once its term has expired; otherwise run
 * next_action(). On the way out tk->next_state is cleared and
 * ticket_write() is called if tk->update_cib is set and no election is
 * in progress. */
static void ticket_cron(struct ticket_config *tk)
{
time_t now;
/* don't process the tickets too early after start */
if (postpone_ticket_processing(tk)) {
tk_log_debug("ticket processing postponed (start_postpone=%d)",
tk->start_postpone);
/* but run again soon */
ticket_activate_timeout(tk);
return;
}
/* no need for status resends, we hope we got at least one
* my_index back */
if (tk->acks_expected == OP_MY_INDEX) {
no_resends(tk);
}
/* after startup, we need to decide what to do based on the
* current ticket state; tk->next_state has a hint
* also used for revokes which had to be delayed
*/
if (tk->next_state) {
process_next_state(tk);
goto out;
}
/* Has an owner, has an expiry date, and expiry date in the past?
* Losing the ticket must happen in _every_ state. */
/* now and tk->term_expires are compared directly, so both have to be
 * readings of the same clock */
- now = time(NULL);
+ now = get_secs(NULL);
if (!tk->in_election &&
tk->term_expires &&
is_owned(tk) &&
now >= tk->term_expires) {
ticket_lost(tk);
goto out;
}
next_action(tk);
out:
tk->next_state = 0;
if (!tk->in_election && tk->update_cib)
ticket_write(tk);
}
/* Run ticket_cron() for every configured ticket whose next_cron time
 * is not later than the current time, and make sure each processed
 * ticket has a wakeup scheduled afterwards. */
void process_tickets(void)
{
struct ticket_config *tk;
int i;
- struct timeval now, last_cron;
- float sec_until;
+ timetype now, last_cron;
/* the time is sampled once and used for all tickets in this pass */
- gettimeofday(&now, NULL);
+ get_time(&now);
foreach_ticket(i, tk) {
- sec_until = timeval_to_float(tk->next_cron) - timeval_to_float(now);
- if (0)
- tk_log_debug("next cron %" PRIx64 ".%03d, "
- "now %" PRIx64 "%03d, in %f",
- (uint64_t)tk->next_cron.tv_sec, timeval_msec(tk->next_cron),
- (uint64_t)now.tv_sec, timeval_msec(now),
- sec_until);
- if (sec_until > 0.0)
/* not due yet */
+ if (time_cmp(&tk->next_cron, &now, >))
continue;
tk_log_debug("ticket cron");
/* keep the old schedule to detect whether ticket_cron() changed it */
last_cron = tk->next_cron;
ticket_cron(tk);
/* next_cron unchanged: fall back to the default wakeup */
- if (!timercmp(&last_cron, &tk->next_cron, !=)) {
+ if (!time_cmp(&last_cron, &tk->next_cron, !=)) {
tk_log_debug("nobody set ticket wakeup");
set_ticket_wakeup(tk);
}
}
}
/* Log state, term, leader and expiry time of every configured ticket
 * at info level. */
void tickets_log_info(void)
{
struct ticket_config *tk;
int i;
+ time_t ts;
foreach_ticket(i, tk) {
/* term_expires is passed through wall_ts() before it is handed to
 * ctime(), which expects wall clock time */
+ ts = wall_ts(tk->term_expires);
/* "%-24.24s" prints at most 24 characters of the ctime() string,
 * which leaves out its trailing newline */
tk_log_info("state '%s' "
"term %d "
"leader %s "
"expires %-24.24s",
state_to_string(tk->state),
tk->current_term,
ticket_leader_string(tk),
- ctime(&tk->term_expires));
+ ctime(&ts));
}
}
/* Record an incoming message as an acknowledgement of our last request.
 *
 * The message counts only if its request field matches tk->last_request
 * and its command is the one we expect (or we expect OP_REJECTED).
 * The sender's bit is then added to tk->acks_received. Once everybody
 * replied, or a vote request arrives after we sent OP_VOTE_FOR, resends
 * are stopped, start_postpone is cleared and the wakeup is recomputed.
 * The leader argument is not used here. */
static void update_acks(
	struct ticket_config *tk,
	struct booth_site *sender,
	struct booth_site *leader,
	struct boothc_ticket_msg *msg
	)
{
	uint32_t cmd = ntohl(msg->header.cmd);
	uint32_t req = ntohl(msg->header.request);

	if (req != tk->last_request)
		return;
	if (tk->acks_expected != cmd &&
	    tk->acks_expected != OP_REJECTED)
		return;

	/* got an ack! */
	tk->acks_received |= sender->bitmask;

	if (cmd == OP_HEARTBEAT) {
		tk_log_debug("got ACK from %s, %d/%d agree.",
			site_string(sender),
			count_bits(tk->acks_received),
			booth_conf->site_count);
	}

	if (tk->delay_commit && all_sites_replied(tk))
		tk->delay_commit = 0;

	if (all_replied(tk) ||
	    /* we just stepped down, need only one site to start
	     * elections */
	    (cmd == OP_REQ_VOTE && tk->last_request == OP_VOTE_FOR)) {
		no_resends(tk);
		tk->start_postpone = 0;
		set_ticket_wakeup(tk);
	}
}
/* UDP message receiver.
 *
 * Validates the header and length, resolves the sending site, the
 * ticket and the leader named in the message, updates the ack
 * bookkeeping and hands the message to raft_answer().
 *
 * Returns -1 for a bad header/length or an unknown sender, -EINVAL for
 * an unknown ticket or leader, otherwise the result of raft_answer(). */
int message_recv(struct boothc_ticket_msg *msg, int msglen)
{
	struct booth_site *source;
	struct booth_site *leader;
	struct ticket_config *tk;
	uint32_t sender_id;
	uint32_t leader_id;

	if (check_boothc_header(&msg->header, sizeof(*msg)) < 0 ||
	    msglen != sizeof(*msg)) {
		log_error("message receive error");
		return -1;
	}

	sender_id = ntohl(msg->header.from);
	if (!find_site_by_id(sender_id, &source) || !source) {
		log_error("unknown sender: %08x", sender_id);
		return -1;
	}

	if (!check_ticket(msg->ticket.id, &tk)) {
		log_warn("got invalid ticket name %s from %s",
			msg->ticket.id, site_string(source));
		return -EINVAL;
	}

	leader_id = ntohl(msg->ticket.leader);
	if (!find_site_by_id(leader_id, &leader)) {
		tk_log_error("message with unknown leader %u received", leader_id);
		return -EINVAL;
	}

	update_acks(tk, source, leader, msg);
	return raft_answer(tk, source, leader, msg);
}
/* Debug helper: log the distance from the current time to
 * tk->next_cron, printed as seconds and milliseconds. */
static void log_next_wakeup(struct ticket_config *tk)
{
- struct timeval now, res;
+ timetype now, res;
- gettimeofday(&now, NULL);
- timersub(&tk->next_cron, &now, &res);
- tk_log_debug("set ticket wakeup in %d.%06d",
- (int)res.tv_sec, (int)res.tv_usec);
/* res = next_cron - now */
+ get_time(&now);
+ time_sub(&tk->next_cron, &now, &res);
+ tk_log_debug("set ticket wakeup in %d.%03d",
+ (int)res.tv_sec, (int)msecs(res));
}
/* New vote round; §5.2 */
/* delay the next election start for up to 1s */
/* The random delay is added on top of the current tk->next_cron, so
 * the caller has to set next_cron before calling this. */
void add_random_delay(struct ticket_config *tk)
{
- struct timeval delay, tv;
+ timetype delay, tv;
- delay.tv_sec = 0;
- delay.tv_usec = cl_rand_from_interval(0, 1000000);
- timeradd(&tk->next_cron, &delay, &tv);
/* delay = random time of up to 1000 ms; tv = next_cron + delay */
+ rand_time_ms(delay, 1000);
+ time_add(&tk->next_cron, &delay, &tv);
ticket_next_cron_at(tk, tv);
if (ANYDEBUG) {
log_next_wakeup(tk);
}
}
/* Compute and store the next cron time for a ticket.
 *
 * A default of one hour from now is set first and then overridden
 * depending on tk->state. If tk->next_state is pending the wakeup is
 * moved to "now" (no acks expected) or to the regular timeout. */
void set_ticket_wakeup(struct ticket_config *tk)
{
- struct timeval tv, now;
+ timetype tv, now, res;
/* At least every hour, perhaps sooner. */
ticket_next_cron_in(tk, 3600);
- gettimeofday(&now, NULL);
+ get_time(&now);
switch (tk->state) {
case ST_LEADER:
assert(tk->leader == local);
/* start from now so the sub-second part is kept, then replace the
 * seconds with the start of the next vote */
tv = now;
tv.tv_sec = next_vote_starts_at(tk);
- /* If timestamp is in the past, wakeup at the expiry
- * time. */
- if (timeval_compare(tv, now) <= 0) {
- tk_log_debug("next ts in the past (%f)",
- timeval_to_float(tv) - timeval_to_float(now));
- tv.tv_sec = tk->term_expires;
+ /* If timestamp is in the past, wakeup in
+ * one second. */
+ if (time_cmp(&tv, &now, <)) {
+ time_sub(&tv, &now, &res);
+ tk_log_debug("next ts in the past (%d.%03d)",
+ (int)res.tv_sec, (int)msecs(res));
+ tv.tv_sec = now.tv_sec + 1;
}
ticket_next_cron_at(tk, tv);
break;
case ST_CANDIDATE:
assert(tk->election_end);
ticket_next_cron_at_coarse(tk, tk->election_end);
break;
case ST_INIT:
case ST_FOLLOWER:
/* If there is (or should be) some owner, check on it later on.
* If no one is interested - don't care. */
if (is_owned(tk) &&
(local->type == SITE))
ticket_next_cron_at_coarse(tk,
tk->term_expires + tk->acquire_after);
break;
default:
tk_log_error("unknown ticket state: %d", tk->state);
}
if (tk->next_state) {
/* we need to do something soon here */
if (!tk->acks_expected) {
ticket_next_cron_at(tk, now);
} else {
ticket_activate_timeout(tk);
}
}
if (ANYDEBUG) {
log_next_wakeup(tk);
}
}
/* Schedule an election for this ticket shortly after now.
 * Does nothing unless the local node is a SITE. The reason is stored
 * in tk->election_reason. */
void schedule_election(struct ticket_config *tk, cmd_reason_t reason)
{
if (local->type != SITE)
return;
tk->election_reason = reason;
/* base the wakeup on the current time ... */
- gettimeofday(&tk->next_cron, NULL);
+ get_time(&tk->next_cron);
/* introduce a short delay before starting election */
add_random_delay(tk);
}
/* Given a state (in host byte order), return a human-readable (char*).
 * The state is converted with htonl() and its bytes are returned as a
 * string. Results live in a static ring of six slots, so up to six
 * return values can be used in a single printf() before the oldest one
 * is overwritten. */
char *state_to_string(uint32_t state_ho)
{
	union mu { cmd_request_t s; char c[5]; };
	static union mu cache[6] = { { 0 } };
	static int current = 0;
	union mu *slot;

	/* advance to the next slot, wrapping around at the end */
	current = (current + 1) % (int)(sizeof(cache) / sizeof(cache[0]));
	slot = &cache[current];

	slot->s = htonl(state_ho);
	/* Shouldn't be necessary, union array is initialized with zeroes, and
	 * these bytes never get written. */
	slot->c[4] = 0;

	return slot->c;
}
/* Send an OP_REJECTED message with result `code` to dest. The command
 * of the incoming message is passed along as the request being
 * answered. Returns the result of booth_udp_send(). */
int send_reject(struct booth_site *dest, struct ticket_config *tk,
		cmd_result_t code, struct boothc_ticket_msg *in_msg)
{
	struct boothc_ticket_msg reply;
	int request;

	request = ntohl(in_msg->header.cmd);

	tk_log_debug("sending reject to %s",
		site_string(dest));

	init_ticket_msg(&reply, OP_REJECTED, request, code, 0, tk);
	return booth_udp_send(dest, &reply, sizeof(reply));
}
int send_msg (
int cmd,
struct ticket_config *current_tk,
struct booth_site *dest,
struct boothc_ticket_msg *in_msg
)
{
int req = 0;
struct ticket_config *tk = current_tk;
struct boothc_ticket_msg msg;
if (cmd == OP_MY_INDEX) {
if (current_tk->state == ST_CANDIDATE &&
current_tk->last_valid_tk->current_term) {
tk = current_tk->last_valid_tk;
}
tk_log_info("sending status to %s",
site_string(dest));
}
if (in_msg)
req = ntohl(in_msg->header.cmd);
init_ticket_msg(&msg, cmd, req, RLT_SUCCESS, 0, tk);
return booth_udp_send(dest, &msg, sizeof(msg));
}
diff --git a/src/ticket.h b/src/ticket.h
index f4c2770..24e796e 100644
--- a/src/ticket.h
+++ b/src/ticket.h
@@ -1,114 +1,114 @@
/*
* Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
* Copyright (C) 2013-2014 Philipp Marek <philipp.marek@linbit.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _TICKET_H
#define _TICKET_H
#include <time.h>
#include <sys/time.h>
#include <math.h>
+#include "timer.h"
#include "config.h"
#include "log.h"
#define DEFAULT_TICKET_EXPIRY 600
#define DEFAULT_TICKET_TIMEOUT 5
#define DEFAULT_RETRIES 10
/* Loop over all tickets / sites in booth_conf.
 *   i_      integer lvalue used as the running index
 *   t_, n_  pointer lvalue set to the current element on every pass
 * The element pointer is assigned before the bound is checked, so after
 * the loop it points one past the last element and must not be
 * dereferenced. The index argument is now actually used; before, the
 * macros only worked when the caller's index variable was named `i`. */
#define foreach_ticket(i_,t_) for((i_)=0; ((t_)=booth_conf->ticket+(i_), (i_)<booth_conf->ticket_count); (i_)++)
#define foreach_node(i_,n_) for((i_)=0; ((n_)=booth_conf->site+(i_), (i_)<booth_conf->site_count); (i_)++)
int check_ticket(char *ticket, struct ticket_config **tc);
int check_site(char *site, int *local);
int grant_ticket(struct ticket_config *ticket);
int revoke_ticket(struct ticket_config *ticket);
int list_ticket(char **pdata, unsigned int *len);
int message_recv(struct boothc_ticket_msg *msg, int msglen);
void reset_ticket(struct ticket_config *tk);
void update_ticket_state(struct ticket_config *tk, struct booth_site *sender);
int setup_ticket(void);
int check_max_len_valid(const char *s, int max);
int do_grant_ticket(struct ticket_config *ticket, int options);
int do_revoke_ticket(struct ticket_config *tk);
int find_ticket_by_name(const char *ticket, struct ticket_config **found);
void set_ticket_wakeup(struct ticket_config *tk);
int postpone_ticket_processing(struct ticket_config *tk);
int test_external_prog(struct ticket_config *tk, int start_election);
int acquire_ticket(struct ticket_config *tk, cmd_reason_t reason);
int ticket_answer_list(int fd, struct boothc_ticket_msg *msg);
int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg);
int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg);
int ticket_broadcast_proposed_state(struct ticket_config *tk, cmd_request_t state);
int ticket_write(struct ticket_config *tk);
void process_tickets(void);
void tickets_log_info(void);
char *state_to_string(uint32_t state_ho);
int send_reject(struct booth_site *dest, struct ticket_config *tk,
cmd_result_t code, struct boothc_ticket_msg *in_msg);
int send_msg (int cmd, struct ticket_config *tk,
struct booth_site *dest, struct boothc_ticket_msg *in_msg);
int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd, cmd_request_t expected_reply, cmd_result_t res, cmd_reason_t reason);
int leader_update_ticket(struct ticket_config *tk);
void add_random_delay(struct ticket_config *tk);
void schedule_election(struct ticket_config *tk, cmd_reason_t reason);
/* Set the ticket's next cron time to exactly `when`. */
-static inline void ticket_next_cron_at(struct ticket_config *tk, struct timeval when)
+static inline void ticket_next_cron_at(struct ticket_config *tk, timetype when)
{
tk->next_cron = when;
}
/* Set the ticket's next cron time to `when` with whole-second
 * resolution: all other fields of next_cron are zeroed. */
static inline void ticket_next_cron_at_coarse(struct ticket_config *tk, time_t when)
{
/* NOTE(review): memset() needs <string.h>, which this header does not
 * include itself -- presumably it arrives via config.h or log.h; confirm */
+ memset(&tk->next_cron, 0, sizeof(tk->next_cron));
tk->next_cron.tv_sec = when;
- tk->next_cron.tv_usec = 0;
}
/* Schedule the next cron run `seconds` whole seconds after the current
 * get_time() reading; the sub-second part of that reading is kept. */
-static inline void ticket_next_cron_in(struct ticket_config *tk, float seconds)
+static inline void ticket_next_cron_in(struct ticket_config *tk, time_t seconds)
{
- struct timeval tv;
+ timetype tv;
- gettimeofday(&tv, NULL);
- tv.tv_sec += trunc(seconds);
- tv.tv_usec += (seconds - trunc(seconds)) * 1e6;
+ get_time(&tv);
+ tv.tv_sec += seconds;
ticket_next_cron_at(tk, tv);
}
/* Arm the ticket's cron timer to fire tk->timeout seconds from now
 * and log that at debug level. */
static inline void ticket_activate_timeout(struct ticket_config *tk)
{
	/* TODO: increase timeout when no answers */
	tk_log_debug("activate ticket timeout in %d",
		tk->timeout);

	ticket_next_cron_in(tk, tk->timeout);
}
#endif /* _TICKET_H */
diff --git a/src/timer.c b/src/timer.c
new file mode 100644
index 0000000..b550fdd
--- /dev/null
+++ b/src/timer.c
@@ -0,0 +1,77 @@
+/*
+ * Copyright (C) 2014 Dejan Muhamedagic <dejan@suse.de>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include "timer.h"
+
+/* timer.h declares the functions below only when POSIX timers are
+ * available. In its struct timeval fallback the very same names are
+ * macros (timersub, timeradd, time, ...), so these definitions must
+ * not be compiled there. */
+#if _POSIX_TIMERS > 0
+
+/* Store a - b in *res. One second is borrowed when a's nanosecond
+ * field is smaller than b's, so res->tv_nsec does not go negative.
+ * Assumes both inputs have tv_nsec in [0, 1000000000). */
+void time_sub(struct timespec *a, struct timespec *b, struct timespec *res)
+{
+	if (a->tv_nsec < b->tv_nsec) {
+		res->tv_sec = a->tv_sec - b->tv_sec - 1;
+		res->tv_nsec = a->tv_nsec + (1000000000 - b->tv_nsec);
+	} else {
+		res->tv_sec = a->tv_sec - b->tv_sec;
+		res->tv_nsec = a->tv_nsec - b->tv_nsec;
+	}
+}
+
+
+/* Store a + b in *res, carrying whole seconds out of the nanosecond
+ * sum into res->tv_sec. */
+void time_add(struct timespec *a, struct timespec *b, struct timespec *res)
+{
+	res->tv_nsec = (a->tv_nsec + b->tv_nsec) % 1000000000;
+	res->tv_sec = a->tv_sec + b->tv_sec + ((a->tv_nsec + b->tv_nsec) / 1000000000);
+}
+
+/* time(2)-like interface on top of get_time(): return the seconds part
+ * of the booth clock and, if p is not NULL, also store it in *p.
+ * The return value of get_time() is not checked. */
+time_t get_secs(time_t *p)
+{
+	struct timespec tv;
+	time_t secs;
+
+	get_time(&tv);
+	secs = tv.tv_sec;
+	if (p)
+		*p = secs;
+	return secs;
+}
+
+/* time booth_clk_t is a time since boot or similar, return
+ * something humans can understand.
+ * The offset between gettimeofday() and the booth clock is sampled at
+ * the time of the call and its seconds part is added to booth_clk_t. */
+time_t wall_ts(time_t booth_clk_t)
+{
+	struct timespec booth_clk_now, now_tv, res;
+	struct timeval now;
+
+	get_time(&booth_clk_now);
+	gettimeofday(&now, NULL);
+	TIMEVAL_TO_TIMESPEC(&now, &now_tv);
+	/* res = wall clock now - booth clock now */
+	time_sub(&now_tv, &booth_clk_now, &res);
+	return booth_clk_t + res.tv_sec;
+}
+
+/* time t is wall clock time, convert to time compatible
+ * with our clock_gettime clock.
+ * Inverse of wall_ts(): the seconds part of the same offset is
+ * subtracted from t. */
+time_t unwall_ts(time_t t)
+{
+	struct timespec booth_clk_now, now_tv, res;
+	struct timeval now;
+
+	get_time(&booth_clk_now);
+	gettimeofday(&now, NULL);
+	TIMEVAL_TO_TIMESPEC(&now, &now_tv);
+	/* res = wall clock now - booth clock now */
+	time_sub(&now_tv, &booth_clk_now, &res);
+	return t - res.tv_sec;
+}
+
+#endif /* _POSIX_TIMERS > 0 */
diff --git a/src/timer.h b/src/timer.h
new file mode 100644
index 0000000..05c7da8
--- /dev/null
+++ b/src/timer.h
@@ -0,0 +1,79 @@
+/*
+ * Copyright (C) 2014 Dejan Muhamedagic <dejan@suse.de>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#ifndef _TIMER_H
+#define _TIMER_H
+
+#include <time.h>
+#include <unistd.h>
+#include <sys/time.h>
+
+/* Time abstraction: with POSIX timers the clock is read through
+ * clock_gettime() into a struct timespec, otherwise through
+ * gettimeofday() into a struct timeval. Code uses timetype and the
+ * helpers below and does not need to know which one is in effect. */
+#if _POSIX_TIMERS > 0
+
+/* prefer the monotonic clock where the platform defines it */
+#if defined(CLOCK_MONOTONIC)
+# define BOOTH_CLOCK CLOCK_MONOTONIC
+#else
+# define BOOTH_CLOCK CLOCK_REALTIME
+#endif
+
+typedef struct timespec timetype;
+
+/* read BOOTH_CLOCK into *p; evaluates to clock_gettime()'s result */
+#define get_time(p) clock_gettime(BOOTH_CLOCK, p)
+
+/* timercmp() counterpart for struct timespec pointers; CMP is a
+ * comparison operator token such as <, > or != */
+#define time_cmp(a, b, CMP) \
+	(((a)->tv_sec == (b)->tv_sec) ? \
+	((a)->tv_nsec CMP (b)->tv_nsec) : \
+	((a)->tv_sec CMP (b)->tv_sec))
+
+void time_sub(struct timespec *a, struct timespec *b, struct timespec *res);
+void time_add(struct timespec *a, struct timespec *b, struct timespec *res);
+time_t get_secs(time_t *p);
+time_t wall_ts(time_t t);
+time_t unwall_ts(time_t t);
+
+/* millisecond part of a timetype value */
+#define msecs(tv) ((tv).tv_nsec/1000000)
+
+/* random time from 0 to t milliseconds
+ * tv is a timetype lvalue; the result is not normalized, tv_nsec may
+ * reach t * 1000000. Both arguments are parenthesized so expressions
+ * like *p or a + b expand correctly.
+ * NOTE(review): cl_rand_from_interval() is not declared by this
+ * header; the user of the macro has to provide it. */
+#define rand_time_ms(tv, t) do { \
+	(tv).tv_sec = 0; \
+	(tv).tv_nsec = (t) * cl_rand_from_interval(0, 1000000); \
+	} while(0)
+
+#else
+
+/* fallback: struct timeval and the timer*() macros from <sys/time.h> */
+typedef struct timeval timetype;
+#define get_time(p) gettimeofday(p, NULL)
+#define time_sub timersub
+#define time_add timeradd
+#define time_cmp timercmp
+#define get_secs time
+
+/* millisecond part of a timetype value */
+#define msecs(tv) ((tv).tv_usec/1000)
+
+/* random time from 0 to t milliseconds
+ * tv is a timetype lvalue; the result is not normalized, tv_usec may
+ * reach t * 1000. Both arguments are parenthesized so expressions
+ * like *p or a + b expand correctly. */
+#define rand_time_ms(tv, t) do { \
+	(tv).tv_sec = 0; \
+	(tv).tv_usec = (t) * cl_rand_from_interval(0, 1000); \
+	} while(0)
+
+/* the clock already is wall clock time here, nothing to convert */
+#define wall_ts(t) (t)
+#define unwall_ts(t) (t)
+
+#endif
+
+#endif

File Metadata

Mime Type
text/x-diff
Expires
Sat, Jan 25, 11:29 AM (1 d, 12 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1320292
Default Alt Text
(124 KB)

Event Timeline