diff --git a/configure.ac b/configure.ac index 8049c2b..0fb4314 100644 --- a/configure.ac +++ b/configure.ac @@ -1,430 +1,430 @@ # -*- Autoconf -*- # Process this file with autoconf to produce a configure script. # bootstrap / init AC_PREREQ([2.61]) -AC_INIT([booth], [0.1.7], [pacemaker@oss.clusterlabs.org]) +AC_INIT([booth], [0.2.0], [pacemaker@oss.clusterlabs.org]) AM_INIT_AUTOMAKE([-Wno-portability]) AC_CONFIG_SRCDIR([src/main.c]) AC_CONFIG_HEADER([src/b_config.h src/booth_config.h]) AC_CANONICAL_HOST AC_LANG([C]) AC_SUBST(WITH_LIST, [""]) dnl Fix default variables - "prefix" variable if not specified if test "$prefix" = "NONE"; then prefix="/usr" dnl Fix "localstatedir" variable if not specified if test "$localstatedir" = "\${prefix}/var"; then localstatedir="/var" fi dnl Fix "sysconfdir" variable if not specified if test "$sysconfdir" = "\${prefix}/etc"; then sysconfdir="/etc" fi dnl Fix "libdir" variable if not specified if test "$libdir" = "\${exec_prefix}/lib"; then if test -e /usr/lib64; then libdir="/usr/lib64" else libdir="/usr/lib" fi fi fi if test "$srcdir" = "."; then AC_MSG_NOTICE([building in place srcdir:$srcdir]) AC_DEFINE([BUILDING_IN_PLACE], 1, [building in place]) else AC_MSG_NOTICE([building out of tree srcdir:$srcdir]) fi # Checks for programs. # check stolen from gnulib/m4/gnu-make.m4 if ! ${MAKE-make} --version /cannot/make/this >/dev/null 2>&1; then AC_MSG_ERROR([you don't seem to have GNU make; it is required]) fi AC_PROG_CC AC_PROG_INSTALL AC_PROG_LN_S AC_PROG_MAKE_SET AC_PROG_RANLIB AC_PATH_PROGS(HELP2MAN, help2man) AC_PATH_PROGS(PKGCONFIG, pkg-config) AM_CONDITIONAL(HAVE_HELP2MAN, test x"${HELP2MAN}" != x"") # Checks for libraries. AC_CHECK_LIB([socket], [socket]) AC_CHECK_LIB([nsl], [t_open]) AC_CHECK_LIB([gpl], [cl_log]) PKG_CHECK_MODULES(GLIB, [glib-2.0]) # Checks for header files. AC_FUNC_ALLOCA AC_HEADER_DIRENT AC_HEADER_STDC AC_HEADER_SYS_WAIT AC_CHECK_HEADERS([arpa/inet.h fcntl.h limits.h netdb.h netinet/in.h stdint.h \ stdlib.h string.h sys/ioctl.h sys/param.h sys/socket.h \ sys/time.h syslog.h unistd.h sys/types.h getopt.h malloc.h \ sys/sockio.h utmpx.h]) AC_CHECK_HEADERS(heartbeat/glue_config.h) # Checks for typedefs, structures, and compiler characteristics. AC_C_CONST AC_TYPE_UID_T AC_C_INLINE AC_TYPE_INT16_T AC_TYPE_INT32_T AC_TYPE_INT64_T AC_TYPE_INT8_T AC_TYPE_SIZE_T AC_TYPE_SSIZE_T AC_HEADER_TIME AC_TYPE_UINT16_T AC_TYPE_UINT32_T AC_TYPE_UINT64_T AC_TYPE_UINT8_T AC_C_VOLATILE # Checks for library functions. AC_FUNC_CLOSEDIR_VOID AC_FUNC_ERROR_AT_LINE AC_REPLACE_FNMATCH AC_FUNC_FORK AC_PROG_GCC_TRADITIONAL AC_FUNC_MALLOC AC_FUNC_MEMCMP AC_FUNC_REALLOC AC_FUNC_SELECT_ARGTYPES AC_TYPE_SIGNAL AC_FUNC_VPRINTF AC_CHECK_FUNCS([alarm alphasort atexit bzero dup2 endgrent endpwent fcntl \ getcwd getpeerucred getpeereid gettimeofday inet_ntoa memmove \ memset mkdir scandir select socket strcasecmp strchr strdup \ strerror strrchr strspn strstr \ sched_get_priority_max sched_setscheduler]) AC_CONFIG_FILES([Makefile src/Makefile]) # =============================================== # Helpers # =============================================== ## helper for CC stuff cc_supports_flag() { local CFLAGS="-Werror $@" AC_MSG_CHECKING(whether $CC supports "$@") AC_COMPILE_IFELSE([int main(){return 0;}] ,[RC=0; AC_MSG_RESULT(yes)],[RC=1; AC_MSG_RESULT(no)]) return $RC } ## cleanup AC_MSG_NOTICE(Sanitizing prefix: ${prefix}) case $prefix in NONE) prefix=/usr/local;; esac AC_MSG_NOTICE(Sanitizing exec_prefix: ${exec_prefix}) case $exec_prefix in dnl For consistency with Corosync, map NONE->$prefix NONE) exec_prefix=$prefix;; prefix) exec_prefix=$prefix;; esac ## local defines PACKAGE_FEATURES="" LINT_FLAGS="-weak -unrecog +posixlib +ignoresigns -fcnuse \ -badflag -D__gnuc_va_list=va_list -D__attribute\(x\)=" # local options AC_ARG_ENABLE([fatal-warnings], [ --enable-fatal-warnings : enable fatal warnings. ], [ default="no" ]) AC_ARG_ENABLE([debug], [ --enable-debug : enable debug build. ], [ default="no" ]) AC_ARG_ENABLE([user-flags], [ --enable-user-flags : rely on user environment. ], [ default="no" ]) AC_ARG_ENABLE([coverage], [ --enable-coverage : coverage analysis of the codebase. ], [ default="no" ]) AC_ARG_ENABLE([small-memory-footprint], [ --enable-small-memory-footprint : Use small message queues and small messages sizes. ], [ default="no" ]) AC_ARG_WITH([initddir], [ --with-initddir=DIR : path to init script directory. ], [ INITDDIR="$withval" ], [ INITDDIR="$sysconfdir/init.d" ]) AC_ARG_ENABLE([resource-monitor], [ --enable-resource-monitor : Enabling Resource Monitor ], [ default="no" ]) # OS detection # THIS SECTION MUST DIE! CP=cp OS_LDL="-ldl" have_linux="no" case "$host_os" in *linux*) AC_DEFINE_UNQUOTED([BOOTH_LINUX], [1], [Compiling for Linux platform]) OS_CFLAGS="" OS_CPPFLAGS="-D_GNU_SOURCE" OS_LDFLAGS="" OS_DYFLAGS="-rdynamic" DARWIN_OPTS="" have_linux="yes" ;; darwin*) AC_DEFINE_UNQUOTED([BOOTH_DARWIN], [1], [Compiling for Darwin platform]) CP=rsync OS_CFLAGS="" OS_CPPFLAGS="" OS_LDFLAGS="" OS_DYFLAGS="" DARWIN_OPTS="-dynamiclib -bind_at_load \ -current_version ${SONAME} \ -compatibility_version ${SONAME} -install_name \$(libdir)/\$(@)" AC_DEFINE_UNQUOTED([MAP_ANONYMOUS], [MAP_ANON], [Shared memory define for Darwin platform]) AC_DEFINE_UNQUOTED([PATH_MAX], [4096], [Number of chars in a path name including nul]) AC_DEFINE_UNQUOTED([NAME_MAX], [255], [Number of chars in a file name]) ;; *bsd*) AC_DEFINE_UNQUOTED([BOOTH_BSD], [1], [Compiling for BSD platform]) AC_DEFINE_UNQUOTED([MAP_ANONYMOUS], [MAP_ANON], [Shared memory define for Darwin platform]) OS_CFLAGS="" OS_CPPFLAGS="-I/usr/local/include" OS_LDFLAGS="-L/usr/local/lib" OS_DYFLAGS="-export-dynamic" DARWIN_OPTS="" OS_LDL="" case "$host_os" in *freebsd[[234567]]*) ;; *freebsd*) AC_DEFINE_UNQUOTED([BOOTH_FREEBSD_GE_8], [1], [Compiling for FreeBSD >= 8 platform]) ;; esac ;; *solaris*) AC_DEFINE_UNQUOTED([BOOTH_SOLARIS], [1], [Compiling for Solaris platform]) AC_DEFINE_UNQUOTED([TS_CLASS], [1], [Prevent being scheduled RR]) AC_DEFINE_UNQUOTED([_SEM_SEMUN_UNDEFINED], [1], [The semun structure is undefined]) CP=rsync OS_CFLAGS="" OS_CPPFLAGS="-D_REENTRANT" OS_LDFLAGS="" OS_DYFLAGS="-Wl,-z,lazyload" DARWIN_OPTS="" SOLARIS_OPTS=" " ;; *) AC_MSG_ERROR([Unsupported OS? hmmmm]) ;; esac AC_SUBST(CP) # *FLAGS handling goes here ENV_CFLAGS="$CFLAGS" ENV_CPPFLAGS="$CPPFLAGS" ENV_LDFLAGS="$LDFLAGS" # debug build stuff if test "x${enable_debug}" = xyes; then AC_DEFINE_UNQUOTED([DEBUG], [1], [Compiling Debugging code]) OPT_CFLAGS="-O0" PACKAGE_FEATURES="$PACKAGE_FEATURES debug" else OPT_CFLAGS="-O3" fi # gdb flags if test "x${GCC}" = xyes; then GDB_FLAGS="-ggdb3" else GDB_FLAGS="-g" fi # extra warnings EXTRA_WARNINGS="" WARNLIST=" all shadow missing-prototypes missing-declarations strict-prototypes declaration-after-statement pointer-arith write-strings bad-function-cast missing-format-attribute format=2 format-security format-nonliteral no-long-long unsigned-char gnu89-inline no-strict-aliasing " for j in $WARNLIST; do if cc_supports_flag -W$j; then EXTRA_WARNINGS="$EXTRA_WARNINGS -W$j"; fi done if test "x${enable_coverage}" = xyes && \ cc_supports_flag -ftest-coverage && \ cc_supports_flag -fprofile-arcs ; then AC_MSG_NOTICE([Enabling Coverage (enable -O0 by default)]) OPT_CFLAGS="-O0" COVERAGE_CFLAGS="-ftest-coverage -fprofile-arcs" COVERAGE_LDFLAGS="-ftest-coverage -fprofile-arcs" COVERAGE_LCRSO_EXTRA_LDFLAGS="-rdynamic" PACKAGE_FEATURES="$PACKAGE_FEATURES coverage" else COVERAGE_CFLAGS="" COVERAGE_LDFLAGS="" COVERAGE_LCRSO_EXTRA_LDFLAGS="" fi if test "x${enable_small_memory_footprint}" = xyes ; then AC_DEFINE_UNQUOTED([HAVE_SMALL_MEMORY_FOOTPRINT], 1, [have small_memory_footprint]) PACKAGE_FEATURES="$PACKAGE_FEATURES small-memory-footprint" fi if test "x${enable_ansi}" = xyes && \ cc_supports_flag -std=iso9899:199409 ; then AC_MSG_NOTICE([Enabling ANSI Compatibility]) ANSI_CPPFLAGS="-ansi -D_GNU_SOURCE -DANSI_ONLY" PACKAGE_FEATURES="$PACKAGE_FEATURES ansi" else ANSI_CPPFLAGS="" fi if test "x${enable_fatal_warnings}" = xyes && \ cc_supports_flag -Werror ; then AC_MSG_NOTICE([Enabling Fatal Warnings (-Werror)]) WERROR_CFLAGS="-Werror" PACKAGE_FEATURES="$PACKAGE_FEATURES fatal-warnings" else WERROR_CFLAGS="" fi # don't add addtional cflags if test "x${enable_user_flags}" = xyes; then OPT_CFLAGS="" GDB_FLAGS="" EXTRA_WARNINGS="" fi # final build of *FLAGS CFLAGS="$ENV_CFLAGS $OPT_CFLAGS $GDB_FLAGS $OS_CFLAGS \ $COVERAGE_CFLAGS $EXTRA_WARNINGS $WERROR_CFLAGS $NSS_CFLAGS" CPPFLAGS="$ENV_CPPFLAGS $ANSI_CPPFLAGS $OS_CPPFLAGS $GLIB_CFLAGS $RESMON_CFLAGS" LDFLAGS="$ENV_LDFLAGS $COVERAGE_LDFLAGS $OS_LDFLAGS" # substitute what we need: AC_SUBST([INITDDIR]) AC_SUBST([COVERAGE_LCRSO_EXTRA_LDFLAGS]) AC_SUBST([OS_DYFLAGS]) AC_SUBST([OS_LDL]) AM_CONDITIONAL(BUILD_DARWIN, test -n "${DARWIN_OPTS}") AM_CONDITIONAL(BUILD_SOLARIS, test -n "${SOLARIS_OPTS}") AC_SUBST([DARWIN_OPTS]) AC_SUBST([SOLARIS_OPTS]) AM_CONDITIONAL(BUILD_HTML_DOCS, test -n "${GROFF}") AC_SUBST([LINT_FLAGS]) AC_DEFINE_UNQUOTED([LCRSODIR], "$(eval echo ${LCRSODIR})", [LCRSO directory]) AC_DEFINE_UNQUOTED([SOCKETDIR], "$(eval echo ${SOCKETDIR})", [Socket directory]) AC_DEFINE_UNQUOTED([LOCALSTATEDIR], "$(eval echo ${localstatedir})", [localstate directory]) BOOTHSYSCONFDIR=${sysconfdir}/booth AC_SUBST([HAVE_LOG_CIB_DIFF]) AC_SUBST([HAVE_XML_LOG_PATCHSET]) AC_SUBST([BOOTHSYSCONFDIR]) AC_DEFINE_UNQUOTED([BOOTHSYSCONFDIR], "$(eval echo ${BOOTHSYSCONFDIR})", [booth config directory]) AC_DEFINE_UNQUOTED([PACKAGE_FEATURES], "${PACKAGE_FEATURES}", [booth built-in features]) AC_OUTPUT AC_MSG_RESULT([]) AC_MSG_RESULT([$PACKAGE configuration:]) AC_MSG_RESULT([ Version = ${VERSION}]) AC_MSG_RESULT([ Prefix = ${prefix}]) AC_MSG_RESULT([ Executables = ${sbindir}]) AC_MSG_RESULT([ Man pages = ${mandir}]) AC_MSG_RESULT([ Doc dir = ${docdir}]) AC_MSG_RESULT([ Libraries = ${libdir}]) AC_MSG_RESULT([ Header files = ${includedir}]) AC_MSG_RESULT([ Arch-independent files = ${datadir}]) AC_MSG_RESULT([ State information = ${localstatedir}]) AC_MSG_RESULT([ System configuration = ${sysconfdir}]) AC_MSG_RESULT([ System init.d directory = ${INITDDIR}]) AC_MSG_RESULT([ booth config dir = ${BOOTHSYSCONFDIR}]) AC_MSG_RESULT([ SOCKETDIR = ${SOCKETDIR}]) AC_MSG_RESULT([ Features =${PACKAGE_FEATURES}]) AC_MSG_RESULT([]) AC_MSG_RESULT([$PACKAGE build info:]) AC_MSG_RESULT([ Library SONAME = ${SONAME}]) LIB_MSG_RESULT(m4_shift(local_soname_list))dnl AC_MSG_RESULT([ Default optimization = ${OPT_CFLAGS}]) AC_MSG_RESULT([ Default debug options = ${GDB_CFLAGS}]) AC_MSG_RESULT([ Extra compiler warnings = ${EXTRA_WARNING}]) AC_MSG_RESULT([ Env. defined CFLAG = ${ENV_CFLAGS}]) AC_MSG_RESULT([ Env. defined CPPFLAGS = ${ENV_CPPFLAGS}]) AC_MSG_RESULT([ Env. defined LDFLAGS = ${ENV_LDFLAGS}]) AC_MSG_RESULT([ OS defined CFLAGS = ${OS_CFLAGS}]) AC_MSG_RESULT([ OS defined CPPFLAGS = ${OS_CPPFLAGS}]) AC_MSG_RESULT([ OS defined LDFLAGS = ${OS_LDFLAGS}]) AC_MSG_RESULT([ OS defined LDL = ${OS_LDL}]) AC_MSG_RESULT([ OS defined DYFLAGS = ${OS_DYFLAGS}]) AC_MSG_RESULT([ ANSI defined CPPFLAGS = ${ANSI_CPPFLAGS}]) AC_MSG_RESULT([ Coverage CFLAGS = ${COVERAGE_CFLAGS}]) AC_MSG_RESULT([ Coverage LDFLAGS = ${COVERAGE_LDFLAGS}]) AC_MSG_RESULT([ Fatal War. CFLAGS = ${WERROR_CFLAGS}]) AC_MSG_RESULT([ Final CFLAGS = ${CFLAGS}]) AC_MSG_RESULT([ Final CPPFLAGS = ${CPPFLAGS}]) AC_MSG_RESULT([ Final LDFLAGS = ${LDFLAGS}]) diff --git a/docs/boothd.8.txt b/docs/boothd.8.txt index 15a7efb..c3765c9 100644 --- a/docs/boothd.8.txt +++ b/docs/boothd.8.txt @@ -1,367 +1,379 @@ BOOTHD(8) =========== :doctype: manpage NAME ---- boothd - The Booth Cluster Ticket Manager. SYNOPSIS -------- *boothd* 'daemon' ['-D'] [-c 'config'] *booth* ['client'] {'list'} [-S 'site'] ['-D'] [-c 'config'] *booth* ['client'] {'grant'|'revoke'} [-S 'site'] ['-D'] [-t] 'ticket' [-c 'config'] *booth* 'status' ['-D'] [-c 'config'] DESCRIPTION ----------- Booth manages tickets which authorizes one of the cluster sites located in geographically dispersed distances to run certain resources. It is designed to be an add-on to Pacemaker, which extends Pacemaker to support geographically distributed clustering. -It is based on the PAXOS protocol, see eg. - +It is based on the RAFT protocol, see eg. + for details. SHORT EXAMPLES -------------- --------------------- # boothd daemon # boothd client list # boothd client grant -t ticket-nfs # boothd client revoke -t ticket-nfs --------------------- OPTIONS ------- *-c*:: Configuration to use. + Can be a full path to a configuration file, or a short name; in the latter case, the directory '/etc/booth' and suffix '.conf' are added. Per default 'booth' is used, which results in the path '/etc/booth/booth.conf'. + The configuration name also determines the name of the PID file - for the defaults, '/var/run/booth/booth.pid'. *-D*:: Debug output/don't daemonize. Increases the debug output level; for 'boothd daemon', keeps the process in the foreground. *-h*, *--help*:: Give a short usage output. *-s*:: Site address. *-t*:: Ticket name. *-v*, *--version*:: Report version information. *-S*:: 'systemd' mode: don't fork. This is like '-D' but without the debug output. COMMANDS -------- Whether the binary is called as 'boothd' or 'booth' doesn't matter; the first argument determines the mode of operation. *'daemon'*:: Tells 'boothd' to serve a site. The locally configured interfaces are searched for an IP address that got defined in the configuration, so that Booth can operate in /arbitrator/ resp. /site/ mode. *'client'*:: Allows to list the ticket information (see also 'crm_ticket -L'), and to revoke or (initially) grant tickets to a site. + In this mode the configuration file is searched for an IP address that is locally reachable, ie. matches a configured subnet. This allows to run the client commands on another node in the same cluster, as long as the config file and the service IP is locally reachable. + Example: If the booth service IP is 192.168.55.200, and the local node has 192.168.55.15 configured on an interface, it knows which site it belongs to. + The client can also ask another site; use '-s' to tell where to connect to. *'status'*:: 'boothd' looks for the (locked) PID file and the UDP socket, prints some output to stdout (for use in shell scripts) and returns a OCF-compatible return code. With '-D', a human-readable message is printed to STDERR as well. CONFIGURATION FILE ------------------ A basic file looks like this: ----------------------- site="192.168.201.100" site="192.168.202.100" arbitrator="192.168.203.100" ticket="I-want-a-pony" ----------------------- You can use comment lines, by starting them with a hash-sign (''#''). Whitespace at the start and end of the line, and around the ''='', are ignored. The following key/value pairs are defined: *'port'*:: The UDP/TCP port to use. Default is '9929'. *'transport'*:: - The transport protocol to use for PAXOS exchanges. + The transport protocol to use for Raft exchanges. Currently only UDP is available. + Please note that the client mode always uses TCP to talk to a daemon; Booth will always bind and listen to *both* UDP and TCP ports. *'site'*, *'arbitrator'*:: - Defines a PAXOS member with the given IP, which should be a service IP. + Defines a Raft member with the given IP, which should be a service IP. + You will need at least three members for normal operation; an odd number is preferred. *'ticket'*:: Registers a ticket. Multiple tickets can be handled in a single Booth instance. The next items modify per-ticket defaults. They are stored as defaults for further tickets, and are used as value for the last defined ticket (if any). *'expire'*:: - The lease time for a ticket, in seconds. After that time the ticket gets + The lease time for a ticket, in seconds. After that time the ticket can be revoked, and another site can get it. + Typically 'booth' will try to renew a held ticket after half the lease time. *'timeout'*:: After that time 'booth' will re-send packets if there was an insufficient number of replies. + The default is '3'. *'weights'*:: A comma-separated list of integers that define the weight of individual - PAXOS members, in the same order as the 'site' and 'arbitrator' lines. + Raft members, in the same order as the 'site' and 'arbitrator' lines. + Default is '0' for all; this means that the ordering within the configuration file defines a kind of priority for conflicting requests. *'acquire-after'*:: Setting this to a positive value will make 'booth' try to acquire a ticket that got lost. + Ie. if the site that _had_ the ticket is not reachable any more, then 'acquire-after' seconds after ticket expiration other sites will try to activate the ticket. (Only one will succeed, though.) + A typical delay might be 60 seconds. *'retries'*:: Defines how often broadcast packets are sent out before the current action (grant, revoke) is aborted. + Default is 10; values lower than 3 are forbidden, and high values won't make much sense, too. + Please note that this counts only for a single packet; if ticket *renewal* runs into this limit (because the network was temporarily down), but the ticket is still valid afterwards, a new renewal run will be started automatically. *'site-user'*, *'site-group'*, *'arbitrator-user'*, *'arbitrator-group'*:: These define the credentials 'boothd' will be running with. + On a (Pacemaker) site the booth process will have to call 'crm_ticket', so the default is to use 'hacluster':'haclient'; for an arbitrator this user and group might not exists, so that will default to 'nobody':'nobody'. -*'before-acquire-handler':: +*'before-acquire-handler'*:: If set, this script/program will be called before 'boothd' tries to acquire or renew a ticket. Only a clean exit will allow 'boothd' to proceed; any other return value will cancel the operation. + This makes it possible to check whether it makes sense to try to acquire the ticket; eg. if a service in the dependency-chain has a failcount of 'INFINITY' on all available nodes, the service will be unable to run - and so another cluster (and not this one!) should try to start it. + Please assume that 'boothd' will wait synchronously for the result of that call, so having that program return quickly would be an advantage. + Please see below for details about available environment variables. A more verbose example of a configuration file might be ----------------------- transport = udp port = 9930 # D-85774 site="192.168.201.100" # D-90409 site="::ffff:192.168.202.100" # A-1120 arbitrator="192.168.203.100" ticket="I-want-a-pony" expire = 600 acquire-after = 60 timeout = 10 retries = 5 ----------------------- NOTES ----- Please note that Booth tickets are not meant to be real-time - a reasonable 'expire' time might be 300 seconds (5 minutes). Due to possible delays on the WAN connections it makes no sense to expect detection of problems and failover within a few seconds. 'booth' works with IPv6 addresses, too. 'booth' will start to renew a ticket before it expires, to account for transmission delays. This will happen so that (the bigger one of) half the 'expire' time, or 'timeout'*'retries'/2 seconds, will be left for the renewal. Of course, that means that with bad configuration values (eg. 'expire' 60 seconds, 'timeout' 3 seconds, and 'retries' > 40) the ticket renewal process will be started just after the ticket got acquired. HANDLERS -------- Currently, there's only one external handler defined (see the 'before-acquire-handler' configuration item above). It gets the following data via the environment: *'BOOTH_TICKET':: The ticket name, as given in the configuration file. (See 'ticket' item above.) *'BOOTH_LOCAL':: The local site specification, as defined in 'site'. *'BOOTH_CONF_PATH':: The path to the active configuration file. *'BOOTH_CONF_NAME':: The configuration name, as used by the '-c' commandline argument. *'BOOTH_TICKET_EXPIRES':: Timestamp for the ticket expiration (seconds since 1.1.1970), or '0'. FILES ----- *'/etc/booth/booth.conf'*:: The default configuration file name. See also the '-c' argument. *'/var/run/booth/'*:: Directory that holds PID/lock files. See also the 'status' command. +RAFT IMPLEMENTATION +------------------- + +Basically, each Pacemaker ticket corresponds to a separate Raft cluster. + +A ticket is granted _only_ to the Raft _Leader_, but a Leader needs not grant the ticket to Pacemaker. +To move a ticket, the Leader withdraws, and votes for the new Leader instead. + +So, the Raft "log" consists of -- nothing, more or less; there's no history to keep. + + SYSTEMD INTEGRATION ------------------- The Booth sources (and, very likely, packages too) include a 'systemd' unit file for 'boothd'. So don't forget to install 'boothd' into 'systemd' after configuration! ----------- # systemctl enable booth@{configurationname}.service # systemctl start booth@{configurationname}.service ----------- EXIT STATUS ----------- *0*:: Success. For the 'status' command: Daemon running. *1* (PCMK_OCF_UNKNOWN_ERROR):: General error code. *7* (PCMK_OCF_NOT_RUNNING):: No daemon process for that configuration active. BUGS ---- Probably. Please report them on GitHub: AUTHOR ------ 'boothd' was originally written (mostly) by Jiaju Zhang. Many people have contributed to it. In 2013 Philipp Marek took over maintainership. RESOURCES --------- GitHub: Documentation: COPYING ------- Copyright (C) 2011 Jiaju Zhang Copyright (C) 2013-2014 Philipp Marek Free use of this software is granted under the terms of the GNU General Public License (GPL). +// vim: set ft=asciidoc : diff --git a/script/unit-test.py b/script/unit-test.py index 7498b21..a17cc0b 100755 --- a/script/unit-test.py +++ b/script/unit-test.py @@ -1,625 +1,627 @@ #!/usr/bin/python # vim: fileencoding=utf-8 # see http://stackoverflow.com/questions/728891/correct-way-to-define-python-source-code-encoding import os, sys, time, signal, tempfile, socket, posix, time import re, shutil, pexpect, logging, pprint import random, copy, glob, traceback # Don't make that much sense - function/line is write(). # Would have to use traceback.extract_stack() manually. # %(funcName)10.10s:%(lineno)3d %(levelname)8s # The second ":" is to get correct syntax highlightning, # eg. messages with ERROR etc. are red in vim. default_log_format = '%(asctime)s: : %(message)s' default_log_datefmt = '%b %d %H:%M:%S' # {{{ pexpect-logging glue # needed for use as pexpect.logfile, to relay into existing logfiles class expect_logging(): prefix = "" test = None def __init__(self, pre, inst): self.prefix = pre self.test = inst def flush(self, *arg): pass def write(self, stg): if self.test.dont_log_expect == 0: # TODO: split by input/output, give program for line in re.split(r"[\r\n]+", stg): if line == self.test.prompt: continue if line == "": continue logging.debug(" " + self.prefix + " " + line) # }}} # {{{ dictionary plus second hash class dict_plus(dict): def __init__(self): self.aux = dict() # def aux(self): # return self.aux # }}} class UT(): # {{{ Members binary = None test_base = None lockfile = None defaults = None this_port = None this_site = "127.0.0.1" this_site_id = None running = False gdb = None booth = None prompt = "CUSTOM-GDB-PROMPT-%d-%d" % (os.getpid(), time.time()) dont_log_expect = 0 current_nr = None udp_sock = None # http://stackoverflow.com/questions/384076/how-can-i-color-python-logging-output BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8) # }}} # {{{ setup functions @classmethod def _filename(cls, desc): return "/tmp/booth-unittest.%s" % desc return "/tmp/booth-unittest.%d.%s" % (os.getpid(), desc) def __init__(self, bin, dir): self.binary = os.path.realpath(bin) self.test_base = os.path.realpath(dir) + "/" self.defaults = self.read_test_input(self.test_base + "_defaults.txt", state="ticket") self.lockfile = UT._filename("lock") self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) def read_test_input(self, file, state=None, m = dict()): fo = open(file, "r") state = None line_nr = 0 for line in fo.readlines(): line_nr += 1 # comment? if re.match(r"^\s*#", line): continue # empty line if re.match(r"^\s*$", line): continue # message resp. ticket # We allow a comment to have something to write out to screen res = re.match(r"^\s*(\w+)\s*:(?:\s*(#.*?\S))?\s*$", line) if res: state = res.group(1) if not m.has_key(state): m[state] = dict_plus() if res.group(2): m[state].aux["comment"] = res.group(2) m[state].aux["line"] = line_nr continue assert(state) res = re.match(r"^\s*(\S+)\s*(.*)\s*$", line) if res: m[state][ res.group(1) ] = res.group(2) return m def setup_log(self, **args): global default_log_format global default_log_datefmt this_test_log = logging.FileHandler( mode = "w", **args ) this_test_log.setFormatter( logging.Formatter(fmt = default_log_format, datefmt = default_log_datefmt) ) this_test_log.emit( logging.makeLogRecord( { "msg": "## vim: set ft=messages : ##", "lineno": 0, "levelname": "None", "level": None,} ) ) # in the specific files we want ALL information this_test_log.setLevel(logging.DEBUG) logging.getLogger('').addHandler(this_test_log) return this_test_log def running_on_console(self): return sys.stdout.isatty() def colored_string(self, stg, color): if self.running_on_console(): return "\033[%dm%s\033[0m" % (30+color, stg) return stg # We want shorthand in descriptions, ie. "state" # instead of "booth_conf->ticket[0].state". def translate_shorthand(self, name, context): if context == 'ticket': return "booth_conf->ticket[0]." + name if context == 'message': return "msg->" + name if context == 'inject': return "ntohl(((struct boothc_ticket_msg *)buf)->" + name + ")" assert(False) def stop_processes(self): if os.access(self.lockfile, os.F_OK): os.unlink(self.lockfile) # In case the boothd process is already dead, isalive() would still return True # (because GDB still has it), but terminate() does fail. # So we just quit GDB, and that might take the boothd with it - # if not, we terminate it ourselves. if self.gdb: self.gdb.close( force=True ); self.drain_booth_log() if self.booth: self.booth.close( force=self.booth.isalive() ) def start_a_process(self, bin, env_add=[], **args): name = re.sub(r".*/", "", bin) # How to get stderr, too? expct = pexpect.spawn(bin, env = dict( os.environ.items() + [('PATH', self.test_base + "/bin/:" + os.getenv('PATH')), ('UNIT_TEST_PATH', self.test_base), ('LC_ALL', 'C'), ('LANG', 'C')] + env_add ), timeout = 30, maxread = 32768, **args) expct.setecho(False) expct.logfile_read = expect_logging("<- %s" % name, self) expct.logfile_send = expect_logging(" -> %s" % name, self) return expct def start_processes(self, test): self.booth = self.start_a_process(self.binary, args = [ "daemon", "-D", "-c", self.test_base + "/booth.conf", "-s", "127.0.0.1", "-l", self.lockfile, ], env_add=[ ('UNIT_TEST', test), ('UNIT_TEST_FILE', os.path.realpath(test)), # provide some space, so that strcpy(getenv()) works ('UNIT_TEST_AUX', "".zfill(1024)), ]); logging.info("started booth with PID %d, lockfile %s" % (self.booth.pid, self.lockfile)) self.booth.expect("BOOTH site daemon is starting", timeout=2) #print self.booth.before; exit self.gdb = self.start_a_process("gdb", args=["-quiet", "-p", str(self.booth.pid), # Don't use .gdbinit "-nx", "-nh", # Run until the defined point. # This is necessary so that ticket state setting doesn't # happen _before_ the call to pcmk_load_ticket() # (which would overwrite our data) "-ex", "break ticket_cron", "-ex", "continue", ]) logging.info("started GDB with PID %d" % self.gdb.pid) self.gdb.expect("(gdb)") self.gdb.sendline("set pagination off\n") self.gdb.sendline("set interactive-mode off\n") self.gdb.sendline("set verbose off\n") ## sadly to late for the initial "symbol not found" messages self.gdb.sendline("set prompt " + self.prompt + "\\n\n"); self.sync(2000) # Only stop for this recipient, so that broadcasts are not seen multiple times self.send_cmd("break booth_udp_send if to == &(booth_conf->site[1])") self.send_cmd("break recvfrom") # ticket_cron is still a breakpoint # Now we're set up. self.this_site_id = self.query_value("local->site_id") self.this_port = int(self.query_value("booth_conf->port")) # do a self-test assert(self.check_value("local->site_id", self.this_site_id)) self.running = False # }}} # {{{ GDB communication def sync(self, timeout=-1): self.gdb.expect(self.prompt, timeout) answer = self.gdb.before self.dont_log_expect += 1 # be careful not to use RE characters like +*.[] etc. r = str(random.randint(2**19, 2**20)) self.gdb.sendline("print " + r) self.gdb.expect(r, timeout) self.gdb.expect(self.prompt, timeout) self.dont_log_expect -= 1 return answer # send a command to GDB, returning the GDB answer as string. def drain_booth_log(self): try: self.booth.read_nonblocking(64*1024, 0) except pexpect.EOF: pass except pexpect.TIMEOUT: pass finally: pass def send_cmd(self, stg, timeout=-1): # give booth a chance to get its messages out self.drain_booth_log() self.gdb.sendline(stg) return self.sync(timeout=timeout) def _query_value(self, which): val = self.send_cmd("print " + which) cleaned = re.search(r"^\$\d+ = (.*\S)\s*$", val, re.MULTILINE) if not cleaned: self.user_debug("query failed") return cleaned.group(1) def query_value(self, which): res = self._query_value(which) logging.debug("query_value: «%s» evaluates to «%s»" % (which, res)) return res def check_value(self, which, value): val = self._query_value("(" + which + ") == (" + value + ")") logging.debug("check_value: «%s» is «%s»: %s" % (which, value, val)) if val == "1": return True # for easier (test) debugging we'll show the _real_ value, too. want = self._query_value(value) # Order is important, so that next query works!! has = self._query_value(which) # for informational purposes self._query_value('state_to_string($$)') logging.error("«%s»: got «%s», expected «%s». ERROR." % (which, has, want)) return False # Send data to GDB, to inject them into the binary. # Handles different data types def set_val(self, name, value, numeric_conv=None): logging.debug("setting value «%s» to «%s» (num_conv %s)" %(name, value, numeric_conv)) res = None # string value? if re.match(r'^"', value): res = self.send_cmd("print strcpy(" + name + ", " + value + ")") - if re.match(r"^'", value): + elif re.match(r"^'", value): # single-quoted; GDB only understands double quotes. v1 = re.sub(r"^'", '', value) v2 = re.sub(r"'$", '', v1) # TODO: replace \\\\" etc. v3 = re.sub(r'"', '\\"', v2) res = self.send_cmd("print strcpy(" + name + ', "' + v3 + '")') # numeric elif numeric_conv: res = self.send_cmd("set variable " + name + " = " + numeric_conv + "(" + value + ")") else: res = self.send_cmd("set variable " + name + " = " + value) for r in [r"There is no member named", r"Structure has no component named", r"No symbol .* in current context", ]: assert(not re.search(r, res, re.MULTILINE)) logging.debug("set_val %s done" % name) # }}} GDB communication # there has to be some event waiting, so that boothd stops again. def continue_debuggee(self, timeout=30): res = None if not self.running: res = self.send_cmd("continue", timeout) self.drain_booth_log() return res # {{{ High-level functions. # Generally, GDB is attached to BOOTHD, and has it stopped. def set_state(self, kv): if not kv: return self.current_nr = kv.aux.get("line") #os.system("strace -f -tt -s 2000 -e write -p" + str(self.gdb.pid) + " &") for n, v in kv.iteritems(): self.set_val( self.translate_shorthand(n, "ticket"), v) logging.info("set state") def user_debug(self, txt): logging.error("Problem detected: %s", txt) logging.info(self.gdb.buffer) if not sys.stdin.isatty(): logging.error("Not a terminal, stopping.") else: print "\n\nEntering interactive mode.\n\n" self.gdb.sendline("set prompt GDB> \n") self.gdb.setecho(True) # can't use send_cmd, doesn't reply with expected prompt anymore. self.gdb.interact() #while True: # sys.stdout.write("GDB> ") # sys.stdout.flush() # x = sys.stdin.readline() # if not x: # break # self.send_cmd(x) self.stop_processes() sys.exit(1) def wait_for_function(self, fn, timeout=20): until = time.time() + timeout while True: stopped_at = self.continue_debuggee(timeout=3) if not stopped_at: self.user_debug("Not stopped at any breakpoint?") + if re.search(r"^Program received signal SIGABRT,", stopped_at, re.MULTILINE): + self.user_debug("assert() failed") if re.search(r"^Program received signal SIGSEGV,", stopped_at, re.MULTILINE): self.user_debug("Segfault") if re.search(r"^Breakpoint \d+, (0x\w+ in )?%s " % fn, stopped_at, re.MULTILINE): break if time.time() > until: self.user_debug("Didn't stop in function %s" % fn) logging.info("Now in %s" % fn) # We break, change the data, and return the correct size. def send_message(self, msg): self.udp_sock.sendto('a', (socket.gethostbyname(self.this_site), self.this_port)) self.wait_for_function("recvfrom") # drain input, but stop afterwards for changing data self.send_cmd("finish") # step over length assignment self.send_cmd("next") # push message. for (n, v) in msg.iteritems(): self.set_val( self.translate_shorthand(n, "message"), v, "htonl") # set "received" length self.set_val("rv", "msg->header.length", "ntohl") # the next thing should run continue via wait_for_function def wait_outgoing(self, msg): self.wait_for_function("booth_udp_send") ok = True for (n, v) in msg.iteritems(): if re.search(r"\.", n): ok = self.check_value( self.translate_shorthand(n, "inject"), v) and ok else: ok = self.check_value( self.translate_shorthand(n, "ticket"), v) and ok if not ok: sys.exit(1) logging.info("out gone") #stopped_at = self.sync() def merge_dicts(self, base, overlay): return dict(base.items() + overlay.items()) def loop(self, fn, data): matches = map(lambda k: re.match(r"^(outgoing|message)(\d+)$", k), data.iterkeys()) valid_matches = filter(None, matches) nums = map(lambda m: int(m.group(2)), valid_matches) loop_max = max(nums) for counter in range(0, loop_max+1): # incl. last message kmsg = 'message%d' % counter msg = data.get(kmsg) ktkt = 'ticket%d' % counter tkt = data.get(ktkt) kout = 'outgoing%d' % counter out = data.get(kout) kgdb = 'gdb%d' % counter gdb = data.get(kgdb) if not any([msg, out, tkt]): continue logging.info("Part %d" % counter) if tkt: self.current_nr = tkt.aux.get("line") comment = tkt.aux.get("comment", "") logging.info("ticket change %s (%s:%d) %s" % (ktkt, fn, self.current_nr, comment)) self.set_state(tkt) if msg: self.current_nr = msg.aux.get("line") comment = msg.aux.get("comment", "") logging.info("sending %s (%s:%d) %s" % (kmsg, fn, self.current_nr, comment)) self.send_message(self.merge_dicts(data["message"], msg)) if gdb: for (k, v) in gdb.iteritems(): self.send_cmd(k + " " + v.replace("§", "\n")) if data.has_key(kgdb) and len(gdb) == 0: self.user_debug("manual override") if out: self.current_nr = out.aux.get("line") comment = out.aux.get("comment", "") logging.info("waiting for %s (%s:%d) %s" % (kout, fn, self.current_nr, comment)) self.wait_outgoing(out) logging.info("loop ends") def let_booth_go_a_bit(self): self.drain_booth_log() logging.debug("running: %d" % self.running) if not self.running: self.gdb.sendline("continue") time.sleep(1) self.drain_booth_log() # stop it - via GDB! self.gdb.sendintr() # If we sent the signal to booth, the next # "print state_to_string()" or "continue" # might catch the signal - and fail to do # what we want/need. # # This additional signal seems to be unnecessary. #posix.kill(self.gdb.pid, signal.SIGINT) # In case it's really needed we should drain booth's signals queue, # eg. by sending "print getpid()" twice, before the sync() call. self.running = False self.sync(2000) def do_finally(self, data): if not data: return self.current_nr = data.aux.get("line") # Allow debuggee to reach a stable state self.let_booth_go_a_bit() ok = True for (n, v) in data.iteritems(): ok = self.check_value( self.translate_shorthand(n, "ticket"), v) and ok if not ok: sys.exit(1) def run(self, start_from="000", end_with="999"): os.chdir(self.test_base) # TODO: sorted, random order tests = filter( (lambda f: re.match(r"^\d\d\d_.*\.txt$", f)), glob.glob("*")) tests.sort() failed = 0 for f in tests: if f[0:3] < start_from: continue if f[0:3] > end_with: continue log = None logfn = UT._filename(f) if self.running_on_console(): sys.stdout.write("\n") self.current_nr = "setup" try: log = self.setup_log(filename = logfn) log.setLevel(logging.DEBUG) logging.error(self.colored_string("Starting test '%s'" % f, self.BLUE) + ", logfile " + logfn) self.start_processes(f) test = self.read_test_input(f, m=copy.deepcopy(self.defaults)) logging.debug("data: %s" % pprint.pformat(test, width = 200)) self.set_state(test.get("ticket")) self.loop(f, test) self.do_finally(test.get("finally")) self.current_nr = "teardown" logging.warn(self.colored_string("Finished test '%s' - OK" % f, self.GREEN)) except: failed += 1 logging.error(self.colored_string("Broke in %s:%s %s" % (f, self.current_nr, sys.exc_info()), self.RED)) for frame in traceback.format_tb(sys.exc_traceback): logging.info(" - %s " % frame.rstrip()) finally: self.stop_processes() if log: log.close() logging.getLogger("").removeHandler(log) if self.running_on_console(): sys.stdout.write("\n") return failed # }}} #def traceit(frame, event, arg): # if event == "line": # lineno = frame.f_lineno # print frame.f_code.co_filename, ":", "line", lineno # return traceit # {{{ main if __name__ == '__main__': if os.geteuid() == 0: sys.stderr.write("Must be run non-root; aborting.\n") sys.exit(1) ut = UT(sys.argv[1], sys.argv[2] + "/") # "master" log object needs max level logging.basicConfig(level = logging.DEBUG, filename = "/dev/null", filemode = "a", format = default_log_format, datefmt = default_log_datefmt) # make sure no old processes are active anymore os.system("killall boothd > /dev/null 2> /dev/null") overview_log = ut.setup_log( filename = UT._filename('seq') ) overview_log.setLevel(logging.WARN) # http://stackoverflow.com/questions/9321741/printing-to-screen-and-writing-to-a-file-at-the-same-time console = logging.StreamHandler() console.setFormatter(logging.Formatter(' # %(message)s')) console.setLevel(logging.WARN) logging.getLogger('').addHandler(console) logging.info("Starting boothd unit tests.") #sys.settrace(traceit) starting = "0" if len(sys.argv) > 3: starting = sys.argv[3] ending = "999" if len(sys.argv) > 4: ending = sys.argv[4] ret = ut.run(starting, ending) sys.exit(ret) # }}} diff --git a/script/wireshark-dissector.lua b/script/wireshark-dissector.lua index f227ac0..c9d3ed8 100644 --- a/script/wireshark-dissector.lua +++ b/script/wireshark-dissector.lua @@ -1,67 +1,67 @@ -- dofile("wireshark-dissector.lua") -- do booth_proto = Proto("Booth","Booth") function T32(tree, buffer, start, format) local b = buffer(start, 4) return tree:add(b, string.format(format, b:uint())) end function booth_proto.dissector(buffer, pinfo, tree) local endbuf = buffer:len() pinfo.cols.protocol = "Booth" if (endbuf < 24) then pinfo.cols.info = "Booth - too small" else local hdr = tree:add(booth_proto, buffer(0, 24), "Booth header") local cmd = buffer(28, 4) local tcmd = T32(hdr, cmd, 0, "Cmd %08x, \"" .. cmd:string() .. "\""); local from = buffer(20, 4) local tfrom = T32(hdr, from, 0, "From %08x"); if bit.band(from:uint(), 0x80000000) > 0 then tfrom:add_expert_info(PI_PROTOCOL, PI_WARN, "Highest bit set") end local len = buffer(24, 4) local tlen = T32(hdr, len, 0, "Length %8d"); if len:uint() > 1000 then tlen:add_expert_info(PI_PROTOCOL, PI_WARN, "Length too big?") end T32(hdr, buffer, 32, "Result %08x"); T32(hdr, buffer, 12, "Magic %08x"); T32(hdr, buffer, 16, "Version %08x"); T32(hdr, buffer, 0, "IV %08x"); T32(hdr, buffer, 4, "Auth1 %08x"); T32(hdr, buffer, 8, "Auth2 %08x"); if (endbuf > 36) then local tick = tree:add(booth_proto, buffer(36, endbuf-36), "Booth data") local name = buffer(36, 64) tick:add(name, "Ticket name: ", name:string()) - T32(tick, buffer, 36+64 + 0, "Owner: %08x") - T32(tick, buffer, 36+64 + 4, "Ballot: %08x") - T32(tick, buffer, 36+64 + 8, "Prev. Ballot: %08x") - T32(tick, buffer, 36+64 + 12, "Expiry: %8d") + T32(tick, buffer, 36+64 + 0, "Leader: %08x") + T32(tick, buffer, 36+64 + 4, "Term: %08x") + T32(tick, buffer, 36+64 + 8, "Term valid for: %08x") + T32(tick, buffer, 36+64 + 12, "Leader commit: %8d") end pinfo.cols.info = "Booth, cmd " .. cmd:string() end tree:add(booth_proto, buffer(0, endbuf), "data") end local tbl = DissectorTable.get("udp.port") tbl:add(9929, booth_proto) local tbl = DissectorTable.get("tcp.port") tbl:add(9929, booth_proto) end diff --git a/src/Makefile.am b/src/Makefile.am index 3f4689f..4292a7a 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,33 +1,33 @@ MAINTAINERCLEANFILES = Makefile.in AM_CFLAGS = -fPIC -Werror -funsigned-char -Wno-pointer-sign AM_CPPFLAGS = -I$(top_builddir)/include sbin_PROGRAMS = boothd -boothd_SOURCES = config.c main.c paxos.c ticket.c transport.c \ +boothd_SOURCES = config.c main.c raft.c ticket.c transport.c \ pacemaker.c handler.c boothd_LDFLAGS = $(OS_DYFLAGS) -L./ boothd_LDADD = -lplumb -lplumbgpl -lz -lm boothd_CPPFLAGS = $(GLIB_CFLAGS) noinst_HEADERS = booth.h pacemaker.h \ - config.h log.h paxos.h ticket.h transport.h handler.h + config.h log.h raft.h ticket.h transport.h handler.h if HAVE_HELP2MAN man_MANS = booth.8 boothd.8 MAINTAINERCLEANFILES += $(man_MANS) EXTRA_DIST = $(man_MANS) %.8: % $(HELP2MAN) -s 8 -N -o $@ ./$< booth.8: boothd.8 cp $< $@ endif lint: -splint $(INCLUDES) $(LINT_FLAGS) $(CFLAGS) *.c diff --git a/src/booth.h b/src/booth.h index f81406d..eb1a802 100644 --- a/src/booth.h +++ b/src/booth.h @@ -1,260 +1,251 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _BOOTH_H #define _BOOTH_H #include #include #include #include #include #define BOOTH_RUN_DIR "/var/run/booth/" #define BOOTH_LOG_DIR "/var/log" #define BOOTH_LOGFILE_NAME "booth.log" #define BOOTH_DEFAULT_CONF_DIR "/etc/booth/" #define BOOTH_DEFAULT_CONF_NAME "booth" #define BOOTH_DEFAULT_CONF_EXT ".conf" #define BOOTH_DEFAULT_CONF \ BOOTH_DEFAULT_CONF_DIR BOOTH_DEFAULT_CONF_NAME BOOTH_DEFAULT_CONF_EXT #define DAEMON_NAME "boothd" #define BOOTH_PATH_LEN 127 #define BOOTH_DEFAULT_PORT 9929 /* TODO: remove */ #define BOOTH_PROTO_FAMILY AF_INET #define BOOTHC_MAGIC 0x5F1BA08C #define BOOTHC_VERSION 0x00010002 /** Timeout value for poll(). * Determines frequency of periodic jobs, eg. when send-retries are done. * See process_tickets(). */ #define POLL_TIMEOUT 1000 /** @{ */ /** The on-network data structures and constants. */ #define BOOTH_NAME_LEN 64 -#define NO_OWNER (-1) +#define CHAR2CONST(a,b,c,d) ((a << 24) | (b << 16) | (c << 8) | d) + + +/* Says that the ticket shouldn't be active anywhere. + * NONE wouldn't be specific enough. */ +#define NO_ONE (-1) +/* Says that another one should recover. */ +#define TICKET_LOST CHAR2CONST('L', 'O', 'S', 'T') + typedef unsigned char boothc_site [BOOTH_NAME_LEN]; typedef unsigned char boothc_ticket[BOOTH_NAME_LEN]; struct boothc_header { /** Authentication data; not used now. */ uint32_t iv; uint32_t auth1; uint32_t auth2; /** BOOTHC_MAGIC */ uint32_t magic; /** BOOTHC_VERSION */ uint32_t version; /** Packet source; site_id. See add_site(). */ uint32_t from; /** Length including header */ uint32_t length; /** The command respectively protocol state. See cmd_request_t. */ uint32_t cmd; /** Result of operation. 0 == OK */ uint32_t result; char data[0]; } __attribute__((packed)); struct ticket_msg { /** Ticket name. */ boothc_ticket id; - /** Owner. May be NO_OWNER. See add_site(). */ - uint32_t owner; + /** Current leader. May be NO_ONE. See add_site(). + * For a OP_REQ_VOTE this is */ + uint32_t leader; - /** Current ballot number. Might be < prev_ballot if overflown. */ - uint32_t ballot; - /** Previous ballot. */ - uint32_t prev_ballot; + /** Current term. */ + uint32_t term; + uint32_t term_valid_for; - /* Would we want to say _whose_ proposal is more important - * when sending OP_REJECTED ? */ + /* Perhaps we need to send a status along, too - like + * starting, running, stopping, error, ...? */ - /** Seconds until expiration. */ - uint32_t expiry; + uint32_t leader_commit; // TODO: NEEDED? } __attribute__((packed)); struct boothc_ticket_msg { struct boothc_header header; struct ticket_msg ticket; } __attribute__((packed)); -/** State and message IDs. - * - * These numbers are unlikely to conflict with other enums. - * All have to be swabbed to network order before sending. - * - * \dot - * digraph states { - * node [shape=box]; - * ST_INIT [label="ST_INIT"]; - * - * subgraph messages { // messages - * rank=same; - * node [shape=point, rank=same]; - * edge [style=tapered, penwidth=3, arrowtail=none, arrowhead=none, dir=forward]; - * - * ST_INIT:e -> ST_INITs [label="sends out CMD_CATCHUP"]; - * } - * - * ST_INIT -> ST_STABLE [label="recv CMR_CATCHUP"]; - * ST_STABLE; - * - * ST_STABLE -> OP_PROPOSING [label="booth call to assign ticket"]; - * } - * \enddot - * - * */ -#define CHAR2CONST(a,b,c,d) ((a << 24) | (b << 16) | (c << 8) | d) -#define STG2CONST(X) ({ const char _ggg[4] = X; return (uint32_t*)_ggg; }) typedef enum { /* 0x43 = "C"ommands */ CMD_LIST = CHAR2CONST('C', 'L', 's', 't'), CMD_GRANT = CHAR2CONST('C', 'G', 'n', 't'), CMD_REVOKE = CHAR2CONST('C', 'R', 'v', 'k'), - CMD_CATCHUP = CHAR2CONST('C', 'C', 't', 'p'), /* Replies */ CMR_GENERAL = CHAR2CONST('G', 'n', 'l', 'R'), // Increase distance to CMR_GRANT CMR_LIST = CHAR2CONST('R', 'L', 's', 't'), CMR_GRANT = CHAR2CONST('R', 'G', 'n', 't'), CMR_REVOKE = CHAR2CONST('R', 'R', 'v', 'k'), - CMR_CATCHUP = CHAR2CONST('R', 'C', 't', 'p'), - - /* Paxos */ - OP_PREPARING = CHAR2CONST('P', 'r', 'e', 'p'), - OP_PROMISING = CHAR2CONST('P', 'r', 'o', 'm'), - OP_PROPOSING = CHAR2CONST('P', 'r', 'o', 'p'), - OP_ACCEPTING = CHAR2CONST('A', 'c', 'p', 't'), - OP_RECOVERY = CHAR2CONST('R', 'c', 'v', 'y'), - OP_COMMITTED = CHAR2CONST('C', 'm', 'm', 't'), - OP_REJECTED = CHAR2CONST('R', 'J', 'C', '!'), - - /* These are not used over the wire */ - ST_INIT = CHAR2CONST('I', 'n', 'i', 't'), - ST_STABLE = CHAR2CONST('S', 't', 'b', 'l'), + + /* Raft */ + OP_REQ_VOTE = CHAR2CONST('R', 'V', 'o', 't'), + OP_VOTE_FOR = CHAR2CONST('V', 't', 'F', 'r'), + OP_HEARTBEAT= CHAR2CONST('H', 'r', 't', 'B'), /* AppendEntry in Raft */ + OP_MY_INDEX = CHAR2CONST('M', 'I', 'd', 'x'), /* Answer to Heartbeat */ + OP_REJECTED = CHAR2CONST('R', 'J', 'C', '!'), } cmd_request_t; /* TODO: make readable constants */ typedef enum { /* for compatibility with other functions */ RLT_SUCCESS = 0, RLT_ASYNC = CHAR2CONST('A', 's', 'y', 'n'), RLT_SYNC_SUCC = CHAR2CONST('S', 'c', 'c', 's'), RLT_SYNC_FAIL = CHAR2CONST('F', 'a', 'i', 'l'), RLT_INVALID_ARG = CHAR2CONST('I', 'A', 'r', 'g'), RLT_OVERGRANT = CHAR2CONST('O', 'v', 'e', 'r'), RLT_PROBABLY_SUCCESS = CHAR2CONST('S', 'u', 'c', '?'), RLT_BUSY = CHAR2CONST('B', 'u', 's', 'y'), + RLT_TERM_OUTDATED = CHAR2CONST('T', 'O', 'd', 't'), + RLT_TERM_STILL_VALID = CHAR2CONST('T', 'V', 'l', 'd'), } cmd_result_t; /** @} */ /** @{ */ struct booth_site { /** Calculated ID. See add_site(). */ int site_id; int type; int local; /** Roles, like ACCEPTOR, PROPOSER, or LEARNER. Not really used ATM. */ int role; char addr_string[BOOTH_NAME_LEN]; int tcp_fd; int udp_fd; /* 0-based, used for indexing into per-ticket weights */ int index; uint64_t bitmask; unsigned short family; union { struct sockaddr_in sa4; struct sockaddr_in6 sa6; }; int saddrlen; int addrlen; } __attribute__((packed)); extern struct booth_site *local; +extern struct booth_site * no_leader; /** @} */ struct booth_transport; struct client { int fd; const struct booth_transport *transport; void (*workfn)(int); void (*deadfn)(int); }; extern struct client *clients; extern struct pollfd *pollfds; int client_add(int fd, const struct booth_transport *tpt, void (*workfn)(int ci), void (*deadfn)(int ci)); int do_read(int fd, void *buf, size_t count); int do_write(int fd, void *buf, size_t count); void process_connection(int ci); void safe_copy(char *dest, char *value, size_t buflen, const char *description); struct command_line { int type; /* ACT_ */ int op; /* OP_ */ char configfile[BOOTH_PATH_LEN]; char lockfile[BOOTH_PATH_LEN]; char site[BOOTH_NAME_LEN]; struct boothc_ticket_msg msg; }; extern struct command_line cl; + + + +/* http://gcc.gnu.org/onlinedocs/gcc/Typeof.html */ +#define min(a__,b__) \ + ({ typeof (a__) _a = (a__); \ + typeof (b__) _b = (b__); \ + _a < _b ? _a : _b; }) +#define max(a__,b__) \ + ({ typeof (a__) _a = (a__); \ + typeof (b__) _b = (b__); \ + _a > _b ? _a : _b; }) + + + + + #endif /* _BOOTH_H */ diff --git a/src/config.c b/src/config.c index 3394d5c..1210ce0 100644 --- a/src/config.c +++ b/src/config.c @@ -1,716 +1,710 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include "booth.h" #include "config.h" -#include "paxos.h" +#include "raft.h" #include "ticket.h" #include "log.h" static int ticket_size = 0; static int ticket_realloc(void) { const int added = 5; int had, want; void *p; had = booth_conf->ticket_allocated; want = had + added; p = realloc(booth_conf->ticket, sizeof(struct ticket_config) * want); if (!booth_conf) { log_error("can't alloc more tickets"); return -ENOMEM; } booth_conf->ticket = p; memset(booth_conf->ticket + had, 0, sizeof(struct ticket_config) * added); booth_conf->ticket_allocated = want; return 0; } int add_site(char *address, int type); int add_site(char *addr_string, int type) { int rv; struct booth_site *site; uLong nid; uint32_t mask; rv = 1; if (booth_conf->site_count == MAX_NODES) { log_error("too many nodes"); goto out; } if (strlen(addr_string)+1 >= sizeof(booth_conf->site[0].addr_string)) { log_error("site address \"%s\" too long", addr_string); goto out; } site = booth_conf->site + booth_conf->site_count; site->family = BOOTH_PROTO_FAMILY; site->type = type; /* Make site_id start at a non-zero point. * Perhaps use hash over string or address? */ strcpy(site->addr_string, addr_string); nid = crc32(0L, NULL, 0); /* booth_config() uses memset(), so sizeof() is guaranteed to give * the same result everywhere - no uninitialized bytes. */ site->site_id = crc32(nid, site->addr_string, sizeof(site->addr_string)); - /* Make sure we will never collide with NO_OWNER, + /* Make sure we will never collide with NO_ONE, * or be negative (to get "get_local_id() < 0" working). */ mask = 1 << (sizeof(site->site_id)*8 -1); - assert(NO_OWNER & mask); + assert(NO_ONE & mask); site->site_id &= ~mask; site->index = booth_conf->site_count; site->bitmask = 1 << booth_conf->site_count; /* Catch site overflow */ assert(site->bitmask); booth_conf->site_bits |= site->bitmask; site->tcp_fd = -1; - if (site->type == SITE) - site->role = PROPOSER | ACCEPTOR | LEARNER; - else if (site->type == ARBITRATOR) - site->role = ACCEPTOR | LEARNER; - booth_conf->site_count++; rv = 0; memset(&site->sa6, 0, sizeof(site->sa6)); if (inet_pton(AF_INET, site->addr_string, &site->sa4.sin_addr) > 0) { site->family = AF_INET; site->sa4.sin_family = site->family; site->sa4.sin_port = htons(booth_conf->port); site->saddrlen = sizeof(site->sa4); site->addrlen = sizeof(site->sa4.sin_addr); } else if (inet_pton(AF_INET6, site->addr_string, &site->sa6.sin6_addr) > 0) { site->family = AF_INET6; site->sa6.sin6_family = site->family; site->sa6.sin6_flowinfo = 0; site->sa6.sin6_port = htons(booth_conf->port); site->saddrlen = sizeof(site->sa6); site->addrlen = sizeof(site->sa6.sin6_addr); } else { log_error("Address string \"%s\" is bad", site->addr_string); rv = EINVAL; } out: return rv; } inline static char *skip_while_in(const char *cp, int (*fn)(int), const char *allowed) { /* strchr() returns a pointer to the terminator if *cp == 0. */ while (*cp && (fn(*cp) || strchr(allowed, *cp))) cp++; /* discard "const" qualifier */ return (char*)cp; } inline static char *skip_while(char *cp, int (*fn)(int)) { while (fn(*cp)) cp++; return cp; } inline static char *skip_until(char *cp, char expected) { while (*cp && *cp != expected) cp++; return cp; } static inline int is_end_of_line(char *cp) { char c = *cp; return c == '\n' || c == 0 || c == '#'; } static int add_ticket(const char *name, struct ticket_config **tkp, const struct ticket_config *def) { int rv; struct ticket_config *tk; if (booth_conf->ticket_count == booth_conf->ticket_allocated) { rv = ticket_realloc(); if (rv < 0) return rv; } tk = booth_conf->ticket + booth_conf->ticket_count; booth_conf->ticket_count++; if (!check_max_len_valid(name, sizeof(tk->name))) { log_error("ticket name \"%s\" too long.", name); return -EINVAL; } if (find_ticket_by_name(name, NULL)) { log_error("ticket name \"%s\" used again.", name); return -EINVAL; } if (* skip_while_in(name, isalnum, "-/")) { log_error("ticket name \"%s\" invalid; only alphanumeric names.", name); return -EINVAL; } strcpy(tk->name, name); tk->timeout = def->timeout; - tk->expiry = def->expiry; + tk->term_duration = def->term_duration; tk->retries = def->retries; memcpy(tk->weight, def->weight, sizeof(tk->weight)); - tk->state = ST_INIT; if (tkp) *tkp = tk; return 0; } /* returns number of weights, or -1 on bad input. */ static int parse_weights(const char *input, int weights[MAX_NODES]) { int i, v; char *cp; for(i=0; iproto = UDP; booth_conf->port = BOOTH_DEFAULT_PORT; /* Provide safe defaults. -1 is reserved, though. */ booth_conf->uid = -2; booth_conf->gid = -2; strcpy(booth_conf->site_user, "hacluster"); strcpy(booth_conf->site_group, "haclient"); strcpy(booth_conf->arb_user, "nobody"); strcpy(booth_conf->arb_group, "nobody"); parse_weights("", defaults.weight); defaults.ext_verifier = NULL; - defaults.expiry = DEFAULT_TICKET_EXPIRY; + defaults.term_duration = DEFAULT_TICKET_EXPIRY; defaults.timeout = DEFAULT_TICKET_TIMEOUT; defaults.retries = DEFAULT_RETRIES; defaults.acquire_after = 0; error = ""; log_debug("reading config file %s", path); while (fgets(line, sizeof(line), fp)) { lineno++; s = skip_while(line, isspace); if (is_end_of_line(s)) continue; key = s; /* Key */ end_of_key = skip_while_in(key, isalnum, "-_"); if (end_of_key == key) { error = "No key"; goto err; } if (!*end_of_key) goto exp_equal; /* whitespace, and something else but nothing more? */ s = skip_while(end_of_key, isspace); if (*s != '=') { exp_equal: error = "Expected '=' after key"; goto err; } s++; /* It's my buffer, and I terminate if I want to. */ /* But not earlier than that, because we had to check for = */ *end_of_key = 0; /* Value tokenizing */ s = skip_while(s, isspace); switch (*s) { case '"': case '\'': val = s+1; s = skip_until(val, *s); /* Terminate value */ if (!*s) { error = "Unterminated quoted string"; goto err; } /* Remove and skip quote */ *s = 0; s++; if (* skip_while(s, isspace)) { error = "Surplus data after value"; goto err; } *s = 0; break; case 0: no_value: error = "No value"; goto err; break; default: val = s; /* Rest of line. */ i = strlen(s); /* i > 0 because of "case 0" above. */ while (i > 0 && isspace(s[i-1])) i--; s += i; *s = 0; } if (val == s) goto no_value; if (strlen(key) > BOOTH_NAME_LEN || strlen(val) > BOOTH_NAME_LEN) { error = "key/value too long"; goto err; } if (strcmp(key, "transport") == 0) { if (got_transport) { error = "config file has multiple transport lines"; goto err; } if (strcasecmp(val, "UDP") == 0) booth_conf->proto = UDP; else if (strcasecmp(val, "SCTP") == 0) booth_conf->proto = SCTP; else { error = "invalid transport protocol"; goto err; } got_transport = 1; continue; } if (strcmp(key, "port") == 0) { booth_conf->port = atoi(val); continue; } if (strcmp(key, "name") == 0) { safe_copy(booth_conf->name, val, BOOTH_NAME_LEN, "name"); continue; } if (strcmp(key, "site") == 0) { if (add_site(val, SITE)) goto out; continue; } if (strcmp(key, "arbitrator") == 0) { if (add_site(val, ARBITRATOR)) goto out; continue; } if (strcmp(key, "ticket") == 0) { if (add_ticket(val, &last_ticket, &defaults)) goto out; /* last_ticket is valid until another one is needed - * and then it already has the new address and * is valid again. */ continue; } if (strcmp(key, "expire") == 0) { - defaults.expiry = strtol(val, &s, 0); - if (*s || s == val || defaults.expiry<10) { + defaults.term_duration = strtol(val, &s, 0); + if (*s || s == val || defaults.term_duration<10) { error = "Expected plain integer value >=10 for expire"; goto err; } if (last_ticket) - last_ticket->expiry = defaults.expiry; + last_ticket->term_duration = defaults.term_duration; continue; } if (strcmp(key, "site-user") == 0) { safe_copy(booth_conf->site_user, optarg, BOOTH_NAME_LEN, "site-user"); continue; } if (strcmp(key, "site-group") == 0) { safe_copy(booth_conf->site_group, optarg, BOOTH_NAME_LEN, "site-group"); continue; } if (strcmp(key, "arbitrator-user") == 0) { safe_copy(booth_conf->arb_user, optarg, BOOTH_NAME_LEN, "arbitrator-user"); continue; } if (strcmp(key, "arbitrator-group") == 0) { safe_copy(booth_conf->arb_group, optarg, BOOTH_NAME_LEN, "arbitrator-group"); continue; } if (strcmp(key, "timeout") == 0) { defaults.timeout = strtol(val, &s, 0); if (*s || s == val || defaults.timeout<1) { error = "Expected plain integer value >=1 for timeout"; goto err; } if (last_ticket) last_ticket->timeout = defaults.timeout; continue; } if (strcmp(key, "retries") == 0) { defaults.retries = strtol(val, &s, 0); if (*s || s == val || defaults.retries<3 || defaults.retries > 100) { error = "Expected plain integer value in the range [3, 100] for retries"; goto err; } if (last_ticket) last_ticket->retries = defaults.retries; continue; } if (strcmp(key, "acquire-after") == 0) { defaults.acquire_after = strtol(val, &s, 0); if (*s || s == val || defaults.acquire_after<0) { error = "Expected plain integer value >=1 for acquire-after"; goto err; } if (last_ticket) last_ticket->acquire_after = defaults.acquire_after; continue; } if (strcmp(key, "before-acquire-handler") == 0) { defaults.ext_verifier = strdup(val); if (*s || s == val || defaults.timeout<1) { error = "Expected plain integer value >=1 for timeout"; goto err; } if (last_ticket) last_ticket->ext_verifier = defaults.ext_verifier; continue; } if (strcmp(key, "weights") == 0) { if (parse_weights(val, defaults.weight) < 0) goto out; if (last_ticket) memcpy(last_ticket->weight, defaults.weight, sizeof(last_ticket->weight)); continue; } error = "Unknown item"; goto out; } if ((booth_conf->site_count % 2) == 0) { log_warn("An odd number of nodes is strongly recommended!"); } /* Default: make config name match config filename. */ if (!booth_conf->name[0]) { cp = strrchr(path, '/'); if (!cp) cp = path; /* TODO: locale? */ /* NUL-termination by memset. */ for(i=0; iname[i] = *(cp++); /* Last resort. */ if (!booth_conf->name[0]) strcpy(booth_conf->name, "booth"); } return 0; err: out: log_error("%s in config file line %d", error, lineno); free(booth_conf); booth_conf = NULL; return -1; } int check_config(int type) { struct passwd *pw; struct group *gr; char *cp, *input; if (!booth_conf) return -1; input = (type == ARBITRATOR) ? booth_conf->arb_user : booth_conf->site_user; if (!*input) goto u_inval; if (isdigit(input[0])) { booth_conf->uid = strtol(input, &cp, 0); if (*cp != 0) { u_inval: log_error("User \"%s\" cannot be resolved into a UID.", input); return ENOENT; } } else { pw = getpwnam(input); if (!pw) goto u_inval; booth_conf->uid = pw->pw_uid; } input = (type == ARBITRATOR) ? booth_conf->arb_group : booth_conf->site_group; if (!*input) goto g_inval; if (isdigit(input[0])) { booth_conf->gid = strtol(input, &cp, 0); if (*cp != 0) { g_inval: log_error("Group \"%s\" cannot be resolved into a UID.", input); return ENOENT; } } else { gr = getgrnam(input); if (!gr) goto g_inval; booth_conf->gid = gr->gr_gid; } /* TODO: check whether uid or gid is 0 again? * The admin may shoot himself in the foot, though. */ return 0; } int find_site_by_name(unsigned char *site, struct booth_site **node, int any_type) { struct booth_site *n; int i; if (!booth_conf) return 0; for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if ((n->type == SITE || any_type) && strcmp(n->addr_string, site) == 0) { *node = n; return 1; } } return 0; } int find_site_by_id(uint32_t site_id, struct booth_site **node) { struct booth_site *n; int i; - if (site_id == NO_OWNER) { + if (site_id == NO_ONE) { *node = NULL; return 1; } if (!booth_conf) return 0; for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if (n->site_id == site_id) { *node = n; return 1; } } return 0; } const char *type_to_string(int type) { switch (type) { case ARBITRATOR: return "arbitrator"; case SITE: return "site"; case CLIENT: return "client"; } return "??invalid-type??"; } diff --git a/src/config.h b/src/config.h index 4461ed0..2d659ff 100644 --- a/src/config.h +++ b/src/config.h @@ -1,154 +1,179 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _CONFIG_H #define _CONFIG_H #include #include "booth.h" +#include "raft.h" #include "transport.h" /** @{ */ /** Definitions for in-RAM data. */ #define MAX_NODES 16 #define TICKET_ALLOC 16 struct ticket_config { /** \name Configuration items. * @{ */ /** Name of ticket. */ boothc_ticket name; - /** How many seconds until expiration. */ - int expiry; + /** How many seconds a term lasts (if not refreshed). */ + int term_duration; /** Network related timeouts. */ int timeout; /** Retries before giving up. */ int retries; /** If >0, time to wait for a site to get fenced. * The ticket may be acquired after that timespan by * another site. */ - int acquire_after; + int acquire_after; /* TODO: needed? */ + /* Program to ask whether it makes sense to * acquire the ticket */ char *ext_verifier; /** Node weights. */ int weight[MAX_NODES]; /** @} */ /** \name Runtime values. * @{ */ /** Current state. */ - cmd_request_t state; + server_state_e state; /** When something has to be done */ struct timeval next_cron; - /** Current owner of ticket. */ - struct booth_site *owner; + /** Current leader. This is effectively the log[] in Raft. */ + struct booth_site *leader; + + /** Timestamp of leadership expiration */ + time_t term_expires; + /** End of election period */ + time_t election_end; + struct booth_site *voted_for; + - /** Timestamp of expiration. */ - time_t expires; + /** Who the various sites vote for. + * NO_OWNER = no vote yet. */ + struct booth_site *votes_for[MAX_NODES]; + /* bitmap */ + uint64_t votes_received; - /** Last ballot number that was agreed on. */ - uint32_t last_ack_ballot; + /** Last voting round that was seen. */ + uint32_t current_term; /** @} */ + /** */ + uint32_t commit_index; + + /** */ + uint32_t last_applied; + uint32_t next_index[MAX_NODES]; + uint32_t match_index[MAX_NODES]; + + + uint64_t hb_received; + time_t hb_sent_at; + /** \name Needed while proposals are being done. * @{ */ - /** Who tries to change the current status. */ - struct booth_site *proposer; + /** Whom to vote for the next time. + * Needed to push a ticket to someone else. */ - /** Current owner of ticket. */ - struct booth_site *proposed_owner; - /** New/current ballot number. - * Might be < prev_ballot if overflown. - * This only every goes "up" (logically). */ - uint32_t new_ballot; +#if 0 /** Bitmap of sites that acknowledge that state. */ uint64_t proposal_acknowledges; /** When an incompletely acknowledged proposal gets done. * If all peers agree, that happens sooner. * See switch_state_to(). */ struct timeval proposal_switch; /** Timestamp of proposal expiration. */ time_t proposal_expires; +#endif + /** Number of send retries left. * Used on the new owner. * Starts at 0, counts up. */ int retry_number; /** @} */ }; struct booth_config { char name[BOOTH_NAME_LEN]; transport_layer_t proto; uint16_t port; /** Stores the OR of the individual host bitmasks. */ uint64_t site_bits; char site_user[BOOTH_NAME_LEN]; char site_group[BOOTH_NAME_LEN]; char arb_user[BOOTH_NAME_LEN]; char arb_group[BOOTH_NAME_LEN]; uid_t uid; gid_t gid; int site_count; struct booth_site site[MAX_NODES]; int ticket_count; int ticket_allocated; struct ticket_config *ticket; }; extern struct booth_config *booth_conf; int read_config(const char *path); int check_config(int type); int find_site_by_name(unsigned char *site, struct booth_site **node, int any_type); int find_site_by_id(uint32_t site_id, struct booth_site **node); const char *type_to_string(int type); +#include +#define R(tk_) printf("## %12s:%3d state %s, term %d, index %d, leader %s\n", __FILE__, __LINE__, state_to_string(tk_->state), tk_->current_term, tk_->commit_index, site_string(tk_->leader)) + + #endif /* _CONFIG_H */ diff --git a/src/handler.c b/src/handler.c index 8670385..719fc4c 100644 --- a/src/handler.c +++ b/src/handler.c @@ -1,68 +1,70 @@ /* * Copyright (C) 2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include "ticket.h" #include "config.h" #include "inline-fn.h" #include "log.h" #include "pacemaker.h" #include "booth.h" #include "handler.h" /** Runs an external handler. * See eg. 'before-acquire-handler'. * TODO: timeout, async operation?. */ int run_handler(struct ticket_config *tk, const char *cmd, int synchronous) { int rv; char expires[16]; + if (!cmd) + return 0; assert(synchronous); - sprintf(expires, "%" PRId64, tk->expires); + sprintf(expires, "%" PRId64, tk->term_expires); rv = setenv("BOOTH_TICKET", tk->name, 1) || setenv("BOOTH_LOCAL", local->addr_string, 1) || setenv("BOOTH_CONF_NAME", booth_conf->name, 1) || setenv("BOOTH_CONF_PATH", cl.configfile, 1) || setenv("BOOTH_TICKET_EXPIRES", expires, 1); if (rv) { log_error("Cannot set environment: %d", errno); } else { rv = system(cmd); if (rv) log_error("Error calling \"%s\": %s", cmd, interpret_rv(rv)); else log_info("Ran \"%s\" successfully.", cmd); } return rv; } diff --git a/src/inline-fn.h b/src/inline-fn.h index e443b10..aea1394 100644 --- a/src/inline-fn.h +++ b/src/inline-fn.h @@ -1,281 +1,304 @@ /* * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _INLINE_FN_H #define _INLINE_FN_H #include #include #include #include #include "config.h" +#include "ticket.h" #include "transport.h" inline static uint32_t get_local_id(void) { return local ? local->site_id : -1; } inline static uint32_t get_node_id(struct booth_site *node) { - return node ? node->site_id : NO_OWNER; + return node ? node->site_id : NO_ONE; } -inline static int ticket_valid_for(const struct ticket_config *tk) +inline static int term_valid_for(const struct ticket_config *tk) { int left; - left = tk->expires - time(NULL); + left = tk->term_expires - time(NULL); return (left < 0) ? 0 : left; } /** Returns number of seconds left, if any. */ -inline static int owner_and_valid(const struct ticket_config *tk) +inline static int leader_and_valid(const struct ticket_config *tk) { - if (tk->owner != local) + if (tk->leader != local) return 0; - return ticket_valid_for(tk); + return term_valid_for(tk); } + static inline void init_header_bare(struct boothc_header *h) { h->magic = htonl(BOOTHC_MAGIC); h->version = htonl(BOOTHC_VERSION); h->from = htonl(local->site_id); h->iv = htonl(0); h->auth1 = htonl(0); h->auth2 = htonl(0); } static inline void init_header(struct boothc_header *h, int cmd, int result, int data_len) { init_header_bare(h); h->length = htonl(data_len); h->cmd = htonl(cmd); h->result = htonl(result); } static inline void init_ticket_site_header(struct boothc_ticket_msg *msg, int cmd) { init_header(&msg->header, cmd, 0, sizeof(*msg)); } static inline void init_ticket_msg(struct boothc_ticket_msg *msg, int cmd, int rv, struct ticket_config *tk) { assert(sizeof(msg->ticket.id) == sizeof(tk->name)); init_header(&msg->header, cmd, rv, sizeof(*msg)); if (!tk) { memset(&msg->ticket, 0, sizeof(msg->ticket)); } else { memcpy(msg->ticket.id, tk->name, sizeof(msg->ticket.id)); - msg->ticket.expiry = htonl(ticket_valid_for(tk)); - msg->ticket.owner = htonl(get_node_id(tk->owner)); - msg->ticket.ballot = htonl(tk->new_ballot); - msg->ticket.prev_ballot = htonl(tk->last_ack_ballot); + msg->ticket.leader = htonl(get_node_id(tk->leader ?: tk->voted_for)); + msg->ticket.term = htonl(tk->current_term); + msg->ticket.term_valid_for = htonl(term_valid_for(tk)); + + msg->ticket.leader_commit = htonl(tk->commit_index); } } static inline struct booth_transport const *transport(void) { return booth_transport + booth_conf->proto; } -static inline const char *ticket_owner_string(struct booth_site *site) +static inline const char *site_string(struct booth_site *site) { return site ? site->addr_string : "NONE"; } -static inline void disown_ticket(struct ticket_config *tk) +static inline const char *ticket_leader_string(struct ticket_config *tk) { - /* ONLY the "current state" is changed; - * current paxos rounds should not be affected. - * tk->proposed_owner = NULL; - */ - tk->owner = NULL; - time(&tk->expires); -} - -static inline void disown_if_expired(struct ticket_config *tk) -{ - if (time(NULL) >= tk->expires || - (!tk->proposed_owner && !tk->owner)) - disown_ticket(tk); + return site_string(tk->leader); } -static inline int all_agree(struct ticket_config *tk) +static inline void disown_ticket(struct ticket_config *tk) { - return tk->proposal_acknowledges == booth_conf->site_bits; + tk->leader = NULL; + time(&tk->term_expires); } -static inline int majority_agree(struct ticket_config *tk) +static inline int disown_if_expired(struct ticket_config *tk) { - /* Use ">" to get majority decision, even for an even number - * of participants. */ - return __builtin_popcount(tk->proposal_acknowledges) * 2 > - booth_conf->site_count; -} + if (time(NULL) >= tk->term_expires || + !tk->leader) { + disown_ticket(tk); + return 1; + } + return 0; +} /* We allow half of the uint32_t to be used; * half of that below, half of that above the current known "good" value. * 0 UINT32_MAX * |--------------------------+----------------+------------| * | | | * |--------+-------| allowed range * | - * current ballot + * current commit index * * So, on overflow it looks like that: * UINT32_MAX 0 * |--------------------------+-----------||---+------------| * | | | * |--------+-------| allowed range * | - * current ballot + * current commit index * * This should be possible by using the same datatype and relying * on the under/overflow semantics. * * * Having 30 bits available, and assuming an expire time of - * one minute and a (high) ballot step of 64 == 2^6 (because + * one minute and a (high) commit index step of 64 == 2^6 (because * of weights), we get 2^24 minutes of range - which is ~750 * years. "Should be enough for everybody." */ -static inline int ballot_is_higher_than(uint32_t b_high, uint32_t b_low) +static inline int index_is_higher_than(uint32_t c_high, uint32_t c_low) { uint32_t diff; - if (b_high == b_low) + if (c_high == c_low) return 0; - diff = b_high - b_low; + diff = c_high - c_low; if (diff < UINT32_MAX/4) return 1; - diff = b_low - b_high; + diff = c_low - c_high; if (diff < UINT32_MAX/4) return 0; - assert(!"ballot out of range - invalid"); + assert(!"commit index out of range - invalid"); } -static inline uint32_t ballot_max2(uint32_t a, uint32_t b) +static inline uint32_t index_max2(uint32_t a, uint32_t b) { - return ballot_is_higher_than(a, b) ? a : b; + return index_is_higher_than(a, b) ? a : b; } -static inline uint32_t ballot_max3(uint32_t a, uint32_t b, uint32_t c) +static inline uint32_t index_max3(uint32_t a, uint32_t b, uint32_t c) { - return ballot_max2( ballot_max2(a, b), c); + return index_max2( index_max2(a, b), c); } static inline double timeval_to_float(struct timeval tv) { return tv.tv_sec + tv.tv_usec*(double)1.0e-6; } static inline int timeval_msec(struct timeval tv) { int m; m = tv.tv_usec / 1000; if (m >= 1000) m = 999; return m; } static inline int timeval_compare(struct timeval tv1, struct timeval tv2) { if (tv1.tv_sec < tv2.tv_sec) return -1; if (tv1.tv_sec > tv2.tv_sec) return +1; if (tv1.tv_usec < tv2.tv_usec) return -1; if (tv1.tv_usec > tv2.tv_usec) return +1; return 0; } static inline int timeval_in_past(struct timeval which) { struct timeval tv; gettimeofday(&tv, NULL); return timeval_compare(tv, which) > 0; } -static inline time_t next_renewal_starts_at(struct ticket_config *tk) +static inline time_t next_vote_starts_at(struct ticket_config *tk) { time_t half_exp, retries_needed; /* If not owner, don't renew. */ - if (tk->owner != local) + if (tk->leader != local) return 0; /* Try to renew at half of expiry time. */ - half_exp = tk->expires - tk->expiry/2; + half_exp = tk->term_expires - tk->term_duration/2; /* Also start renewal if we couldn't get * a few message retransmission in the alloted * expiry time. */ - retries_needed = tk->expires - tk->timeout * tk->retries/2; + retries_needed = tk->term_expires - tk->timeout * tk->retries/2; /* Return earlier timestamp. */ return half_exp < retries_needed ? half_exp : retries_needed; } static inline int should_start_renewal(struct ticket_config *tk) { time_t now, when; - when = next_renewal_starts_at(tk); + when = next_vote_starts_at(tk); if (!when) return 0; time(&now); return when <= now; } +static inline int send_heartbeat(struct ticket_config *tk) +{ + tk->hb_received = local->bitmask; + tk->hb_sent_at = time(NULL); + + return ticket_broadcast(tk, OP_HEARTBEAT, RLT_SUCCESS); +} + +static inline struct booth_site *my_vote(struct ticket_config *tk) +{ + return tk->votes_for[ local->index ]; +} + + +static inline int count_bits(uint64_t val) { + return __builtin_popcount(val); +} + +static inline int majority_of_bits(struct ticket_config *tk, uint64_t val) +{ + /* Use ">" to get majority decision, even for an even number + * of participants. */ + return count_bits(val) * 2 > + booth_conf->site_count; +} + + + + #endif diff --git a/src/main.c b/src/main.c index 3b4a20f..d67dfa3 100644 --- a/src/main.c +++ b/src/main.c @@ -1,1311 +1,1317 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "log.h" #include "booth.h" #include "config.h" #include "transport.h" #include "inline-fn.h" #include "pacemaker.h" #include "ticket.h" #define RELEASE_VERSION "1.0" #define CLIENT_NALLOC 32 int daemonize = 0; /** Structure for "clients". * Filehandles with incoming data get registered here (and in pollfds), * along with their callbacks. * Because these can be reallocated with every new fd, addressing * happens _only_ by their numeric index. */ struct client *clients = NULL; struct pollfd *pollfds = NULL; static int client_maxi; static int client_size = 0; +static const struct booth_site _no_leader = { + .addr_string = "none", + .site_id = NO_ONE, +}; +struct booth_site *no_leader = (struct booth_site*)& _no_leader; + typedef enum { BOOTHD_STARTED=0, BOOTHD_STARTING } BOOTH_DAEMON_STATE; int poll_timeout = POLL_TIMEOUT; typedef enum { OP_LIST = 1, OP_GRANT, OP_REVOKE, } operation_t; struct booth_config *booth_conf; struct command_line cl; int do_read(int fd, void *buf, size_t count) { int rv, off = 0; while (off < count) { rv = read(fd, (char *)buf + off, count - off); if (rv == 0) return -1; if (rv == -1 && errno == EINTR) continue; if (rv == -1) return -1; off += rv; } return 0; } int do_write(int fd, void *buf, size_t count) { int rv, off = 0; retry: rv = write(fd, (char *)buf + off, count); if (rv == -1 && errno == EINTR) goto retry; /* If we cannot write _any_ data, we'd be in an (potential) loop. */ if (rv <= 0) { log_error("write failed: %s (%d)", strerror(errno), errno); return rv; } if (rv != count) { count -= rv; off += rv; goto retry; } return 0; } static void client_alloc(void) { int i; if (!clients) { clients = malloc(CLIENT_NALLOC * sizeof(struct client)); pollfds = malloc(CLIENT_NALLOC * sizeof(struct pollfd)); } else { clients = realloc(clients, (client_size + CLIENT_NALLOC) * sizeof(struct client)); pollfds = realloc(pollfds, (client_size + CLIENT_NALLOC) * sizeof(struct pollfd)); } if (!clients || !pollfds) { log_error("can't alloc for client array"); exit(1); } for (i = client_size; i < client_size + CLIENT_NALLOC; i++) { clients[i].workfn = NULL; clients[i].deadfn = NULL; clients[i].fd = -1; pollfds[i].fd = -1; pollfds[i].revents = 0; } client_size += CLIENT_NALLOC; } static void client_dead(int ci) { if (clients[ci].fd != -1) close(clients[ci].fd); clients[ci].fd = -1; clients[ci].workfn = NULL; pollfds[ci].fd = -1; } int client_add(int fd, const struct booth_transport *tpt, void (*workfn)(int ci), void (*deadfn)(int ci)) { int i; struct client *c; if (client_size + 2 >= client_maxi ) { client_alloc(); } for (i = 0; i < client_size; i++) { c = clients + i; if (c->fd != -1) continue; c->workfn = workfn; if (deadfn) c->deadfn = deadfn; else c->deadfn = client_dead; c->transport = tpt; c->fd = fd; pollfds[i].fd = fd; pollfds[i].events = POLLIN; if (i > client_maxi) client_maxi = i; return i; } assert(!"no client"); } /* Only used for client requests, TCP ???*/ void process_connection(int ci) { struct boothc_ticket_msg msg; int rv, len, exp, fd; void (*deadfn) (int ci); fd = clients[ci].fd; rv = do_read(fd, &msg.header, sizeof(msg.header)); if (rv < 0) { if (errno == ECONNRESET) log_debug("client %d connection reset for fd %d", ci, clients[ci].fd); goto kill; } if (check_boothc_header(&msg.header, -1) < 0) goto kill; /* Basic sanity checks already done. */ len = ntohl(msg.header.length); if (len) { if (len != sizeof(msg)) { bad_len: log_error("got wrong length %u", len); return; } exp = len - sizeof(msg.header); rv = do_read(clients[ci].fd, msg.header.data, exp); if (rv < 0) { log_error("connection %d read data error %d, wanted %d", ci, rv, exp); goto kill; } } /* For CMD_GRANT and CMD_REVOKE: * Don't close connection immediately, but send * result a second later? */ switch (ntohl(msg.header.cmd)) { case CMD_LIST: ticket_answer_list(fd, &msg); goto kill; case CMD_GRANT: /* Expect boothc_ticket_site_msg. */ if (len != sizeof(msg)) goto bad_len; ticket_answer_grant(fd, &msg); goto kill; case CMD_REVOKE: /* Expect boothc_ticket_site_msg. */ if (len != sizeof(msg)) goto bad_len; ticket_answer_revoke(fd, &msg); goto kill; default: log_error("connection %d cmd %x unknown", ci, ntohl(msg.header.cmd)); init_header(&msg.header,CMR_GENERAL, RLT_INVALID_ARG, sizeof(msg.header)); send_header_only(fd, &msg.header); goto kill; } assert(0); return; kill: deadfn = clients[ci].deadfn; if(deadfn) { deadfn(ci); } return; } /** Callback function for the listening TCP socket. */ static void process_listener(int ci) { int fd, i; fd = accept(clients[ci].fd, NULL, NULL); if (fd < 0) { log_error("process_listener: accept error for fd %d: %s (%d)", clients[ci].fd, strerror(errno), errno); if (clients[ci].deadfn) clients[ci].deadfn(ci); return; } i = client_add(fd, clients[ci].transport, process_connection, NULL); log_debug("add client connection %d fd %d", i, fd); } static int setup_config(int type) { int rv; rv = read_config(cl.configfile); if (rv < 0) goto out; /* Set "local" pointer, ignoring errors. */ if (cl.type == DAEMON && cl.site[0]) { if (!find_site_by_name(cl.site, &local, 1)) { log_error("Cannot find \"%s\" in the configuration.", cl.site); return -EINVAL; } local->local = 1; } else find_myself(NULL, type == CLIENT); rv = check_config(type); if (rv < 0) goto out; /* Per default the PID file name is derived from the * configuration name. */ if (!cl.lockfile[0]) { snprintf(cl.lockfile, sizeof(cl.lockfile)-1, "%s/%s.pid", BOOTH_RUN_DIR, booth_conf->name); } out: return rv; } static int setup_transport(void) { int rv; rv = transport()->init(message_recv); if (rv < 0) { log_error("failed to init booth_transport %s", transport()->name); goto out; } rv = booth_transport[TCP].init(NULL); if (rv < 0) { log_error("failed to init booth_transport[TCP]"); goto out; } out: return rv; } static int write_daemon_state(int fd, int state) { char buffer[1024]; int rv, size; size = sizeof(buffer) - 1; rv = snprintf(buffer, size, "booth_pid=%d " "booth_state=%s " "booth_type=%s " "booth_cfg_name='%s' " "booth_addr_string='%s' " "booth_port=%d\n", getpid(), ( state == BOOTHD_STARTED ? "started" : state == BOOTHD_STARTING ? "starting" : "invalid"), type_to_string(local->type), booth_conf->name, local->addr_string, booth_conf->port); if (rv < 0 || rv == size) { log_error("Buffer filled up in write_daemon_state()."); return -1; } size = rv; rv = ftruncate(fd, 0); if (rv < 0) { log_error("lockfile %s truncate error %d: %s", cl.lockfile, errno, strerror(errno)); return rv; } rv = lseek(fd, 0, SEEK_SET); if (rv < 0) { log_error("lseek set fd(%d) offset to 0 error, return(%d), message(%s)", fd, rv, strerror(errno)); rv = -1; return rv; } rv = write(fd, buffer, size); if (rv != size) { log_error("write to fd(%d, %d) returned %d, errno %d, message(%s)", fd, size, rv, errno, strerror(errno)); return -1; } return 0; } static int loop(int fd) { void (*workfn) (int ci); void (*deadfn) (int ci); int rv, i; rv = setup_transport(); if (rv < 0) goto fail; rv = setup_ticket(); if (rv < 0) goto fail; client_add(local->tcp_fd, booth_transport + TCP, process_listener, NULL); rv = write_daemon_state(fd, BOOTHD_STARTED); if (rv != 0) { log_error("write daemon state %d to lockfile error %s: %s", BOOTHD_STARTED, cl.lockfile, strerror(errno)); goto fail; } if (cl.type == ARBITRATOR) log_info("BOOTH arbitrator daemon started"); else if (cl.type == SITE) log_info("BOOTH cluster site daemon started"); while (1) { rv = poll(pollfds, client_maxi + 1, poll_timeout); if (rv == -1 && errno == EINTR) continue; if (rv < 0) { log_error("poll failed: %s (%d)", strerror(errno), errno); goto fail; } for (i = 0; i <= client_maxi; i++) { if (clients[i].fd < 0) continue; if (pollfds[i].revents & POLLIN) { workfn = clients[i].workfn; if (workfn) workfn(i); } if (pollfds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) { deadfn = clients[i].deadfn; if (deadfn) deadfn(i); } } process_tickets(); } return 0; fail: return -1; } static int query_get_string_answer(cmd_request_t cmd) { struct booth_site *site; struct boothc_header reply; char *data; int data_len; int rv; struct booth_transport const *tpt; data = NULL; init_header(&cl.msg.header, cmd, 0, sizeof(cl.msg)); if (!*cl.site) site = local; else if (!find_site_by_name(cl.site, &site, 1)) { log_error("cannot find site \"%s\"", cl.site); rv = ENOENT; goto out; } tpt = booth_transport + TCP; rv = tpt->open(site); if (rv < 0) goto out_free; rv = tpt->send(site, &cl.msg, sizeof(cl.msg)); if (rv < 0) goto out_free; rv = tpt->recv(site, &reply, sizeof(reply)); if (rv < 0) goto out_free; data_len = ntohl(reply.length) - sizeof(reply); data = malloc(data_len); if (!data) { rv = -ENOMEM; goto out_free; } rv = tpt->recv(site, data, data_len); if (rv < 0) goto out_free; do_write(STDOUT_FILENO, data, data_len); rv = 0; out_free: free(data); tpt->close(site); out: return rv; } static int do_command(cmd_request_t cmd) { struct booth_site *site; struct boothc_header reply; struct booth_transport const *tpt; int rv; rv = 0; site = NULL; if (!*cl.site) site = local; else { if (!find_site_by_name(cl.site, &site, 1)) { log_error("Site \"%s\" not configured.", cl.site); goto out_close; } if (site->type == ARBITRATOR) { log_error("Site \"%s\" is an arbitrator, cannot grant ticket there.", cl.site); goto out_close; } assert(site->type == SITE); } /* We don't check for existence of ticket, so that asking can be * done without local configuration, too. * Although, that means that the UDP port has to be specified, too. */ if (!cl.msg.ticket.id[0]) { /* If the loaded configuration has only a single ticket defined, use that. */ if (booth_conf->ticket_count == 1) { strcpy(cl.msg.ticket.id, booth_conf->ticket[0].name); } else { log_error("No ticket given."); goto out_close; } } init_header(&cl.msg.header, cmd, 0, sizeof(cl.msg)); /* Always use TCP for client - at least for now. */ tpt = booth_transport + TCP; rv = tpt->open(site); if (rv < 0) goto out_close; rv = tpt->send(site, &cl.msg, sizeof(cl.msg)); if (rv < 0) goto out_close; rv = tpt->recv(site, &reply, sizeof(reply)); if (rv < 0) goto out_close; if (reply.result == htonl(RLT_INVALID_ARG)) { log_info("invalid argument!"); rv = -1; goto out_close; } if (reply.result == htonl(RLT_OVERGRANT)) { log_info("You're granting a granted ticket. " "If you wanted to migrate a ticket, " "use revoke first, then use grant."); rv = -1; goto out_close; } rv = ntohl(reply.result); switch (rv) { case RLT_ASYNC: if (cmd == CMD_GRANT) log_info("grant command sent, result will be returned " "asynchronously, you can get the result from " "the log files"); else if (cmd == CMD_REVOKE) log_info("revoke command sent, result will be returned " "asynchronously, you can get the result from " "the log files."); else log_error("internal error reading reply result!"); rv = 0; break; case RLT_SYNC_SUCC: case RLT_SUCCESS: if (cmd == CMD_GRANT) log_info("grant succeeded!"); else if (cmd == CMD_REVOKE) log_info("revoke succeeded!"); rv = 0; break; case RLT_SYNC_FAIL: if (cmd == CMD_GRANT) log_info("grant failed!"); else if (cmd == CMD_REVOKE) log_info("revoke failed!"); rv = -1; break; case RLT_INVALID_ARG: log_error("\"Invalid argument\", most probably ticket name \"%s\" wrong.", cl.msg.ticket.id); break; default: log_error("got an error code: %x", rv); rv = -1; } out_close: if (site) local_transport->close(site); return rv; } static int do_grant(void) { return do_command(CMD_GRANT); } static int do_revoke(void) { return do_command(CMD_REVOKE); } static int _lockfile(int mode, int *fdp, pid_t *locked_by) { struct flock lock; int fd, rv; /* After reboot the directory may not yet exist. * Try to create it, but ignore errors. */ if (strncmp(cl.lockfile, BOOTH_RUN_DIR, strlen(BOOTH_RUN_DIR)) == 0) mkdir(BOOTH_RUN_DIR, 0775); if (locked_by) *locked_by = 0; *fdp = -1; fd = open(cl.lockfile, mode, 0664); if (fd < 0) return errno; *fdp = fd; lock.l_type = F_WRLCK; lock.l_start = 0; lock.l_whence = SEEK_SET; lock.l_len = 0; lock.l_pid = 0; if (fcntl(fd, F_SETLK, &lock) == 0) return 0; rv = errno; if (locked_by) if (fcntl(fd, F_GETLK, &lock) == 0) *locked_by = lock.l_pid; return rv; } static inline int is_root(void) { /* TODO: getuid()? Better way to check? */ return geteuid() == 0; } static int create_lockfile(void) { int rv, fd; fd = -1; rv = _lockfile(O_CREAT | O_WRONLY, &fd, NULL); if (fd == -1) { log_error("lockfile %s open error %d: %s", cl.lockfile, rv, strerror(rv)); return -1; } if (rv < 0) { log_error("lockfile %s setlk error %d: %s", cl.lockfile, rv, strerror(rv)); goto fail; } rv = write_daemon_state(fd, BOOTHD_STARTING); if (rv != 0) { log_error("write daemon state %d to lockfile error %s: %s", BOOTHD_STARTING, cl.lockfile, strerror(errno)); goto fail; } if (is_root()) { if (fchown(fd, booth_conf->uid, booth_conf->gid) < 0) log_error("fchown() on lockfile said %d: %s", errno, strerror(errno)); } return fd; fail: close(fd); return -1; } static void unlink_lockfile(int fd) { unlink(cl.lockfile); close(fd); } static void print_usage(void) { printf("Usages:\n"); printf(" booth daemon [-c config] [-D]\n"); printf(" booth [client] {list|grant|revoke} [options]\n"); printf(" booth status [-c config] [-D]\n"); printf("\n"); printf("Client operations:\n"); printf(" list: List all the tickets\n"); printf(" grant: Grant ticket to site\n"); printf(" revoke: Revoke ticket from site\n"); printf("\n"); printf("Options:\n"); printf(" -c FILE Specify config file [default " BOOTH_DEFAULT_CONF "]\n"); printf(" Can be a path or a name without \".conf\" suffix\n"); printf(" -D Enable debugging to stderr and don't fork\n"); printf(" -S Systemd mode (no forking)\n"); printf(" -t ticket name\n"); printf(" -s site name\n"); printf(" -l LOCKFILE Specify lock file path (daemon only)\n"); printf(" -h Print this help, then exit\n"); printf("\n"); printf("Please see the man page for details.\n"); } #define OPTION_STRING "c:Dl:t:s:hS" void safe_copy(char *dest, char *value, size_t buflen, const char *description) { int content_len = buflen - 1; if (strlen(value) >= content_len) { fprintf(stderr, "'%s' exceeds maximum %s length of %d\n", value, description, content_len); exit(EXIT_FAILURE); } strncpy(dest, value, content_len); dest[content_len] = 0; } static int host_convert(char *hostname, char *ip_str, size_t ip_size) { struct addrinfo *result = NULL, hints = {0}; int re = -1; memset(&hints, 0, sizeof(hints)); hints.ai_family = BOOTH_PROTO_FAMILY; hints.ai_socktype = SOCK_DGRAM; re = getaddrinfo(hostname, NULL, &hints, &result); if (re == 0) { struct in_addr addr = ((struct sockaddr_in *)result->ai_addr)->sin_addr; const char *re_ntop = inet_ntop(BOOTH_PROTO_FAMILY, &addr, ip_str, ip_size); if (re_ntop == NULL) { re = -1; } } freeaddrinfo(result); return re; } static int read_arguments(int argc, char **argv) { int optchar; char *arg1 = argv[1]; char *op = NULL; char *cp; char site_arg[INET_ADDRSTRLEN] = {0}; int left; if (argc < 2 || !strcmp(arg1, "help") || !strcmp(arg1, "--help") || !strcmp(arg1, "-h")) { print_usage(); exit(EXIT_SUCCESS); } if (!strcmp(arg1, "version") || !strcmp(arg1, "--version") || !strcmp(arg1, "-V")) { printf("%s %s (built %s %s)\n", argv[0], RELEASE_VERSION, __DATE__, __TIME__); exit(EXIT_SUCCESS); } if (strcmp(arg1, "arbitrator") == 0 || strcmp(arg1, "site") == 0 || strcmp(arg1, "start") == 0 || strcmp(arg1, "daemon") == 0) { cl.type = DAEMON; optind = 2; } else if (strcmp(arg1, "status") == 0) { cl.type = STATUS; optind = 2; } else if (strcmp(arg1, "client") == 0) { cl.type = CLIENT; if (argc < 3) { print_usage(); exit(EXIT_FAILURE); } op = argv[2]; optind = 3; } else { cl.type = CLIENT; op = argv[1]; optind = 2; } if (cl.type == CLIENT) { if (!strcmp(op, "list")) cl.op = OP_LIST; else if (!strcmp(op, "grant")) cl.op = OP_GRANT; else if (!strcmp(op, "revoke")) cl.op = OP_REVOKE; else { fprintf(stderr, "client operation \"%s\" is unknown\n", op); exit(EXIT_FAILURE); } } while (optind < argc) { optchar = getopt(argc, argv, OPTION_STRING); switch (optchar) { case 'c': if (strchr(optarg, '/')) { safe_copy(cl.configfile, optarg, sizeof(cl.configfile), "config file"); } else { /* If no "/" in there, use with default directory. */ strcpy(cl.configfile, BOOTH_DEFAULT_CONF_DIR); cp = cl.configfile + strlen(BOOTH_DEFAULT_CONF_DIR); assert(cp > cl.configfile); assert(*(cp-1) == '/'); /* Write at the \0, ie. after the "/" */ safe_copy(cp, optarg, (sizeof(cl.configfile) - (cp - cl.configfile) - strlen(BOOTH_DEFAULT_CONF_EXT)), "config name"); /* If no extension, append ".conf". * Space is available, see -strlen() above. */ if (!strchr(cp, '.')) strcat(cp, BOOTH_DEFAULT_CONF_EXT); } break; case 'D': debug_level++; /* Fall through */ case 'S': daemonize = 1; break; case 'l': safe_copy(cl.lockfile, optarg, sizeof(cl.lockfile), "lock file"); break; case 't': if (cl.op == OP_GRANT || cl.op == OP_REVOKE) { safe_copy(cl.msg.ticket.id, optarg, sizeof(cl.msg.ticket.id), "ticket name"); } else { print_usage(); exit(EXIT_FAILURE); } break; case 's': /* For testing and debugging: allow "-s site" also for * daemon start, so that the address that should be used * can be set manually. * This makes it easier to start multiple processes * on one machine. */ if (cl.type == CLIENT || (cl.type == DAEMON && debug_level)) { int re = host_convert(optarg, site_arg, INET_ADDRSTRLEN); if (re == 0) { safe_copy(cl.site, site_arg, sizeof(cl.site), "site name"); } else { safe_copy(cl.site, optarg, sizeof(cl.site), "site name"); } } else { log_error("\"-s\" not allowed in daemon mode."); exit(EXIT_FAILURE); } break; case 'h': print_usage(); exit(EXIT_SUCCESS); break; case ':': case '?': fprintf(stderr, "Please use '-h' for usage.\n"); exit(EXIT_FAILURE); break; case -1: /* No more parameters on cmdline, only arguments. */ goto extra_args; default: goto unknown; }; } return 0; extra_args: if (cl.type == CLIENT && !cl.msg.ticket.id[0]) { /* Use additional argument as ticket name. */ safe_copy(cl.msg.ticket.id, argv[optind], sizeof(cl.msg.ticket.id), "ticket name"); optind++; } if (optind == argc) return 0; left = argc - optind; fprintf(stderr, "Superfluous argument%s: %s%s\n", left == 1 ? "" : "s", argv[optind], left == 1 ? "" : "..."); exit(EXIT_FAILURE); unknown: fprintf(stderr, "unknown option: %s\n", argv[optind]); exit(EXIT_FAILURE); } static void set_scheduler(void) { struct sched_param sched_param; struct rlimit rlimit; int rv; rlimit.rlim_cur = RLIM_INFINITY; rlimit.rlim_max = RLIM_INFINITY; setrlimit(RLIMIT_MEMLOCK, &rlimit); rv = mlockall(MCL_CURRENT | MCL_FUTURE); if (rv < 0) { log_error("mlockall failed"); } rv = sched_get_priority_max(SCHED_RR); if (rv != -1) { sched_param.sched_priority = rv; rv = sched_setscheduler(0, SCHED_RR, &sched_param); if (rv == -1) log_error("could not set SCHED_RR priority %d: %s (%d)", sched_param.sched_priority, strerror(errno), errno); } else { log_error("could not get maximum scheduler priority err %d", errno); } } static void set_oom_adj(int val) { FILE *fp; fp = fopen("/proc/self/oom_adj", "w"); if (!fp) return; fprintf(fp, "%i", val); fclose(fp); } static int do_status(int type) { pid_t pid; int rv, lock_fd, ret; const char *reason = NULL; char lockfile_data[1024], *cp; ret = PCMK_OCF_NOT_RUNNING; /* TODO: query all, and return quit only if it's _cleanly_ not * running, ie. _neither_ of port/lockfile/process is available? * * Currently a single failure says "not running", even if "only" the * lockfile has been removed. */ rv = setup_config(type); if (rv) { reason = "Error reading configuration."; ret = PCMK_OCF_UNKNOWN_ERROR; goto quit; } if (!local) { reason = "No Service IP active here."; goto quit; } rv = _lockfile(O_RDWR, &lock_fd, &pid); if (rv == 0) { reason = "PID file not locked."; goto quit; } if (lock_fd == -1) { reason = "No PID file."; goto quit; } if (pid) { fprintf(stdout, "booth_lockpid=%d ", pid); fflush(stdout); } rv = read(lock_fd, lockfile_data, sizeof(lockfile_data) - 1); if (rv < 4) { reason = "Cannot read lockfile data."; ret = PCMK_LSB_UNKNOWN_ERROR; goto quit; } lockfile_data[rv] = 0; if (lock_fd != -1) close(lock_fd); /* Make sure it's only a single line */ cp = strchr(lockfile_data, '\r'); if (cp) *cp = 0; cp = strchr(lockfile_data, '\n'); if (cp) *cp = 0; rv = setup_udp_server(1); if (rv == 0) { reason = "UDP port not in use."; goto quit; } fprintf(stdout, "booth_lockfile='%s' %s\n", cl.lockfile, lockfile_data); if (daemonize) fprintf(stderr, "Booth at %s port %d seems to be running.\n", local->addr_string, booth_conf->port); return 0; quit: log_debug("not running: %s", reason); /* Ie. "DEBUG" */ if (daemonize) fprintf(stderr, "not running: %s\n", reason); return ret; } static int limit_this_process(void) { int rv; if (!is_root()) return 0; if (setregid(booth_conf->gid, booth_conf->gid) < 0) { rv = errno; log_error("setregid() didn't work: %s", strerror(rv)); return rv; } if (setreuid(booth_conf->uid, booth_conf->uid) < 0) { rv = errno; log_error("setreuid() didn't work: %s", strerror(rv)); return rv; } /* TODO: ulimits? But that would restrict crm_ticket and handler * scripts, too! */ return 0; } static int do_server(int type) { int lock_fd = -1; int rv = -1; static char log_ent[128] = DAEMON_NAME "-"; rv = setup_config(type); if (rv < 0) goto out; if (!local) { log_error("Cannot find myself in the configuration."); exit(EXIT_FAILURE); } if (!daemonize) { if (daemon(0, 0) < 0) { perror("daemon error"); exit(EXIT_FAILURE); } } /* The lockfile must be written to _after_ the call to daemon(), so * that the lockfile contains the pid of the daemon, not the parent. */ lock_fd = create_lockfile(); if (lock_fd < 0) return lock_fd; strcat(log_ent, type_to_string(local->type)); cl_log_set_entity(log_ent); cl_log_enable_stderr(debug_level ? TRUE : FALSE); cl_log_set_facility(HA_LOG_FACILITY); cl_inherit_logging_environment(0); log_info("BOOTH %s daemon is starting, node id is 0x%08X (%d).", type_to_string(local->type), local->site_id, local->site_id); signal(SIGUSR1, (__sighandler_t)tickets_log_info); set_scheduler(); set_oom_adj(-16); set_proc_title("%s %s for [%s]:%d", DAEMON_NAME, type_to_string(local->type), local->addr_string, booth_conf->port); rv = limit_this_process(); if (rv) return rv; rv = loop(lock_fd); out: if (lock_fd >= 0) { /* We might not be able to delete it, but at least * make it empty. */ rv = ftruncate(lock_fd, 0); (void)rv; unlink_lockfile(lock_fd); } return rv; } static int do_client(void) { int rv = -1; rv = setup_config(CLIENT); if (rv < 0) { log_error("cannot read config"); goto out; } switch (cl.op) { case OP_LIST: rv = query_get_string_answer(CMD_LIST); break; case OP_GRANT: rv = do_grant(); break; case OP_REVOKE: rv = do_revoke(); break; } out: return rv; } int main(int argc, char *argv[], char *envp[]) { int rv; init_set_proc_title(argc, argv, envp); memset(&cl, 0, sizeof(cl)); strncpy(cl.configfile, BOOTH_DEFAULT_CONF, BOOTH_PATH_LEN - 1); cl.lockfile[0] = 0; debug_level = 0; cl_log_set_entity("booth"); cl_log_enable_stderr(TRUE); cl_log_set_facility(0); rv = read_arguments(argc, argv); if (rv < 0) goto out; switch (cl.type) { case STATUS: rv = do_status(cl.type); break; case ARBITRATOR: case DAEMON: case SITE: rv = do_server(cl.type); break; case CLIENT: rv = do_client(); break; } out: /* Normalize values. 0x100 would be seen as "OK" by waitpid(). */ return (rv >= 0 && rv < 0x70) ? rv : 1; } diff --git a/src/pacemaker.c b/src/pacemaker.c index c371744..27d42d2 100644 --- a/src/pacemaker.c +++ b/src/pacemaker.c @@ -1,326 +1,326 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include "log.h" #include "pacemaker.h" #include "inline-fn.h" enum atomic_ticket_supported { YES=0, NO, FILENOTFOUND, /* Ie. UNKNOWN */ UNKNOWN = FILENOTFOUND, }; /* http://thedailywtf.com/Articles/What_Is_Truth_0x3f_.aspx */ enum atomic_ticket_supported atomicity = UNKNOWN; #define COMMAND_MAX 1024 /** Determines whether the installed crm_ticket can do atomic ticket grants, * _including_ multiple attribute changes. * * See * https://bugzilla.novell.com/show_bug.cgi?id=855099 * * Run "crm_ticket" without "--force"; * - the old version asks for "Y/N" via STDIN, and returns 0 * when reading "no"; * - the new version just reports an error without asking. */ static void test_atomicity(void) { int rv; if (atomicity != UNKNOWN) return; rv = system("echo n | crm_ticket -g -t any-ticket-name > /dev/null 2> /dev/null"); if (rv == -1) { log_error("Cannot run \"crm_ticket\"!"); /* BIG problem. Abort. */ exit(1); } if (WIFSIGNALED(rv)) { log_error("\"crm_ticket\" terminated by a signal!"); /* Problem. Abort. */ exit(1); } switch (WEXITSTATUS(rv)) { case 0: atomicity = NO; log_info("Old \"crm_ticket\" found, using non-atomic ticket updates."); break; case 1: atomicity = YES; log_info("New \"crm_ticket\" found, using atomic ticket updates."); break; default: log_error("Unexpected return value from \"crm_ticket\" (%d), " "falling back to non-atomic ticket updates.", rv); atomicity = NO; } assert(atomicity == YES || atomicity == NO); } const char * interpret_rv(int rv) { static char text[64]; int p; if (rv == 0) return "0"; p = sprintf(text, "rv %d", WEXITSTATUS(rv)); if (WIFSIGNALED(rv)) sprintf(text + p, " signal %d", WTERMSIG(rv)); return text; } static int pcmk_write_ticket_atomic(struct ticket_config *tk, int grant) { char cmd[COMMAND_MAX]; int rv; - /* The values are appended to "-v", so that NO_OWNER + /* The values are appended to "-v", so that NO_ONE * (which is -1) isn't seen as another option. */ snprintf(cmd, COMMAND_MAX, "crm_ticket -t '%s' " "%s --force " "-S owner -v%" PRIi32 " " "-S expires -v%" PRIi64 " " - "-S ballot -v%" PRIi64, + "-S term -v%" PRIi64, tk->name, (grant > 0 ? "-g" : grant < 0 ? "-r" : ""), - (int32_t)get_node_id(tk->owner), - (int64_t)tk->expires, - (int64_t)tk->last_ack_ballot); + (int32_t)get_node_id(tk->leader), + (int64_t)tk->term_expires, + (int64_t)tk->current_term); rv = system(cmd); log_info("command: '%s' was executed", cmd); if (rv != 0) log_error("error: \"%s\" failed, %s", cmd, interpret_rv(rv)); return rv; } static int pcmk_store_ticket_nonatomic(struct ticket_config *tk); static int pcmk_grant_ticket(struct ticket_config *tk) { char cmd[COMMAND_MAX]; int rv; test_atomicity(); if (atomicity == YES) return pcmk_write_ticket_atomic(tk, +1); rv = pcmk_store_ticket_nonatomic(tk); if (rv) return rv; snprintf(cmd, COMMAND_MAX, "crm_ticket -t %s -g --force", tk->name); log_info("command: '%s' was executed", cmd); rv = system(cmd); if (rv != 0) log_error("error: \"%s\" failed, %s", cmd, interpret_rv(rv)); return rv; } static int pcmk_revoke_ticket(struct ticket_config *tk) { char cmd[COMMAND_MAX]; int rv; test_atomicity(); if (atomicity == YES) return pcmk_write_ticket_atomic(tk, -1); rv = pcmk_store_ticket_nonatomic(tk); if (rv) return rv; snprintf(cmd, COMMAND_MAX, "crm_ticket -t %s -r --force", tk->name); log_info("command: '%s' was executed", cmd); rv = system(cmd); if (rv != 0) log_error("error: \"%s\" failed, %s", cmd, interpret_rv(rv)); return rv; } static int crm_ticket_set(const struct ticket_config *tk, const char *attr, int64_t val) { char cmd[COMMAND_MAX]; int i, rv; snprintf(cmd, COMMAND_MAX, "crm_ticket -t '%s' -S '%s' -v %" PRIi64, tk->name, attr, val); /* If there are errors, there's not much we can do but retry ... */ for (i=0; i<3 && (rv = system(cmd)); i++) ; log_debug("'%s' gave result %s", cmd, interpret_rv(rv)); return rv; } static int pcmk_store_ticket_nonatomic(struct ticket_config *tk) { int rv; /* Always try to store *each* attribute, even if there's an error * for one of them. */ - rv = crm_ticket_set(tk, "owner", (int32_t)get_node_id(tk->owner)); - rv = crm_ticket_set(tk, "expires", tk->expires) || rv; - rv = crm_ticket_set(tk, "ballot", tk->last_ack_ballot) || rv; + rv = crm_ticket_set(tk, "owner", (int32_t)get_node_id(tk->leader)); + rv = crm_ticket_set(tk, "expires", tk->term_expires) || rv; + rv = crm_ticket_set(tk, "term", tk->current_term) || rv; if (rv) log_error("setting crm_ticket attributes failed; %s", interpret_rv(rv)); else log_info("setting crm_ticket attributes successful"); return rv; } static int crm_ticket_get(struct ticket_config *tk, const char *attr, int64_t *data) { char cmd[COMMAND_MAX]; char line[256]; int rv; int64_t v; FILE *p; *data = -1; v = 0; snprintf(cmd, COMMAND_MAX, "crm_ticket -t '%s' -G '%s' --quiet", tk->name, attr); p = popen(cmd, "r"); if (p == NULL) { rv = errno; log_error("popen error %d (%s) for \"%s\"", rv, strerror(rv), cmd); return rv || -EINVAL; } if (fgets(line, sizeof(line) - 1, p) == NULL) { rv = ENODATA; goto out; } rv = EINVAL; if (sscanf(line, "%" PRIi64, &v) == 1) rv = 0; *data = v; out: rv = pclose(p); log_debug("command \"%s\" returned %s, value %" PRIi64, cmd, interpret_rv(rv), v); return rv; } static int pcmk_load_ticket(struct ticket_config *tk) { int rv; int64_t v; /* This here gets run during startup; testing that here means that * normal operation won't be interrupted with that test. */ test_atomicity(); rv = crm_ticket_get(tk, "expires", &v); if (!rv) { - tk->expires = v; + tk->term_expires = v; } - rv = crm_ticket_get(tk, "ballot", &v); + rv = crm_ticket_get(tk, "term", &v); if (!rv) { - tk->new_ballot = - tk->last_ack_ballot = v; + tk->current_term = v; } rv = crm_ticket_get(tk, "owner", &v); if (!rv) { /* No check, node could have been deconfigured. */ - find_site_by_id(v, &tk->proposed_owner); + find_site_by_id(v, &tk->leader); } - disown_if_expired(tk); + if (disown_if_expired(tk)) + pcmk_revoke_ticket(tk); - tk->proposal_acknowledges = local->bitmask; +// tk->proposal_acknowledges = local->bitmask; /* We load only when the state is completely unknown. */ tk->state = ST_INIT; return rv; } struct ticket_handler pcmk_handler = { .grant_ticket = pcmk_grant_ticket, .revoke_ticket = pcmk_revoke_ticket, .load_ticket = pcmk_load_ticket, }; diff --git a/src/paxos.c b/src/paxos.c deleted file mode 100644 index 5b2e50a..0000000 --- a/src/paxos.c +++ /dev/null @@ -1,507 +0,0 @@ -/* - * Copyright (C) 2011 Jiaju Zhang - * Copyright (C) 2013-2014 Philipp Marek - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ - -#include -#include -#include -#include -#include -#include "booth.h" -#include "transport.h" -#include "inline-fn.h" -#include "config.h" -#include "paxos.h" -#include "log.h" - - -static uint32_t next_ballot_number(struct ticket_config *tk) -{ - uint32_t b; - - /* TODO: getenv() for debugging */ - - b = tk->new_ballot; - /* + unique number */ - b += local->bitmask; - /* + weight */ - b += booth_conf->site_bits * tk->weight[ local->index ]; - return b; -} - - -static inline void set_proposal_in_ticket(struct ticket_config *tk, - struct booth_site *from, - uint32_t ballot, struct booth_site *new_owner) -{ - tk->proposer = from; - tk->new_ballot = ballot; - tk->proposed_owner = new_owner; - tk->proposal_expires = 0; // TODO - needed? - tk->proposal_acknowledges = from->bitmask | local->bitmask; - - /* We lose (?) */ - tk->state = ST_STABLE; -} - - -int should_switch_state_p(struct ticket_config *tk) -{ - if (all_agree(tk)) { - log_debug("all agree"); - return 1; - } - - if (majority_agree(tk)) { - /* Time passed, and more than half agree. */ - if (timeval_in_past(tk->proposal_switch)) { - log_debug("majority, and enough time passed"); - return 2; - } - - if (!tk->proposal_switch.tv_sec) { - log_debug("majority, wait half a second"); - /* Wait half a second before doing the state change. */ - ticket_next_cron_in(tk, 0.5); - tk->proposal_switch = tk->next_cron; - } - } - - return 0; -} - - -static int retries_exceeded(struct ticket_config *tk) -{ - int ret; - - if (tk->retry_number >= tk->retries) { - log_info("ABORT %s for ticket \"%s\" - " - "not enough answers after %d retries (of %d)", - tk->state == OP_PREPARING ? "prepare" : "propose", - tk->name, tk->retry_number, tk->retries); - abort_proposal(tk); - - - /* Keep on trying to refresh. */ - if (owner_and_valid(tk)) - tk->state = ST_STABLE; - - ret = EBUSY; - } else { - /* We ask others for a change; retry to get - * consensus. - * But don't ask again immediately after a - * query, give the peers time to answer. */ - if (timeval_in_past(tk->proposal_switch)) { - ticket_broadcast_proposed_state(tk, tk->state); - } - ret = 0; - } - - - disown_if_expired(tk); - ticket_activate_timeout(tk); - - return ret; -} - - -static inline void change_ticket_owner(struct ticket_config *tk, - uint32_t ballot, - struct booth_site *new_owner) -{ - /* set "previous" value for next round */ - tk->last_ack_ballot = - tk->new_ballot = ballot; - - tk->owner = new_owner; - tk->expires = time(NULL) + tk->expiry; - tk->proposer = NULL; - - tk->state = ST_STABLE; - - set_ticket_wakeup(tk); - log_info("Now actively COMMITTED for \"%s\": new owner %s, ballot %d", - tk->name, - ticket_owner_string(tk->owner), - ballot); - - - ticket_write(tk); -} - - -void abort_proposal(struct ticket_config *tk) -{ - log_info("ABORTing proposal."); - tk->proposer = NULL; - tk->proposed_owner = tk->owner; - tk->retry_number = 0; - /* Ask others (repeatedly) until we know the new owner. */ - tk->state = ST_INIT; -} - - -int PROPOSE_to_COMMIT(struct ticket_config *tk) -{ - int rv; - - if (should_switch_state_p(tk)) { - change_ticket_owner(tk, tk->new_ballot, tk->proposed_owner); - - rv = ticket_broadcast_proposed_state(tk, OP_COMMITTED); - tk->state = ST_STABLE; - return rv; - } - - return retries_exceeded(tk); -} - - -int PREPARE_to_PROPOSE(struct ticket_config *tk) -{ - if (should_switch_state_p(tk)) { - return ticket_broadcast_proposed_state(tk, OP_PROPOSING); - } - - return retries_exceeded(tk); -} - - - -/** \defgroup msghdl Message handling functions. - * - * Not all use all arguments; but to keep the interface the same, - * they're all just passed everything we have. - * - * See also enum \ref cmd_request_t. - * @{ */ - - -/** Start a PAXOS round, by sending out an OP_PREPARING. */ -int paxos_start_round(struct ticket_config *tk, struct booth_site *new_owner) -{ - if (tk->state != ST_STABLE) - return RLT_BUSY; - - /* This may not be called repeatedly from cron, - * because the ballot number would simply - * get counted up without any benefit. - * The message may get retransmitted, though. - * Normal retry behaviour gets achieved during - * state OP_PREPARING anyway. */ - tk->proposer = local; - tk->new_ballot = next_ballot_number(tk); - tk->proposed_owner = new_owner; - - tk->retry_number = 0; - ticket_activate_timeout(tk); - - /* TODO: shorten renew exchange by just sending - * a new proposal? Ballot numbers should still be the - * same everywhere, owner doesn't change. */ - return ticket_broadcast_proposed_state(tk, OP_PREPARING); -} - - - -/** Answering OP_PREPARING means sending out OP_PROMISING. */ -inline static int answer_PREP( - struct ticket_config *tk, - struct booth_site *from, - struct boothc_ticket_msg *msg, - uint32_t ballot, - struct booth_site *new_owner) -{ - if (!(local->role & ACCEPTOR)) - return 0; - - /* Ignore if packet is too late, and state is already active. */ - if (tk->owner == new_owner && - ballot == tk->last_ack_ballot) - return 0; - - /* We have to be careful here. - * Getting multiple copies of the same message must not trigger - * rejections, but only repeated promises. */ - if (from == tk->proposer && - ballot == tk->new_ballot) - goto promise; - - - /* It doesn't matter whether it's the same or another host; - * the only distinction is the ballot number. */ - if (ballot > tk->new_ballot) { -promise: - msg->header.cmd = htonl(OP_PROMISING); - msg->ticket.prev_ballot = htonl(tk->last_ack_ballot); - - set_proposal_in_ticket(tk, from, ballot, new_owner); - - log_info("PROMISING for ticket \"%s\" (by %s) for %d", - tk->name, from->addr_string, ballot); - } else { - msg->header.cmd = htonl(OP_REJECTED); - msg->ticket.ballot = htonl(tk->new_ballot); - msg->ticket.prev_ballot = htonl(tk->last_ack_ballot); - - log_info("REJECTING (prep) for ticket \"%s\" from %s - have %d, wanted %d", - tk->name, from->addr_string, - tk->new_ballot, ballot); - } - init_header_bare(&msg->header); - return booth_udp_send(from, msg, sizeof(*msg)); -} - - -/** Getting OP_REJECTED means abandoning the current operation. */ -inline static int handle_REJ( - struct ticket_config *tk, - struct booth_site *from, - struct boothc_ticket_msg *msg, - uint32_t ballot, - struct booth_site *new_owner) -{ - if (tk->last_ack_ballot == ballot) { - log_debug("got a late REJECTED; ignored, as " - "ballot %d is already active.", - tk->last_ack_ballot); - return 0; - } - - - log_info("got REJECTED for ticket \"%s\", ballot %d (has %d), from %s", - tk->name, - tk->new_ballot, ballot, - from->addr_string); - - abort_proposal(tk); - - /* TODO: should we check whether that sequence is increasing? */ - tk->new_ballot = ballot_max2(tk->new_ballot, ballot); - tk->last_ack_ballot = ballot_max2(tk->last_ack_ballot, - ntohl(msg->ticket.prev_ballot)); - - /* No need to ask the others. */ - tk->state = ST_STABLE; - return 0; -} - - -/** After a few OP_PROMISING replies we can send out OP_PROPOSING. */ -inline static int got_a_PROM( - struct ticket_config *tk, - struct booth_site *from, - struct boothc_ticket_msg *msg, - uint32_t ballot, - struct booth_site *new_owner) -{ - int had_that; - - if (tk->proposer == local && - tk->state == OP_PREPARING && - tk->new_ballot == ballot) { - had_that = tk->proposal_acknowledges & from->bitmask; - - tk->proposal_acknowledges |= from->bitmask; - - log_info("Got PROMISE from %s for \"%s\", for %d, acks now 0x%" PRIx64, - from->addr_string, tk->name, - tk->new_ballot, - tk->proposal_acknowledges); - if (had_that) - return 0; - - return PREPARE_to_PROPOSE(tk); - } - - - /* Packet just delayed? Silently ignore. */ - if (ballot == tk->last_ack_ballot && - (new_owner == tk->owner || - new_owner == tk->proposed_owner)) - return 0; - - /* Message sent to wrong host? */ - log_debug("got unexpected PROMISE from %s for \"%s\"", - from->addr_string, tk->name); - - return 0; -} - - -/** Answering OP_PROPOSING means sending out OP_ACCEPTING. */ -inline static int answer_PROP( - struct ticket_config *tk, - struct booth_site *from, - struct boothc_ticket_msg *msg, - uint32_t ballot, - struct booth_site *new_owner) -{ - if (!(local->role & ACCEPTOR)) - return 0; - - - /* Repeated packet. */ - if (new_owner == tk->owner && - ballot == tk->new_ballot) - goto accepting; - - /* If packet is late, ie. we already have that state, - * just repeat the ack - perhaps it got lost. */ - if (new_owner == tk->owner && - ballot == tk->last_ack_ballot) - goto accepting; - - - /* We have to be careful here. - * Getting multiple copies of the same message must not trigger - * rejections, but only repeated OP_ACCEPTING messages. */ - if (ballot > tk->last_ack_ballot && - ballot == tk->new_ballot && - ntohl(msg->ticket.prev_ballot) == tk->last_ack_ballot) { - if (tk->proposer) { - /* Send OP_REJECTED to previous proposer? */ - log_info("new PROPOSAL for ticket \"%s\" overriding older one from %s", - tk->name, from->addr_string); - } - - tk->proposer = from; - -accepting: - init_ticket_msg(msg, OP_ACCEPTING, RLT_SUCCESS, tk); - - log_info("sending ACCEPT for ticket \"%s\" (by %s) for %d - new owner %s", - tk->name, from->addr_string, ballot, - ticket_owner_string(new_owner)); - change_ticket_owner(tk, ballot, new_owner); - } else if (ballot == tk->last_ack_ballot && - ballot == tk->new_ballot && - ntohl(msg->ticket.prev_ballot) == tk->last_ack_ballot) { - /* Silently ignore delayed messages. */ - } else { - msg->header.cmd = htonl(OP_REJECTED); - msg->ticket.ballot = htonl(tk->new_ballot); - msg->ticket.prev_ballot = htonl(tk->last_ack_ballot); - - log_info("REJECTING (prop) for ticket \"%s\" from %s - have %d, wanted %d", - tk->name, from->addr_string, - tk->new_ballot, ballot); - } - init_header_bare(&msg->header); - return booth_udp_send(from, msg, sizeof(*msg)); -} - - -/** After enough OP_ACCEPTING we can do the change, and send an OP_COMMITTED. */ -inline static int got_an_ACC( - struct ticket_config *tk, - struct booth_site *from, - struct boothc_ticket_msg *msg, - uint32_t ballot, - struct booth_site *new_owner) -{ - if (tk->proposer == local && - tk->state == OP_PROPOSING) { - tk->proposal_acknowledges |= from->bitmask; - - log_info("Got ACCEPTING from %s for \"%s\", acks now 0x%" PRIx64, - from->addr_string, tk->name, - tk->proposal_acknowledges); - - return PROPOSE_to_COMMIT(tk); - } - return 0; -} - - -/** An OP_COMMITTED gets no answer; just record the new state. */ -inline static int answer_COMM( - struct ticket_config *tk, - struct booth_site *from, - struct boothc_ticket_msg *msg, - uint32_t ballot, - struct booth_site *new_owner) -{ - /* We cannot check whether the packet is from an expected proposer - - * perhaps this is the _only_ message of the whole handshake? */ - - if (ballot == tk->last_ack_ballot && - new_owner == tk->owner) { - /* Ignored - just acknowledging the commit. */ - log_info("COMMIT message from \"%s\" received.", from->addr_string); - } else if (ballot > tk->new_ballot && - ntohl(msg->ticket.prev_ballot) == tk->last_ack_ballot) { - /* Received a COMMIT, but no prior packets; - * might happen because the network just got - * connected again. */ - log_info("COMMIT message from \"%s\" for new ballot %d received.", - from->addr_string, ballot); - change_ticket_owner(tk, ballot, new_owner); - } else { - log_info("commit message from \"%s\" discarded.", from->addr_string); - } - - /* Send ack? */ - return 0; - -} - -/** @} */ - - -int paxos_answer( - struct ticket_config *tk, - struct booth_site *from, - struct boothc_ticket_msg *msg, - uint32_t ballot, - struct booth_site *new_owner_p) -{ - int cmd; - - cmd = ntohl(msg->header.cmd); - - /* These are in roughly chronological order. - * What the first machine sends is an OP_PREPARING - * (see paxos_start_round()), which gets received - * (below) from the others ... */ - switch (cmd) { - case OP_PREPARING: - return answer_PREP(tk, from, msg, ballot, new_owner_p); - - case OP_REJECTED: - return handle_REJ(tk, from, msg, ballot, new_owner_p); - - case OP_PROMISING: - return got_a_PROM(tk, from, msg, ballot, new_owner_p); - - case OP_PROPOSING: - return answer_PROP(tk, from, msg, ballot, new_owner_p); - - case OP_ACCEPTING: - return got_an_ACC(tk, from, msg, ballot, new_owner_p); - - case OP_COMMITTED: - return answer_COMM(tk, from, msg, ballot, new_owner_p); - - default: - log_error("unprocessed message, cmd %x", cmd); - return -EINVAL; - } -} diff --git a/src/raft.c b/src/raft.c new file mode 100644 index 0000000..8e01f0d --- /dev/null +++ b/src/raft.c @@ -0,0 +1,508 @@ +/* + * Copyright (C) 2014 Philipp Marek + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include +#include +#include +#include +#include +#include "booth.h" +#include "transport.h" +#include "inline-fn.h" +#include "config.h" +#include "raft.h" +#include "ticket.h" +#include "log.h" + + + +inline static void clear_election(struct ticket_config *tk) +{ + int i; + struct booth_site *site; + + log_info("clear election"); + tk->votes_received = 0; + foreach_node(i, site) + tk->votes_for[site->index] = NULL; +} + + +inline static void site_voted_for(struct ticket_config *tk, + struct booth_site *who, + struct booth_site *vote) +{ + log_info("site \"%s\" votes for \"%s\"", + site_string(who), + site_string(vote)); + + if (!tk->votes_for[who->index]) { + tk->votes_for[who->index] = vote; + tk->votes_received |= who->bitmask; + } else { + if (tk->votes_for[who->index] != vote) + log_error("voted previously (but in same term!) for \"%s\"...", + tk->votes_for[who->index]->addr_string); + } +} + + +static void become_follower(struct ticket_config *tk, + struct boothc_ticket_msg *msg) +{ + uint32_t i; + int duration; + + tk->state = ST_FOLLOWER; + + + duration = tk->term_duration; + if (msg) + duration = min(duration, ntohl(msg->ticket.term_valid_for)); + tk->term_expires = time(NULL) + duration; + + + if (msg) { + i = ntohl(msg->ticket.term); + tk->current_term = max(i, tk->current_term); + + /* § 5.3 */ + i = ntohl(msg->ticket.leader_commit); + tk->commit_index = max(i, tk->commit_index); + } + + + ticket_write(tk); +} + + +static struct booth_site *majority_votes(struct ticket_config *tk) +{ + int i, n; + struct booth_site *v; + int count[MAX_NODES] = { 0, }; + + + for(i=0; isite_count; i++) { + v = tk->votes_for[i]; + if (!v) + continue; + + n = v->index; + count[n]++; + log_info("Majority: %d \"%s\" wants %d \"%s\" => %d", + i, booth_conf->site[i].addr_string, + n, v->addr_string, + count[n]); + + if (count[n]*2 <= booth_conf->site_count) + continue; + + + log_info("Majority reached: %d of %d for \"%s\"", + count[n], booth_conf->site_count, + v->addr_string); + return v; + } + + return NULL; +} + + +static int newer_term(struct ticket_config *tk, + struct booth_site *sender, + struct booth_site *leader, + struct boothc_ticket_msg *msg) +{ + uint32_t term; + + term = ntohl(msg->ticket.term); + /* §5.1 */ + if (term > tk->current_term) { + tk->state = ST_FOLLOWER; + tk->leader = leader; + log_info("higher term %d vs. %d, following \"%s\"", + term, tk->current_term, + ticket_leader_string(tk)); + + tk->current_term = term; + return 1; + } + + return 0; +} + +static int term_too_low(struct ticket_config *tk, + struct booth_site *sender, + struct booth_site *leader, + struct boothc_ticket_msg *msg) +{ + uint32_t term; + + term = ntohl(msg->ticket.term); + /* §5.1 */ + if (term < tk->current_term) + { + log_info("sending REJECT, term too low."); + send_reject(sender, tk, RLT_TERM_OUTDATED); + return 1; + } + + return 0; +} + + + + +/* For follower. */ +static int answer_HEARTBEAT ( + struct ticket_config *tk, + struct booth_site *sender, + struct booth_site *leader, + struct boothc_ticket_msg *msg + ) +{ + uint32_t term; + struct boothc_ticket_msg omsg; + + + + term = ntohl(msg->ticket.term); + log_debug("leader: %s, have %s; term %d vs %d", + site_string(leader), ticket_leader_string(tk), + term, tk->current_term); + + /* No reject. (?) */ + if (term < tk->current_term) + return 0; + + /* Needed? */ + newer_term(tk, sender, leader, msg); + + become_follower(tk, msg); + /* Racy??? */ + assert(sender == leader || !leader); + + tk->leader = leader; + + + /* Yeth, mathter. */ + init_ticket_msg(&omsg, OP_HEARTBEAT, RLT_SUCCESS, tk); + return booth_udp_send(sender, &omsg, sizeof(omsg)); +} + + +/* For leader. */ +static int process_HEARTBEAT( + struct ticket_config *tk, + struct booth_site *sender, + struct booth_site *leader, + struct boothc_ticket_msg *msg + ) +{ + uint32_t term; + + + if (newer_term(tk, sender, leader, msg)) { + /* Uh oh. Higher term?? Should we simply believe that? */ + log_error("Got higher term number from"); + return 0; + } + + + term = ntohl(msg->ticket.term); + + /* Don't send a reject. */ + if (term < tk->current_term) { + /* Doesn't know what he's talking about - perhaps + * doesn't receive our packets? */ + log_error("Stale/wrong heartbeat from \"%s\": " + "term %d instead of %d", + site_string(sender), + term, tk->current_term); + return 0; + } + + + if (term == tk->current_term && + leader == tk->leader) { + /* Hooray, an ACK! */ + /* So at least _someone_ is listening. */ + tk->hb_received |= sender->bitmask; + + log_debug("Got heartbeat ACK from \"%s\", %d/%d agree.", + site_string(sender), + count_bits(tk->hb_received), + booth_conf->site_count); + + + if (majority_of_bits(tk, tk->hb_received)) { + /* OK, at least half of the nodes are reachable; + * no need to do anything until + * the next heartbeat should be sent. */ + set_ticket_wakeup(tk); + tk->retry_number = 0; + } else { + /* Not enough answers yet; + * wait until timeout expires. */ + ticket_activate_timeout(tk); + } + } + + return 0; +} + + +static int process_VOTE_FOR( + struct ticket_config *tk, + struct booth_site *sender, + struct booth_site *leader, + struct boothc_ticket_msg *msg + ) +{ + uint32_t term; + struct booth_site *new_leader; + + + term = ntohl(msg->ticket.term); + if (term_too_low(tk, sender, leader, msg)) + return 0; + + + if (term == tk->current_term && + tk->election_end < time(NULL)) { + /* Election already ended - either by time or majority. + * Ignore. */ + return 0; + } + + + if (newer_term(tk, sender, leader, msg)) { + clear_election(tk); + } + + + site_voted_for(tk, sender, leader); + + + /* §5.2 */ + new_leader = majority_votes(tk); + if (new_leader) { + tk->leader = new_leader; + + tk->term_expires = time(NULL) + tk->term_duration; + tk->election_end = 0; + tk->voted_for = NULL; + + if ( new_leader == local) { + tk->commit_index++; // ?? + tk->state = ST_LEADER; + send_heartbeat(tk); + ticket_write(tk); + } + else + become_follower(tk, NULL); + } + + set_ticket_wakeup(tk); + return 0; +} + + +static int process_REJECTED( + struct ticket_config *tk, + struct booth_site *sender, + struct booth_site *leader, + struct boothc_ticket_msg *msg + ) +{ + uint32_t rv; + + rv = ntohl(msg->header.result); + + if (tk->state == ST_CANDIDATE && + rv == RLT_TERM_OUTDATED) { + log_info("Am out of date, become follower."); + tk->leader = leader; + become_follower(tk, msg); + return 0; + } + + + if (tk->state == ST_CANDIDATE && + rv == RLT_TERM_STILL_VALID) { + log_error("There's a leader that I don't see: \"%s\"", + site_string(leader)); + tk->leader = leader; + become_follower(tk, msg); + return 0; + } + + log_error("unhandled reject: in state %s, got %s.", + state_to_string(tk->state), + state_to_string(rv)); + tk->leader = leader; + become_follower(tk, msg); + return 0; +} + + +/* §5.2 */ +static int answer_REQ_VOTE( + struct ticket_config *tk, + struct booth_site *sender, + struct booth_site *leader, + struct boothc_ticket_msg *msg + ) +{ + uint32_t term; + int valid; + struct boothc_ticket_msg omsg; + + + if (term_too_low(tk, sender, leader, msg)) + return 0; + if (newer_term(tk, sender, leader, msg)) + goto vote_for_her; + + + term = ntohl(msg->ticket.term); + /* Important: Ignore duplicated packets! */ + valid = term_valid_for(tk); + if (valid && + term == tk->current_term && + sender == tk->leader) { + log_debug("Duplicate OP_VOTE_FOR ignored."); + return 0; + } + + if (valid) { + log_debug("no election allowed, term valid for %d??", valid); + return send_reject(sender, tk, RLT_TERM_STILL_VALID); + } + + /* §5.2, §5.4 */ + if (!tk->voted_for) { +vote_for_her: + tk->voted_for = sender; + site_voted_for(tk, sender, leader); + goto yes_you_can; + } + + +yes_you_can: + init_ticket_msg(&omsg, OP_VOTE_FOR, RLT_SUCCESS, tk); + omsg.ticket.leader = htonl(get_node_id(tk->voted_for)); + + return transport()->broadcast(&omsg, sizeof(omsg)); +} + + +int new_election(struct ticket_config *tk, struct booth_site *preference) +{ + struct booth_site *new_leader; + time_t now; + + + time(&now); + log_debug("start new election?, now=%" PRIi64 ", end %" PRIi64, + now, tk->election_end); + if (now <= tk->election_end) + return 0; + + + /* §5.2 */ + /* If there was _no_ answer, don't keep incrementing the term number + * indefinitely. If there was no peer, there'll probably be no one + * listening now either. + * Own vote can be disregarded. + * Not entirely correct? After startup the term should be incremented + * once, to speed up becoming a leader? + * Perhaps only increment once, and then try to rebuild with the same + * term number? With 5 nodes the 2 node partition would still increment + * endlessly. */ + if (count_bits(tk->votes_received) > 1) + tk->current_term++; + + tk->term_expires = 0; + tk->election_end = now + tk->term_duration; + + log_debug("start new election! term=%d, until %" PRIi64, + tk->current_term, tk->election_end); + clear_election(tk); + + if(preference) + new_leader = preference; + else + new_leader = (local->type == SITE) ? local : NULL; + site_voted_for(tk, local, new_leader); + tk->voted_for = new_leader; + + tk->state = ST_CANDIDATE; + + ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS); + return 0; +} + + +int raft_answer( + struct ticket_config *tk, + struct booth_site *from, + struct booth_site *leader, + struct boothc_ticket_msg *msg + ) +{ + int cmd; + int rv; + + rv = 0; + cmd = ntohl(msg->header.cmd); + R(tk); + + log_debug("got message %s from \"%s\"", + state_to_string(cmd), + from->addr_string); + + + switch (cmd) { + case OP_REQ_VOTE: + rv = answer_REQ_VOTE(tk, from, leader, msg); + break; + case OP_VOTE_FOR: + rv = process_VOTE_FOR(tk, from, leader, msg); + break; + case OP_HEARTBEAT: + if (tk->leader == local && + tk->state == ST_LEADER) + rv = process_HEARTBEAT(tk, from, leader, msg); + else if (tk->leader != local && + tk->state == ST_FOLLOWER) + rv = answer_HEARTBEAT(tk, from, leader, msg); + else + assert("invalid combination - leader, follower"); + break; + case OP_REJECTED: + rv = process_REJECTED(tk, from, leader, msg); + break; + default: + log_error("unprocessed message, cmd %x", cmd); + rv = -EINVAL; + } + R(tk); + return rv; +} diff --git a/src/paxos.h b/src/raft.h similarity index 51% rename from src/paxos.h rename to src/raft.h index 26afb4c..3f03196 100644 --- a/src/paxos.h +++ b/src/raft.h @@ -1,46 +1,44 @@ -/* - * Copyright (C) 2011 Jiaju Zhang - * Copyright (C) 2013-2014 Philipp Marek - * +/* + * Copyright (C) 2014 Philipp Marek + * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. - * + * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. - * + * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#ifndef _PAXOS_H -#define _PAXOS_H +#ifndef _RAFT_H +#define _RAFT_H -#include "config.h" -#include "ticket.h" +#include "booth.h" -#define PROPOSER 0x4 -#define ACCEPTOR 0x2 -#define LEARNER 0x1 +typedef enum { + ST_INIT = CHAR2CONST('I', 'n', 'i', 't'), + ST_FOLLOWER = CHAR2CONST('F', 'l', 'l', 'w'), + ST_CANDIDATE = CHAR2CONST('C', 'n', 'd', 'i'), + ST_LEADER = CHAR2CONST('L', 'e', 'a', 'd'), +} server_state_e; -int paxos_answer( - struct ticket_config *tk, +struct ticket_config; + +int raft_answer(struct ticket_config *tk, struct booth_site *from, - struct boothc_ticket_msg *msg, - uint32_t ballot, - struct booth_site *new_owner_p); + struct booth_site *leader, + struct boothc_ticket_msg *msg); -int paxos_start_round(struct ticket_config *tk, struct booth_site *new_owner); -void abort_proposal(struct ticket_config *tk); +int new_election(struct ticket_config *tk, struct booth_site *new_leader); +int start_election(struct ticket_config *tk, struct booth_site *new_leader); -int PREPARE_to_PROPOSE(struct ticket_config *tk); -int PROPOSE_to_COMMIT(struct ticket_config *tk); -int should_switch_state_p(struct ticket_config *tk); -#endif /* _PAXOS_H */ +#endif /* _RAFT_H */ diff --git a/src/ticket.c b/src/ticket.c index ea230c3..1182fff 100644 --- a/src/ticket.c +++ b/src/ticket.c @@ -1,799 +1,648 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include "ticket.h" #include "config.h" #include "pacemaker.h" #include "inline-fn.h" #include "log.h" #include "booth.h" -#include "paxos.h" +#include "raft.h" #include "handler.h" #define TK_LINE 256 /* Untrusted input, must fit (incl. \0) in a buffer of max chars. */ int check_max_len_valid(const char *s, int max) { int i; for(i=0; iticket_count; i++) { if (!strcmp(booth_conf->ticket[i].name, ticket)) { if (found) *found = booth_conf->ticket + i; return 1; } } return 0; } int check_ticket(char *ticket, struct ticket_config **found) { if (found) *found = NULL; if (!booth_conf) return 0; if (!check_max_len_valid(ticket, sizeof(booth_conf->ticket[0].name))) return 0; return find_ticket_by_name(ticket, found); } int check_site(char *site, int *is_local) { struct booth_site *node; if (!check_max_len_valid(site, sizeof(node->addr_string))) return 0; if (find_site_by_name(site, &node, 0)) { *is_local = node->local; return 1; } return 0; } +#if 0 /** Find out what others think about this ticket. * * If we're a SITE, we can ask (and have to tell) Pacemaker. * An ARBITRATOR can only ask others. */ static int ticket_send_catchup(struct ticket_config *tk) { int i, rv = 0; struct booth_site *site; struct boothc_ticket_msg msg; foreach_node(i, site) { if (!site->local) { init_ticket_msg(&msg, CMD_CATCHUP, RLT_SUCCESS, tk); log_debug("attempting catchup from %s", site->addr_string); rv = booth_udp_send(site, &msg, sizeof(msg)); } } ticket_activate_timeout(tk); return rv; } +#endif int ticket_write(struct ticket_config *tk) { if (local->type != SITE) return -EINVAL; disown_if_expired(tk); - if (tk->owner == local) { + if (tk->leader == local) { pcmk_handler.grant_ticket(tk); } else { pcmk_handler.revoke_ticket(tk); } return 0; } /* Ask an external program whether getting the ticket * makes sense. * Eg. if the services have a failcount of INFINITY, * we can't serve here anyway. */ int get_ticket_locally_if_allowed(struct ticket_config *tk) { int rv; if (!tk->ext_verifier) goto get_it; rv = run_handler(tk, tk->ext_verifier, 1); if (rv) { log_error("May not acquire ticket."); /* Give it to somebody else. * Just send a commit message, as the * others couldn't help anyway. */ - if (owner_and_valid(tk)) { + if (leader_and_valid(tk)) { disown_ticket(tk); +#if 0 tk->proposed_owner = NULL; /* Just go one further - others may easily override. */ tk->new_ballot++; ticket_broadcast_proposed_state(tk, OP_COMMITTED); tk->state = ST_STABLE; +#endif + ticket_broadcast(tk, OP_VOTE_FOR, RLT_SUCCESS); } return rv; } else { - log_info("May keep ticket."); + log_info("May get/keep ticket."); } get_it: - return paxos_start_round(tk, local); + if (leader_and_valid(tk)) { + return send_heartbeat(tk); + } else { + /* Ticket should now become active locally, wasn't before. */ + new_election(tk, local); + return ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS); + } } /** Try to get the ticket for the local site. * */ int do_grant_ticket(struct ticket_config *tk) { int rv; - if (tk->owner == local) + if (tk->leader == local) return RLT_SUCCESS; - if (tk->owner) + if (tk->leader) return RLT_OVERGRANT; rv = get_ticket_locally_if_allowed(tk); return rv; } /** Start a PAXOS round for revoking. * That can be started from any site. */ int do_revoke_ticket(struct ticket_config *tk) { int rv; - if (!tk->owner) + if (!tk->leader) return RLT_SUCCESS; + disown_ticket(tk); + tk->voted_for = no_leader; + ticket_write(tk); + + tk->state = ST_FOLLOWER; + /* Start a new vote round, with a new term number. */ + tk->current_term++; + return ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS); +#if 0 rv = paxos_start_round(tk, NULL); +#endif return rv; } int list_ticket(char **pdata, unsigned int *len) { struct ticket_config *tk; char timeout_str[64]; char *data, *cp; int i, alloc; *pdata = NULL; *len = 0; alloc = 256 + booth_conf->ticket_count * (BOOTH_NAME_LEN * 2 + 128); data = malloc(alloc); if (!data) return -ENOMEM; cp = data; foreach_ticket(i, tk) { - if (tk->expires != 0) + if (tk->term_expires != 0) strftime(timeout_str, sizeof(timeout_str), "%F %T", - localtime(&tk->expires)); + localtime(&tk->term_expires)); else strcpy(timeout_str, "INF"); cp += sprintf(cp, - "ticket: %s, owner: %s, expires: %s, ballot: %d\n", + "ticket: %s, leader: %s, expires: %s, commit: %d\n", tk->name, - tk->owner ? tk->owner->addr_string : "None", + ticket_leader_string(tk), timeout_str, - tk->last_ack_ballot); + tk->commit_index); *len = cp - data; assert(*len < alloc); } *pdata = data; return 0; } int setup_ticket(void) { struct ticket_config *tk; int i; /* TODO */ foreach_ticket(i, tk) { - tk->owner = NULL; - tk->expires = 0; + tk->leader = NULL; + tk->term_expires = 0; - abort_proposal(tk); + // abort_proposal(tk); - if (local->role & PROPOSER) { + if (local->type == SITE) { pcmk_handler.load_ticket(tk); } + + /* There might be a leader; wait for its notification. */ + tk->term_expires = time(NULL) + tk->term_duration; + tk->state = ST_FOLLOWER; + /* TODO: send query packet to see sooner who's online. */ } return 0; } int ticket_answer_list(int fd, struct boothc_ticket_msg *msg) { char *data; int olen, rv; struct boothc_header hdr; rv = list_ticket(&data, &olen); if (rv < 0) return rv; init_header(&hdr, CMR_LIST, RLT_SUCCESS, sizeof(hdr) + olen); return send_header_plus(fd, &hdr, data, olen); } int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg) { int rv; struct ticket_config *tk; if (!check_ticket(msg->ticket.id, &tk)) { log_error("Client asked to grant unknown ticket"); rv = RLT_INVALID_ARG; goto reply; } - if (tk->owner) { + if (tk->leader) { log_error("client wants to get an (already granted!) ticket \"%s\"", msg->ticket.id); rv = RLT_OVERGRANT; goto reply; } rv = do_grant_ticket(tk); reply: init_header(&msg->header, CMR_GRANT, rv ?: RLT_ASYNC, sizeof(*msg)); return send_ticket_msg(fd, msg); } int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg) { int rv; struct ticket_config *tk; if (!check_ticket(msg->ticket.id, &tk)) { log_error("Client asked to grant unknown ticket"); rv = RLT_INVALID_ARG; goto reply; } - if (!tk->owner) { + if (!tk->leader) { log_info("client wants to revoke a free ticket \"%s\"", msg->ticket.id); /* Return a different result code? */ rv = RLT_SUCCESS; goto reply; } rv = do_revoke_ticket(tk); if (rv == 0) rv = RLT_ASYNC; reply: init_ticket_msg(msg, CMR_REVOKE, rv, tk); return send_ticket_msg(fd, msg); } -/** Got a CMD_CATCHUP query. - * In this file because it's mostly used during startup. */ -static int ticket_answer_catchup( - struct ticket_config *tk, - struct booth_site *from, - struct boothc_ticket_msg *msg, - uint32_t ballot, - struct booth_site *new_owner) -{ - int rv; - - - log_debug("got CATCHUP query for \"%s\" from %s", - msg->ticket.id, from->addr_string); - - /* We do _always_ answer. - * In case all booth daemons are restarted at the same time, nobody - * would answer any questions, leading to timeouts and delays. - * Just admit we don't know. */ - - rv = (tk->state == ST_INIT) ? - RLT_PROBABLY_SUCCESS : RLT_SUCCESS; - - init_ticket_msg(msg, CMR_CATCHUP, rv, tk); - - /* On catchup, don't tell about ongoing proposals; - * if we did, the other site might believe that the - * ballot numbers have already been used. - * Send the known ballot number, so that a PREPARE - * gets accepted. */ - msg->ticket.ballot = msg->ticket.prev_ballot; - - return booth_udp_send(from, msg, sizeof(*msg)); -} - - -/** Got a CMR_CATCHUP message. - * Gets handled here because it's not PAXOS per se, - * but only needed during startup. */ -static int ticket_process_catchup( - struct ticket_config *tk, - struct booth_site *from, - struct boothc_ticket_msg *msg, - uint32_t ballot, - struct booth_site *new_owner) -{ - int rv; - uint32_t prev_ballot; - time_t peer_expiry; - - - log_info("got CATCHUP answer for \"%s\" from %s; says owner %s with ballot %d", - tk->name, from->addr_string, - ticket_owner_string(new_owner), ballot); - prev_ballot = ntohl(msg->ticket.prev_ballot); - - rv = ntohl(msg->header.result); - if (rv != RLT_SUCCESS && - rv != RLT_PROBABLY_SUCCESS) { - log_error("dropped because of wrong rv: 0x%x", rv); - return -EINVAL; - } - - if (ballot == tk->new_ballot && - ballot == tk->last_ack_ballot && - new_owner == tk->owner) { - /* Peer says the same thing we're believing. */ - tk->proposal_acknowledges |= from->bitmask | local->bitmask; - tk->expires = ntohl(msg->ticket.expiry) + time(NULL); - - if (should_switch_state_p(tk)) { - if (tk->state == ST_INIT) - tk->state = ST_STABLE; - } - - disown_if_expired(tk); - log_debug("catchup: peer ack 0x%" PRIx64 ", now state '%s'", - tk->proposal_acknowledges, - state_to_string(tk->state)); - goto ex; - } - - - if (ticket_valid_for(tk) == 0 && !tk->owner) { - /* We see the ticket as expired, and therefore don't know an owner. - * So believe some other host. */ - tk->state = ST_STABLE; - log_debug("catchup: no owner locally, believe peer."); - goto accept; - } - - - if (ballot >= tk->new_ballot && - ballot >= tk->last_ack_ballot && - rv == RLT_SUCCESS) { - /* Peers seems to know better, but as yet we only have _her_ - * word for that. */ - log_debug("catchup: peer has higher ballot: %d >= %d/%d", - ballot, tk->new_ballot, tk->last_ack_ballot); - -accept: - peer_expiry = ntohl(msg->ticket.expiry) + time(NULL); - tk->expires = (tk->expires > peer_expiry) ? - tk->expires : peer_expiry; - tk->new_ballot = ballot_max2(ballot, tk->new_ballot); - tk->last_ack_ballot = ballot_max2(prev_ballot, tk->last_ack_ballot); - tk->owner = new_owner; - tk->proposal_acknowledges = from->bitmask; - - /* We stay in ST_INIT and wait for confirmation. */ - goto ex; - } - - - if (ballot >= tk->last_ack_ballot && - rv == RLT_PROBABLY_SUCCESS && - tk->state == ST_INIT && - tk->retry_number > 3) { - /* Peer seems to know better than us, and there's no - * convincing other report. Just take it. */ - tk->state = ST_STABLE; - log_debug("catchup: exceeded retries, peer has higher ballot."); - goto accept; - } - - - if (ballot < tk->new_ballot || - ballot < tk->last_ack_ballot) { - /* Peer seems outdated ... tell it to reload? */ - log_debug("catchup: peer outdated?"); -#if 0 - init_ticket_msg(&msg, CMD_DO_CATCHUP, RLT_SUCCESS, tk, &tk->current_state); -#endif - goto ex; - } - - - if (ballot >= tk->last_ack_ballot && - local->type == SITE && - new_owner == tk->owner) { - /* We've got some information (local Pacemaker?), and a peer - * says same owner, with same or higher ballot number. */ - log_debug("catchup: peer agrees about owner."); - goto ex; - } - - log_debug("catchup: unhandled situation!"); - -ex: - ticket_write(tk); - - if (tk->state == ST_STABLE) { - /* If we believe to have enough information, we can try to - * acquire the ticket (again). */ - time(&tk->expires); - } - - /* Allow further actions. */ - ticket_activate_timeout(tk); - - return 0; -} - - -/** Send new state request to all sites. - * Perhaps this should take a flag for ACCEPTOR etc.? - * No need currently, as all nodes are more or less identical. */ -int ticket_broadcast_proposed_state(struct ticket_config *tk, cmd_request_t state) +int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd, cmd_result_t res) { struct boothc_ticket_msg msg; - if (state != tk->state) { - tk->proposal_acknowledges = local->bitmask; - tk->retry_number = 0; - } - - tk->state = state; - init_ticket_msg(&msg, state, RLT_SUCCESS, tk); - msg.ticket.owner = htonl(get_node_id(tk->proposed_owner)); - + init_ticket_msg(&msg, cmd, res, tk); log_debug("broadcasting '%s' for ticket \"%s\"", - state_to_string(state), tk->name); - - /* Switch state after one second, if the majority says ok. */ - gettimeofday(&tk->proposal_switch, NULL); - tk->proposal_switch.tv_sec++; - + state_to_string(cmd), tk->name); return transport()->broadcast(&msg, sizeof(msg)); } static void ticket_cron(struct ticket_config *tk) { time_t now; + int rv; now = time(NULL); + R(tk); /* Has an owner, has an expiry date, and expiry date in the past? * Losing the ticket must happen in _every_ state. */ - if (tk->expires && - tk->owner && - now > tk->expires) { + if (tk->term_expires && + tk->leader && + now > tk->term_expires) { log_info("LOST ticket: \"%s\" no longer at %s", tk->name, - ticket_owner_string(tk->owner)); + ticket_leader_string(tk)); /* Couldn't renew in time - ticket lost. */ - tk->owner = NULL; disown_ticket(tk); - /* This gets us into ST_INIT again; we couldn't - * talk to a majority of sites, so we don't know - * whether somebody else has the ticket now. - * Keep asking until we know. */ - abort_proposal(tk); + + /* New vote round; §5.2 */ + if (local->type == SITE) + new_election(tk, NULL); +/* should be "always" that way + else + tk->state = ST_FOLLOWER; + */ +// abort_proposal(tk); TODO ticket_write(tk); ticket_activate_timeout(tk); /* May not try to re-acquire now, need to find out * what others think. */ return; } + R(tk); switch(tk->state) { case ST_INIT: /* Unknown state, ask others. */ - ticket_send_catchup(tk); - return; - - - case OP_COMMITTED: - case ST_STABLE: - - /* No matter whether the ticket just got lost by someone, - * or whether is wasn't active anywhere - if automatic - * acquiration is configured, try to get it active. - * Condition: - * - no owner, - * - no active proposal, - * - acquire_after has passed, - * - could activate locally. - * Now the sites can try to trump each other. */ - if (!tk->owner && - !tk->proposed_owner && - !tk->proposer && - tk->expires && - tk->acquire_after && - tk->expires + tk->acquire_after >= now && - local->type == SITE) { - if (!get_ticket_locally_if_allowed(tk)) - log_info("ACQUIRE ticket \"%s\" after timeout; ac=%d", tk->name, tk->acquire_after); - break; - } - +// ticket_send_catchup(tk); + break; - /* Are we the current owner, and do we need to refresh? - * This is not the same as above. */ - if (should_start_renewal(tk)) { - if (!get_ticket_locally_if_allowed(tk)) - log_info("RENEW ticket \"%s\"", tk->name); - /* TODO: remember when we started, and restart afresh after some retries */ + case ST_FOLLOWER: + if (tk->term_expires && + now > tk->term_expires) { + new_election(tk, NULL); } - break; - case OP_PREPARING: - PREPARE_to_PROPOSE(tk); + case ST_CANDIDATE: + /* §5.2 */ + if (now > tk->election_end) + new_election(tk, NULL); break; - case OP_PROPOSING: - PROPOSE_to_COMMIT(tk); - break; + case ST_LEADER: + if (tk->hb_sent_at + tk->timeout > now) { + /* Heartbeat timeout reached. Oops ... */ + tk->retry_number ++; + log_error("Not enough answers to heartbeat on try #%d: " + "only got %d answers (mask 0x%" PRIx64 ")!", + tk->retry_number, + count_bits(tk->hb_received), + tk->hb_received); + + /* Don't give up, though - there's still some time until leadership is lost. */ + } - case OP_PROMISING: - case OP_ACCEPTING: - case OP_RECOVERY: - case OP_REJECTED: + rv = run_handler(tk, tk->ext_verifier, 1); + if (rv) { + tk->state = ST_FOLLOWER; + tk->leader= NULL; + // resp. no owner anymore, new takers? + ticket_broadcast(tk, OP_REQ_VOTE, RLT_SUCCESS); + ticket_write(tk); + } else { + tk->term_expires = now + tk->term_duration; + send_heartbeat(tk); + // ticket_write(tk); // not correct here -- no acks received yet + } + ticket_activate_timeout(tk); break; default: break; } + R(tk); } void process_tickets(void) { struct ticket_config *tk; int i; struct timeval now; float sec_until; gettimeofday(&now, NULL); foreach_ticket(i, tk) { sec_until = timeval_to_float(tk->next_cron) - timeval_to_float(now); if (0) log_debug("ticket %s next cron %" PRIx64 ".%03d, " "now %" PRIx64 "%03d, in %f", tk->name, (uint64_t)tk->next_cron.tv_sec, timeval_msec(tk->next_cron), (uint64_t)now.tv_sec, timeval_msec(now), sec_until); if (sec_until > 0.0) continue; log_debug("ticket cron: doing %s", tk->name); /* Set next value, handler may override. * This should already be handled via the state logic; * but to be on the safe side the renew repetition is * duplicated here, too. */ set_ticket_wakeup(tk); ticket_cron(tk); } } void tickets_log_info(void) { struct ticket_config *tk; int i; foreach_ticket(i, tk) { log_info("Ticket %s: state '%s' " - "mask %" PRIx64 "/%" PRIx64 " " - "ballot %d (current %d) " + "commit index %d " + "leader \"%s\" " "expires %-24.24s", tk->name, state_to_string(tk->state), - tk->proposal_acknowledges, - booth_conf->site_bits, - tk->last_ack_ballot, tk->new_ballot, - ctime(&tk->expires)); + tk->commit_index, + ticket_leader_string(tk), + ctime(&tk->term_expires)); } } /* UDP message receiver. */ int message_recv(struct boothc_ticket_msg *msg, int msglen) { - int cmd, rv; uint32_t from; - struct booth_site *dest; + struct booth_site *source; struct ticket_config *tk; - struct booth_site *new_owner_p; - uint32_t ballot, new_owner; + struct booth_site *leader; + uint32_t leader_u; if (check_boothc_header(&msg->header, sizeof(*msg)) < 0 || msglen != sizeof(*msg)) { log_error("message receive error"); return -1; } from = ntohl(msg->header.from); - if (!find_site_by_id(from, &dest) || !dest) { + if (!find_site_by_id(from, &source) || !source) { log_error("unknown sender: %08x", from); return -1; } if (!check_ticket(msg->ticket.id, &tk)) { log_error("got invalid ticket name \"%s\" from %s", - msg->ticket.id, dest->addr_string); + msg->ticket.id, source->addr_string); return -EINVAL; } - cmd = ntohl(msg->header.cmd); - ballot = ntohl(msg->ticket.ballot); - - new_owner = ntohl(msg->ticket.owner); - if (!find_site_by_id(new_owner, &new_owner_p)) { - log_error("Message with unknown owner %x received", new_owner); + leader_u = ntohl(msg->ticket.leader); + if (!find_site_by_id(leader_u, &leader)) { + log_error("Message with unknown owner %x received", leader_u); return -EINVAL; } - switch (cmd) { - case CMD_CATCHUP: - return ticket_answer_catchup(tk, dest, msg, ballot, new_owner_p); - - case CMR_CATCHUP: - return ticket_process_catchup(tk, dest, msg, ballot, new_owner_p); - - default: - /* only used in catchup, and not even really there ?? */ - assert(ntohl(msg->header.result) == 0); - - - rv = paxos_answer(tk, dest, msg, ballot, new_owner_p); - assert((tk->proposal_acknowledges & ~booth_conf->site_bits) == 0); - return rv; - } - return 0; + return raft_answer(tk, source, leader, msg); } void set_ticket_wakeup(struct ticket_config *tk) { struct timeval tv, now; - if (tk->owner == local) { + /* At least every hour, perhaps sooner. */ + ticket_next_cron_in(tk, 3600); + + switch (tk->state) { + case ST_LEADER: + assert(tk->leader == local); gettimeofday(&now, NULL); tv = now; - tv.tv_sec = next_renewal_starts_at(tk); + tv.tv_sec = next_vote_starts_at(tk); /* If timestamp is in the past, look again in one second. */ if (timeval_compare(tv, now) <= 0) tv.tv_sec = now.tv_sec + 1; ticket_next_cron_at(tk, tv); - } else { + break; + + case ST_CANDIDATE: + assert(tk->election_end); + ticket_next_cron_at_coarse(tk, tk->election_end); + break; + + case ST_INIT: + case ST_FOLLOWER: /* If there is (or should be) some owner, check on her later on. * If no one is interested - don't care. */ - if ((tk->owner || tk->acquire_after) && + if ((tk->leader || tk->acquire_after) && (local->type == SITE)) - ticket_next_cron_in(tk, tk->expiry + tk->acquire_after); - else - ticket_next_cron_in(tk, 3600); + ticket_next_cron_at_coarse(tk, + tk->term_expires + tk->acquire_after); + break; + + default: + log_error("why here?"); } } /* Given a state (in host byte order), return a human-readable (char*). * An array is used so that multiple states can be printed in a single printf(). */ char *state_to_string(uint32_t state_ho) { union mu { cmd_request_t s; char c[5]; }; static union mu cache[6] = { { 0 } }, *cur; static int current = 0; current ++; if (current >= sizeof(cache)/sizeof(cache[0])) current = 0; cur = cache + current; cur->s = htonl(state_ho); /* Shouldn't be necessary, union array is initialized with zeroes, and * these bytes never get written. */ cur->c[4] = 0; return cur->c; } + + +int send_reject(struct booth_site *dest, struct ticket_config *tk, cmd_result_t code) +{ + struct boothc_ticket_msg msg; + + + init_ticket_msg(&msg, OP_REJECTED, code, tk); + return booth_udp_send(dest, &msg, sizeof(msg)); +} diff --git a/src/ticket.h b/src/ticket.h index 7f535bd..e0af8b2 100644 --- a/src/ticket.h +++ b/src/ticket.h @@ -1,96 +1,104 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _TICKET_H #define _TICKET_H #include #include #include #include "config.h" #define DEFAULT_TICKET_EXPIRY 600 #define DEFAULT_TICKET_TIMEOUT 10 #define DEFAULT_RETRIES 10 #define foreach_ticket(i_,t_) for(i=0; (t_=booth_conf->ticket+i, iticket_count); i++) #define foreach_node(i_,n_) for(i=0; (n_=booth_conf->site+i, isite_count); i++) int check_ticket(char *ticket, struct ticket_config **tc); int check_site(char *site, int *local); int do_grant_ticket(struct ticket_config *ticket); int revoke_ticket(struct ticket_config *ticket); int list_ticket(char **pdata, unsigned int *len); int message_recv(struct boothc_ticket_msg *msg, int msglen); int setup_ticket(void); int check_max_len_valid(const char *s, int max); int do_grant_ticket(struct ticket_config *tk); int do_revoke_ticket(struct ticket_config *tk); int find_ticket_by_name(const char *ticket, struct ticket_config **found); void set_ticket_wakeup(struct ticket_config *tk); int get_ticket_locally_if_allowed(struct ticket_config *tk); int ticket_answer_list(int fd, struct boothc_ticket_msg *msg); int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg); int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg); int ticket_broadcast_proposed_state(struct ticket_config *tk, cmd_request_t state); int ticket_write(struct ticket_config *tk); void process_tickets(void); void tickets_log_info(void); char *state_to_string(uint32_t state_ho); +int send_reject(struct booth_site *dest, struct ticket_config *tk, cmd_result_t code); +int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd, cmd_result_t res); static inline void ticket_next_cron_at(struct ticket_config *tk, struct timeval when) { tk->next_cron = when; } +static inline void ticket_next_cron_at_coarse(struct ticket_config *tk, time_t when) +{ + tk->next_cron.tv_sec = when; + tk->next_cron.tv_usec = 0; +} + static inline void ticket_next_cron_in(struct ticket_config *tk, float seconds) { struct timeval tv; gettimeofday(&tv, NULL); tv.tv_sec += trunc(seconds); tv.tv_usec += (seconds - trunc(seconds)) * 1e6; ticket_next_cron_at(tk, tv); } static inline void ticket_activate_timeout(struct ticket_config *tk) { /* TODO: increase timeout when no answers */ ticket_next_cron_in(tk, tk->timeout); tk->retry_number ++; } #endif /* _TICKET_H */ diff --git a/src/transport.c b/src/transport.c index c0757fa..0c02019 100644 --- a/src/transport.c +++ b/src/transport.c @@ -1,693 +1,711 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include "booth.h" #include "inline-fn.h" #include "log.h" #include "config.h" #include "ticket.h" #include "transport.h" #define BOOTH_IPADDR_LEN (sizeof(struct in6_addr)) #define NETLINK_BUFSIZE 16384 #define SOCKET_BUFFER_SIZE 160000 #define FRAME_SIZE_MAX 10000 struct booth_site *local = NULL; static int (*deliver_fn) (void *msg, int msglen); static void parse_rtattr(struct rtattr *tb[], int max, struct rtattr *rta, int len) { while (RTA_OK(rta, len)) { if (rta->rta_type <= max) tb[rta->rta_type] = rta; rta = RTA_NEXT(rta,len); } } static int find_address(unsigned char ipaddr[BOOTH_IPADDR_LEN], int family, int prefixlen, int fuzzy_allowed, struct booth_site **me, int *address_bits_matched) { int i; struct booth_site *node; int bytes, bits_left, mask; unsigned char node_bits, ip_bits; uint8_t *n_a; int matched, did_match; bytes = prefixlen / 8; bits_left = prefixlen % 8; /* One bit left to check means ignore 7 lowest bits. */ mask = ~( (1 << (8 - bits_left)) -1); did_match = 0; for (i = 0; i < booth_conf->site_count; i++) { node = booth_conf->site + i; if (family != node->family) continue; n_a = node_to_addr_pointer(node); for(matched = 0; matched < node->addrlen; matched++) if (ipaddr[matched] != n_a[matched]) break; if (matched == node->addrlen) { /* Full match. */ *address_bits_matched = matched * 8; found: *me = node; did_match = 1; continue; } if (!fuzzy_allowed) continue; /* Check prefix, whole bytes */ if (matched < bytes) continue; if (matched * 8 < *address_bits_matched) continue; if (!bits_left) goto found; node_bits = n_a[bytes]; ip_bits = ipaddr[bytes]; if (((node_bits ^ ip_bits) & mask) == 0) { /* _At_least_ prefixlen bits matched. */ *address_bits_matched = prefixlen; goto found; } } return did_match; } int _find_myself(int family, struct booth_site **mep, int fuzzy_allowed); int _find_myself(int family, struct booth_site **mep, int fuzzy_allowed) { int fd; struct sockaddr_nl nladdr; struct booth_site *me; unsigned char ipaddr[BOOTH_IPADDR_LEN]; static char rcvbuf[NETLINK_BUFSIZE]; struct { struct nlmsghdr nlh; struct rtgenmsg g; } req; int address_bits_matched; if (local) goto found; me = NULL; address_bits_matched = 0; if (mep) *mep = NULL; fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE); if (fd < 0) { log_error("failed to create netlink socket"); return 0; } setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); memset(&nladdr, 0, sizeof(nladdr)); nladdr.nl_family = AF_NETLINK; memset(&req, 0, sizeof(req)); req.nlh.nlmsg_len = sizeof(req); req.nlh.nlmsg_type = RTM_GETADDR; req.nlh.nlmsg_flags = NLM_F_ROOT|NLM_F_MATCH|NLM_F_REQUEST; req.nlh.nlmsg_pid = 0; req.nlh.nlmsg_seq = 1; req.g.rtgen_family = family; if (sendto(fd, (void *)&req, sizeof(req), 0, (struct sockaddr*)&nladdr, sizeof(nladdr)) < 0) { close(fd); log_error("failed to send data to netlink socket"); return 0; } while (1) { int status; struct nlmsghdr *h; struct iovec iov = { rcvbuf, sizeof(rcvbuf) }; struct msghdr msg = { (void *)&nladdr, sizeof(nladdr), &iov, 1, NULL, 0, 0 }; status = recvmsg(fd, &msg, 0); if (!status) { close(fd); log_error("failed to recvmsg from netlink socket"); return 0; } h = (struct nlmsghdr *)rcvbuf; if (h->nlmsg_type == NLMSG_DONE) break; if (h->nlmsg_type == NLMSG_ERROR) { close(fd); log_error("netlink socket recvmsg error"); return 0; } while (NLMSG_OK(h, status)) { if (h->nlmsg_type == RTM_NEWADDR) { struct ifaddrmsg *ifa = NLMSG_DATA(h); struct rtattr *tb[IFA_MAX+1]; int len = h->nlmsg_len - NLMSG_LENGTH(sizeof(*ifa)); memset(tb, 0, sizeof(tb)); parse_rtattr(tb, IFA_MAX, IFA_RTA(ifa), len); memset(ipaddr, 0, BOOTH_IPADDR_LEN); memcpy(ipaddr, RTA_DATA(tb[IFA_ADDRESS]), BOOTH_IPADDR_LEN); /* First try with exact addresses, then optionally with subnet matching. */ if (ifa->ifa_prefixlen > address_bits_matched) find_address(ipaddr, ifa->ifa_family, ifa->ifa_prefixlen, fuzzy_allowed, &me, &address_bits_matched); } h = NLMSG_NEXT(h, status); } } close(fd); if (!me) return 0; me->local = 1; local = me; found: if (mep) *mep = local; return 1; } int find_myself(struct booth_site **mep, int fuzzy_allowed) { return _find_myself(AF_INET6, mep, fuzzy_allowed) || _find_myself(AF_INET, mep, fuzzy_allowed); } /** Checks the header fields for validity. * cf. init_header(). * For @len_incl_data < 0 the length is not checked. * Return <0 if error, else bytes read. */ int check_boothc_header(struct boothc_header *h, int len_incl_data) { int l; if (h->magic != htonl(BOOTHC_MAGIC)) { log_error("magic error %x", ntohl(h->magic)); return -EINVAL; } if (h->version != htonl(BOOTHC_VERSION)) { log_error("version error %x", ntohl(h->version)); return -EINVAL; } l = ntohl(h->length); if (l < sizeof(*h)) { log_error("length %d out of range", l); return -EINVAL; } if (len_incl_data < 0) return 0; if (l != len_incl_data) { log_error("length error - got %d, wanted %d", l, len_incl_data); return -EINVAL; } return len_incl_data; } static void process_tcp_listener(int ci) { int fd, i, one = 1; socklen_t addrlen = sizeof(struct sockaddr); struct sockaddr addr; fd = accept(clients[ci].fd, &addr, &addrlen); if (fd < 0) { log_error("process_tcp_listener: accept error %d %d", fd, errno); return; } setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof(one)); i = client_add(fd, clients[ci].transport, process_connection, NULL); log_debug("client connection %d fd %d", i, fd); } static int setup_tcp_listener(void) { int s, rv; s = socket(local->family, SOCK_STREAM, 0); if (s == -1) { log_error("failed to create tcp socket %s", strerror(errno)); return s; } rv = bind(s, &local->sa6, local->saddrlen); if (rv == -1) { log_error("failed to bind socket %s", strerror(errno)); return rv; } rv = listen(s, 5); if (rv == -1) { log_error("failed to listen on socket %s", strerror(errno)); return rv; } return s; } static int booth_tcp_init(void * unused __attribute__((unused))) { int rv; if (get_local_id() < 0) return -1; rv = setup_tcp_listener(); if (rv < 0) return rv; client_add(rv, booth_transport + TCP, process_tcp_listener, NULL); return 0; } static int connect_nonb(int sockfd, const struct sockaddr *saptr, socklen_t salen, int sec) { int flags, n, error; socklen_t len; fd_set rset, wset; struct timeval tval; flags = fcntl(sockfd, F_GETFL, 0); fcntl(sockfd, F_SETFL, flags | O_NONBLOCK); error = 0; if ( (n = connect(sockfd, saptr, salen)) < 0) if (errno != EINPROGRESS) return -1; if (n == 0) goto done; /* connect completed immediately */ FD_ZERO(&rset); FD_SET(sockfd, &rset); wset = rset; tval.tv_sec = sec; tval.tv_usec = 0; if ((n = select(sockfd + 1, &rset, &wset, NULL, sec ? &tval : NULL)) == 0) { /* leave outside function to close */ /* timeout */ /* close(sockfd); */ errno = ETIMEDOUT; return -1; } if (FD_ISSET(sockfd, &rset) || FD_ISSET(sockfd, &wset)) { len = sizeof(error); if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) return -1; /* Solaris pending error */ } else { log_error("select error: sockfd not set"); return -1; } done: fcntl(sockfd, F_SETFL, flags); /* restore file status flags */ if (error) { /* leave outside function to close */ /* close(sockfd); */ errno = error; return -1; } return 0; } int booth_tcp_open(struct booth_site *to) { int s, rv; if (to->tcp_fd >= STDERR_FILENO) goto found; s = socket(to->family, SOCK_STREAM, 0); if (s == -1) { log_error("cannot create socket of family %d", to->family); return -1; } rv = connect_nonb(s, (struct sockaddr *)&to->sa6, to->saddrlen, 10); if (rv == -1) { if( errno == ETIMEDOUT) log_error("connect to \"%s\" got a timeout", to->addr_string); else log_error("connect to \"%s\" got an error: %s", to->addr_string, strerror(errno)); goto error; } to->tcp_fd = s; found: return 1; error: if (s >= 0) close(s); return -1; } int booth_tcp_send(struct booth_site *to, void *buf, int len) { return do_write(to->tcp_fd, buf, len); } static int booth_tcp_recv(struct booth_site *from, void *buf, int len) { int got; /* Needs timeouts! */ got = do_read(from->tcp_fd, buf, len); if (got < 0) return got; return len; } static int booth_tcp_close(struct booth_site *to) { if (to) { if (to->tcp_fd > STDERR_FILENO) close(to->tcp_fd); to->tcp_fd = -1; } return 0; } static int booth_tcp_exit(void) { return 0; } int setup_udp_server(int try_only) { int rv, fd; unsigned int recvbuf_size; fd = socket(local->family, SOCK_DGRAM, 0); if (fd == -1) { log_error("failed to create UDP socket %s", strerror(errno)); goto ex; } rv = fcntl(fd, F_SETFL, O_NONBLOCK); if (rv == -1) { log_error("failed to set non-blocking operation " "on UDP socket: %s", strerror(errno)); goto ex; } rv = bind(fd, (struct sockaddr *)&local->sa6, local->saddrlen); if (try_only) { rv = (rv == -1) ? errno : 0; close(fd); return rv; } if (rv == -1) { log_error("failed to bind UDP socket to [%s]:%d: %s", local->addr_string, booth_conf->port, strerror(errno)); goto ex; } recvbuf_size = SOCKET_BUFFER_SIZE; rv = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, sizeof(recvbuf_size)); if (rv == -1) { log_error("failed to set recvbuf size"); goto ex; } local->udp_fd = fd; return 0; ex: if (fd >= 0) close(fd); return -1; } /* Receive/process callback for UDP */ static void process_recv(int ci) { struct sockaddr_storage sa; int rv; socklen_t sa_len; char buffer[256]; /* Used for unit tests */ struct boothc_ticket_msg *msg; sa_len = sizeof(sa); msg = (void*)buffer; rv = recvfrom(clients[ci].fd, buffer, sizeof(buffer), MSG_NOSIGNAL | MSG_DONTWAIT, (struct sockaddr *)&sa, &sa_len); if (rv == -1) return; deliver_fn(msg, rv); } static int booth_udp_init(void *f) { int rv; rv = setup_udp_server(0); if (rv < 0) return rv; deliver_fn = f; client_add(local->udp_fd, booth_transport + UDP, process_recv, NULL); return 0; } int booth_udp_send(struct booth_site *to, void *buf, int len) { int rv; rv = sendto(local->udp_fd, buf, len, MSG_NOSIGNAL, (struct sockaddr *)&to->sa6, to->saddrlen); + if (rv == len) { + rv = 0; + } else if (rv < 0) { + rv = errno; + log_error("Cannot send to \"%s\": %d %s", + to->addr_string, + errno, + strerror(errno)); + } else { + rv = EBUSY; + log_error("Packet sent to \"%s\" got truncated", + to->addr_string); + } + return rv; } static int booth_udp_broadcast(void *buf, int len) { - int i; + int i, rv, rvs; struct booth_site *site; if (!booth_conf || !booth_conf->site_count) return -1; + rvs = 0; foreach_node(i, site) { - if (site != local) - booth_udp_send(site, buf, len); + if (site != local) { + rv = booth_udp_send(site, buf, len); + if (!rvs) + rvs = rv; + } } - return 0; + return rvs; } static int booth_udp_exit(void) { return 0; } /* SCTP transport layer has not been developed yet */ static int booth_sctp_init(void *f __attribute__((unused))) { return 0; } static int booth_sctp_send(struct booth_site * to __attribute__((unused)), void *buf __attribute__((unused)), int len __attribute__((unused))) { return 0; } static int booth_sctp_broadcast(void *buf __attribute__((unused)), int len __attribute__((unused))) { return 0; } static int return_0_booth_site(struct booth_site *v __attribute((unused))) { return 0; } static int return_0(void) { return 0; } const struct booth_transport booth_transport[TRANSPORT_ENTRIES] = { [TCP] = { .name = "TCP", .init = booth_tcp_init, .open = booth_tcp_open, .send = booth_tcp_send, .recv = booth_tcp_recv, .close = booth_tcp_close, .exit = booth_tcp_exit }, [UDP] = { .name = "UDP", .init = booth_udp_init, .open = return_0_booth_site, .send = booth_udp_send, .close = return_0_booth_site, .broadcast = booth_udp_broadcast, .exit = booth_udp_exit }, [SCTP] = { .name = "SCTP", .init = booth_sctp_init, .open = return_0_booth_site, .send = booth_sctp_send, .broadcast = booth_sctp_broadcast, .exit = return_0, } }; const struct booth_transport *local_transport = booth_transport+TCP; int send_header_only(int fd, struct boothc_header *hdr) { int rv; rv = do_write(fd, hdr, sizeof(*hdr)); return rv; } int send_ticket_msg(int fd, struct boothc_ticket_msg *msg) { int rv; rv = do_write(fd, msg, sizeof(*msg)); return rv; } int send_header_plus(int fd, struct boothc_header *hdr, void *data, int len) { int rv; int l; if (data == hdr->data) { l = sizeof(*hdr) + len; assert(l == ntohl(hdr->length)); /* One struct */ rv = do_write(fd, hdr, l); } else { /* Header and data in two locations */ rv = send_header_only(fd, hdr); if (rv >= 0 && len) rv = do_write(fd, data, len); } return rv; } diff --git a/unit-tests/001_init-catchup.txt b/unit-tests/001_init-catchup.txt deleted file mode 100644 index 1cdc308..0000000 --- a/unit-tests/001_init-catchup.txt +++ /dev/null @@ -1,34 +0,0 @@ -# vim: ft=sh et : - - -ticket: - state ST_INIT - last_ack_ballot 1 - new_ballot 2012 - -# No message0 -# expecting catchup query - -# outgoing packet: expect this data -outgoing0: - header.cmd CMD_CATCHUP - header.result RLT_SUCCESS - - -# should be accepted -message2: # valid CMR_CATCHUP - header.cmd CMR_CATCHUP - header.result RLT_SUCCESS - header.from booth_conf->site[2].site_id - ticket.ballot 2062 - ticket.prev_ballot 2052 - ticket.owner -1 - -# nothing goes out - - -# after a delay, check final state -finally: -# should be overwritten - last_ack_ballot 2052 - new_ballot 2062 diff --git a/unit-tests/001_init-get-heartbeat.txt b/unit-tests/001_init-get-heartbeat.txt new file mode 100644 index 0000000..fa79a06 --- /dev/null +++ b/unit-tests/001_init-get-heartbeat.txt @@ -0,0 +1,25 @@ +# vim: ft=sh et : + + +ticket: + state ST_FOLLOWER + current_term 1 + leader 0 + +# should be accepted +message0: # valid heartbeat + header.cmd OP_HEARTBEAT + header.result RLT_SUCCESS + header.from booth_conf->site[2].site_id + ticket.leader booth_conf->site[2].site_id + ticket.term_valid_for 3 + ticket.term 20 + +# nothing goes out + + +# after a delay, check final state +finally: +# should be overwritten + current_term 20 + leader booth_conf->site+2 diff --git a/unit-tests/002_bad_packets.txt b/unit-tests/002_bad_packets.txt index 78e7c62..8a70805 100644 --- a/unit-tests/002_bad_packets.txt +++ b/unit-tests/002_bad_packets.txt @@ -1,82 +1,70 @@ # vim: ft=sh et : # # This test is mostly concerned with ignoring invalid packets. -# We're expecting catchup queries. +# We're expecting heartbeat packets. ticket: - state ST_INIT - last_ack_ballot 1 - new_ballot 12 + state ST_LEADER + leader local + current_term 500 # defaults message: - header.cmd CMR_CATCHUP - ticket.ballot 99 - ticket.prev_ballot 98 - header.from booth_conf->site[2].site_id + header.cmd OP_HEARTBEAT + ticket.term 500 + #header.from booth_conf->site[2].site_id + header.from local->site_id header.result 0 message0: # bad result code header.result 243521741 outgoing0: - state ST_INIT + state ST_LEADER message1: # bad sender header.from 71 outgoing1: - state ST_INIT + state ST_LEADER message2: # bad version header.version 512 outgoing2: - state ST_INIT + state ST_LEADER message3: # bad magic header.version 31 outgoing3: - state ST_INIT + state ST_LEADER message4: # bad length header.length 16 outgoing4: - state ST_INIT + state ST_LEADER message5: # bad ticket ID ticket.id "gibtsnich" outgoing5: - state ST_INIT - - -## TODO: ballot number ranges, discarding invalid. -## see assert in ballot_is_higher_than() -##message6: # bad ballot number -## ticket.ballot -15 + (1 << 29) -## -##outgoing6: -## state ST_INIT + state ST_LEADER message100: # should work - ticket.ballot 199 - ticket.prev_ballot 198 + ticket.term 510 # no outgoing message finally: - state ST_STABLE - new_ballot 199 - last_ack_ballot 198 - + state ST_FOLLOWER + current_term 510 diff --git a/unit-tests/003_pacemaker.txt b/unit-tests/003_pacemaker.txt index 41abea8..2b039a7 100644 --- a/unit-tests/003_pacemaker.txt +++ b/unit-tests/003_pacemaker.txt @@ -1,24 +1,22 @@ # vim: ft=sh et : # # Checks whether Pacemaker gets correct command lines. ticket: - state ST_STABLE - last_ack_ballot 40 - new_ballot 50 + state ST_FOLLOWER + current_term 40 + term_expires time(0) + 30 message0: - header.cmd OP_COMMITTED + header.cmd OP_HEARTBEAT header.from booth_conf->site[2].site_id header.result 0 - ticket.ballot 99 -# must match last_ack_ballot above - ticket.prev_ballot 40 - ticket.owner booth_conf->site[2].site_id + ticket.term 44 + ticket.leader booth_conf->site[2].site_id #getenv("UNIT_TEST_AUX") "ballot 99 owner +" finally: - last_ack_ballot 99 + current_term 44 diff --git a/unit-tests/010_retries.txt b/unit-tests/010_retries.txt index 95f0515..9eaeabf 100644 --- a/unit-tests/010_retries.txt +++ b/unit-tests/010_retries.txt @@ -1,52 +1,62 @@ # vim: ft=sh et : # # Testing whether retries are sent, and if they're stopped again. ticket: - state ST_STABLE - last_ack_ballot 40 - new_ballot 50 - owner local - retries 6 + state ST_LEADER + current_term 40 + leader local + retries 10000 # needed so that heartbeats are sent _now_ timeout 1 # may keep ticket all the time - expiry 3000 + term_duration 3000 # but shall start renewal now - expires time(0) + 1000 + term_expires time(0) + 1000 outgoing0: - header.cmd OP_PREPARING + header.cmd OP_HEARTBEAT + ticket.term 40 outgoing1: - header.cmd OP_PREPARING + header.cmd OP_HEARTBEAT + ticket.term 40 outgoing2: - header.cmd OP_PREPARING + header.cmd OP_HEARTBEAT + ticket.term 40 outgoing3: - header.cmd OP_PREPARING + header.cmd OP_HEARTBEAT + ticket.term 40 +# yes, you're the leader. message4: - header.cmd OP_PROMISING + header.cmd OP_HEARTBEAT header.from booth_conf->site[2].site_id header.result 0 - ticket.prev_ballot 45 - ticket.ballot booth_conf->ticket[0].new_ballot + ticket.term 40 + ticket.leader local->site_id + +# doesn't stop ... there is no retry limit outgoing5: - header.cmd OP_PROPOSING + header.cmd OP_HEARTBEAT outgoing6: - header.cmd OP_PROPOSING + header.cmd OP_HEARTBEAT outgoing7: - header.cmd OP_PROPOSING + header.cmd OP_HEARTBEAT outgoing8: - header.cmd OP_PROPOSING + header.cmd OP_HEARTBEAT outgoing9: - header.cmd OP_PROPOSING + header.cmd OP_HEARTBEAT outgoing10: - header.cmd OP_PROPOSING + header.cmd OP_HEARTBEAT -# Now retry counter should trigger, and restart paxos -outgoing11: - header.cmd OP_PREPARING +# Now term expires +ticket11: + term_expires time(0) - 1 + +# no outgoing message, gets to be follower +finally: + state ST_FOLLOWER diff --git a/unit-tests/020_ext-verifier.txt b/unit-tests/020_ext-verifier.txt index 227a7c5..73fa442 100644 --- a/unit-tests/020_ext-verifier.txt +++ b/unit-tests/020_ext-verifier.txt @@ -1,54 +1,52 @@ # vim: ft=sh et : # # Testing whether the external verifier (before-acquire-handler) # is obeyed. ticket: name "tick1" - state ST_STABLE - last_ack_ballot 40 - new_ballot 50 - owner local - retries 6 - timeout 1 + state ST_LEADER + current_term 40 + leader local # may keep ticket all the time - expiry 3000 + term_duration 3000 # but shall start renewal now - expires time(0) + 1000 + term_expires time(0) + 1000 ext_verifier "test `set|grep ^BOOTH|wc -l` -ge 5" + hb_sent_at time(0) - 10 outgoing0: - header.cmd OP_PREPARING + header.cmd OP_HEARTBEAT ticket1: ext_verifier 'test "$BOOTH_TICKET" == "tick1"' # cause re-query of the verifier - state ST_STABLE + hb_sent_at time(0) - 10 # #gdb1: # break ticket_broadcast_proposed_state § commands § bt § c § end outgoing1: - header.cmd OP_PREPARING + header.cmd OP_HEARTBEAT # now say that we may not have it anymore. ticket2: ext_verifier 'test "$BOOTH_TICKET" == "tick2FOO"' # cause re-query of the verifier - state ST_STABLE + hb_sent_at time(0) - 10 # We just tell the others we don't have it anymore. outgoing2: - header.cmd OP_COMMITTED - ticket.owner -1 + header.cmd OP_REQ_VOTE + ticket.leader -1 finally: - state ST_STABLE - owner NULL + state ST_FOLLOWER + leader NULL diff --git a/unit-tests/060_catchup_same_owner.txt b/unit-tests/060_catchup_same_owner.txt index 56fce2e..73899cd 100644 --- a/unit-tests/060_catchup_same_owner.txt +++ b/unit-tests/060_catchup_same_owner.txt @@ -1,37 +1,38 @@ # vim: ft=sh et : -# We've got the ticket; on catchup, peer agrees with us re. owner, but has a -# higher ballot number. +# We've got the ticket; a peer agrees with us re. owner, but has a +# higher term number. # We must not lose the ticket. ticket: - state ST_INIT - last_ack_ballot 100 - new_ballot 100 - owner local - expires time(0) + 35 + state ST_LEADER + current_term 100 + leader local + term_expires time(0) + 35 + term_duration 3000 + retries 6 + timeout 1 + hb_sent_at time(0) - 2000 + gdb0: - watch booth_conf->ticket[0].owner § commands § bt § c § end + watch booth_conf->ticket[0].leader § commands § bt § c § end # No message0 outgoing0: - header.cmd CMD_CATCHUP + header.cmd OP_HEARTBEAT header.result RLT_SUCCESS - ticket.ballot 100 - ticket.prev_ballot 100 - ticket.owner local->site_id + ticket.term 100 + ticket.leader local->site_id -message1: # catchup, same owner - header.cmd CMR_CATCHUP +message1: # same owner + header.cmd OP_HEARTBEAT header.result RLT_SUCCESS header.from booth_conf->site[2].site_id - ticket.expiry 5 - ticket.ballot 120 - ticket.prev_ballot 120 - ticket.owner local->site_id + ticket.term 110 + ticket.leader local->site_id finally: - owner local + leader local diff --git a/unit-tests/_defaults.txt b/unit-tests/_defaults.txt index 80618d7..55e8875 100644 --- a/unit-tests/_defaults.txt +++ b/unit-tests/_defaults.txt @@ -1,52 +1,55 @@ # vim: ft=sh et : # ticket defaults, mostly set via config file. ticket: name "ticket" ## these would matter if testing via GDB had high latencies #expiry 60 #timeout 10 acquire_after 0 # defaults for all tests state ST_INIT next_cron 0 # time(0)+1 # local is site[0] per convention - owner booth_conf->site+1 - expires time(0)+1 - last_ack_ballot 242 + leader booth_conf->site+1 + #owner booth_conf->site+1 + #expires time(0)+1 + term_expires time(0)+1 + #last_ack_ballot 242 - proposer 0 - proposed_owner 0 - new_ballot 0 - proposal_acknowledges 0 - retry_number 0 + leader 0 + #proposer 0 + #proposed_owner 0 + #new_ballot 0 + #proposal_acknowledges 0 + #retry_number 0 # defaults for input message. # sender is a peer, and it wants something. message: ticket.id "ticket" # invalid by default header.cmd -1 # invalid by default header.result 1 # invalid by default header.from -1 header.version BOOTHC_VERSION header.magic BOOTHC_MAGIC header.length sizeof(struct boothc_ticket_msg) - ticket.owner -1 - ticket.ballot 0 - ticket.prev_ballot 0 - ticket.expiry 0 + ticket.leader -1 + ticket.term 0 + ticket.term_valid_for 0 + ticket.leader_commit -1